Mercurial > dive4elements > river
view flys-aft/src/main/java/de/intevation/aft/River.java @ 4099:006e99437fb9
Store the W/Q differences of existing discharge tables to the FLYS database.
flys-aft/trunk@3621 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Sun, 08 Jan 2012 09:25:36 +0000 |
parents | 9215253ad0be |
children | 981de0b77c6b |
line wrap: on
line source
package de.intevation.aft;

import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;

import org.apache.log4j.Logger;

import de.intevation.db.ConnectedStatements;
import de.intevation.db.SymbolicStatement;

/**
 * A river that is synchronized between the AFT and the FLYS database.
 *
 * The inherited id pair is used as follows in this class: <code>id1</code>
 * is bound to the FLYS parameter <code>river_id</code>, <code>id2</code>
 * is bound to the AFT parameter <code>GEWAESSER_NR</code>.
 */
public class River
extends      IdPair
{
    private static final Logger log = Logger.getLogger(River.class);

    /** Name of the river. */
    protected String name;

    /** Creates a river without ids and name. */
    public River() {
    }

    /**
     * Creates a named river.
     *
     * @param id1  id used for the FLYS statements (<code>river_id</code>).
     * @param id2  id used for the AFT statements (<code>GEWAESSER_NR</code>).
     * @param name name of the river.
     */
    public River(int id1, int id2, String name) {
        super(id1, id2);
        this.name = name;
    }

    /**
     * @return the name of the river.
     */
    public String getName() {
        return name;
    }

    /**
     * Synchronizes the gauges of this river.
     *
     * First the AFT 'Messstellen' of this river are matched against the
     * DIPS gauges of the context. Afterwards the matched gauges are compared
     * with the gauges stored in FLYS: gauges found in FLYS, AFT and DIPS
     * are updated, gauges only found in AFT/DIPS are created in FLYS.
     *
     * @param context the context providing the statements and DIPS gauges.
     * @return true if creating or updating gauges reported a modification.
     * @throws SQLException if a database operation fails.
     */
    public boolean sync(SyncContext context) throws SQLException {
        log.info("sync river: " + this);

        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges();

        ConnectedStatements flysStatements = context.getFlysStatements();
        ConnectedStatements aftStatements  = context.getAftStatements();

        // Gauges found in AFT and DIPS, keyed by their official number.
        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();

        ResultSet messstellenRs = aftStatements
            .getStatement("select.messstelle")
            .clearParameters()
            .setInt("GEWAESSER_NR", id2).executeQuery();

        // Close the result set even if the iteration fails.
        try {
            String riverName = getName();

            while (messstellenRs.next()) {
                String aftName = messstellenRs.getString("NAME");
                String num     = messstellenRs.getString("MESSSTELLE_NR");
                Long   number  = SyncContext.numberToLong(num);
                if (number == null) {
                    log.warn("Invalid MESSSTELLE_NR for MESSSTELLE '"+aftName+"'");
                    continue;
                }
                DIPSGauge dipsGauge = dipsGauges.get(number);
                if (dipsGauge == null) {
                    log.warn(
                        "MESSSTELLE '" + aftName + "' not found in DIPS. " +
                        "Gauge number used for lookup: " + number);
                    continue;
                }
                String gaugeRiver = dipsGauge.getRiverName();
                if (!gaugeRiver.equalsIgnoreCase(riverName)) {
                    log.warn(
                        "MESSSTELLE '" + aftName +
                        "' is assigned to river '" + gaugeRiver +
                        "'. Needs to be on '" + riverName + "'.");
                    continue;
                }
                dipsGauge.setAftName(aftName);
                dipsGauge.setOfficialNumber(number);
                aftDIPSGauges.put(number, dipsGauge);
            }
        }
        finally {
            messstellenRs.close();
        }

        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();

        ResultSet gaugesRs = flysStatements
            .getStatement("select.gauges")
            .clearParameters()
            .setInt("river_id", id1).executeQuery();

        // Close the result set even if the iteration fails.
        try {
            while (gaugesRs.next()) {
                int    gaugeId  = gaugesRs.getInt("id");
                String flysName = gaugesRs.getString("name");
                // wasNull() refers to the column read last, so it has
                // to follow the read of official_number directly.
                long   number   = gaugesRs.getLong("official_number");
                if (gaugesRs.wasNull()) {
                    log.warn("FLYS: Gauge '" + flysName +
                        "' has no official number. Ignored.");
                    continue;
                }
                Long key = Long.valueOf(number);
                // Removing leaves only the gauges missing in FLYS in the map.
                DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
                if (aftDIPSGauge == null) {
                    log.warn("FLYS: Gauge '" + flysName + "' number " + number +
                        " is not found in AFT/DIPS.");
                    continue;
                }
                aftDIPSGauge.setFlysId(gaugeId);
                log.info("Gauge '" + flysName +
                    "' found in FLYS, AFT and DIPS. -> Update");
                updateGauges.add(aftDIPSGauge);
            }
        }
        finally {
            gaugesRs.close();
        }

        boolean modified = createGauges(context, aftDIPSGauges);

        modified |= updateGauges(context, updateGauges);

        return modified;
    }

    /**
     * Updates all given gauges.
     *
     * @param context the sync context.
     * @param gauges  the gauges to update.
     * @return true if at least one gauge update reported a modification.
     * @throws SQLException if a database operation fails.
     */
    protected boolean updateGauges(
        SyncContext     context,
        List<DIPSGauge> gauges
    )
    throws SQLException
    {
        boolean modified = false;

        for (DIPSGauge gauge: gauges) {
            modified |= updateGauge(context, gauge);
        }

        return modified;
    }

    /**
     * Updates a single gauge by pairing its FLYS and AFT discharge tables
     * by description and writing the W/Q differences of the pairs.
     * Creating discharge tables that only exist in AFT is not done yet.
     *
     * @param context the sync context.
     * @param gauge   the gauge to update.
     * @return true if W/Q changes were written for at least one table.
     * @throws SQLException if a database operation fails.
     */
    protected boolean updateGauge(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        // We need to load all discharge tables from both database
        // of the gauge and do some pairing based on their descriptions.

        boolean modified = false;

        List<DischargeTable> flysDTs =
            DischargeTable.loadFlysDischargeTables(
                context, gauge.getFlysId());

        List<DischargeTable> aftDTs =
            DischargeTable.loadAftDischargeTables(
                context, gauge.getOfficialNumber());

        Map<String, DischargeTable> desc2FlysDT =
            new HashMap<String, DischargeTable>();

        for (DischargeTable dt: flysDTs) {
            String description = dt.getDescription();
            if (description == null) {
                log.warn("FLYS: discharge table " + dt.getId() +
                    " has no description. Ignored.");
                continue;
            }
            desc2FlysDT.put(description, dt);
        }

        // Collected for the TODO below; not persisted yet.
        List<DischargeTable> createDTs = new ArrayList<DischargeTable>();

        for (DischargeTable aftDT: aftDTs) {
            String description = aftDT.getDescription();
            DischargeTable flysDT = desc2FlysDT.remove(description);
            if (flysDT != null) {
                // Found in AFT and FLYS.
                log.info("FLYS: Discharge table '" + description +
                    "' found in AFT and FLYS. -> update");
                // Create the W/Q diff.
                modified |= writeWQChanges(context, flysDT, aftDT);
            }
            else {
                log.info("FLYS: Discharge table '" + description +
                    "' not found in FLYS. -> create");
                createDTs.add(aftDT);
            }
        }

        // What is left in the map was not matched by any AFT table.
        for (String description: desc2FlysDT.keySet()) {
            log.info("FLYS: Discharge table '" + description +
                "' found in FLYS but not in AFT. -> ignore");
        }

        // TODO: Create the new discharge tables.

        return modified;
    }

    /**
     * Loads the values of both discharge tables, builds their difference
     * and writes the changes for the FLYS discharge table if there are any.
     *
     * @param context the sync context.
     * @param flysDT  the discharge table from FLYS.
     * @param aftDT   the discharge table from AFT.
     * @return true if the difference had changes and they were written.
     * @throws SQLException if a database operation fails.
     */
    protected boolean writeWQChanges(
        SyncContext    context,
        DischargeTable flysDT,
        DischargeTable aftDT
    )
    throws SQLException
    {
        flysDT.loadFlysValues(context);
        aftDT.loadAftValues(context);
        WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
        if (diff.hasChanges()) {
            diff.writeChanges(context, flysDT.getId());
            return true;
        }
        return false;
    }

    /**
     * Creates the given gauges in FLYS together with their discharge
     * tables. Each gauge is written in its own transaction. If an
     * SQLException occurs while writing a gauge, the transaction is rolled
     * back, the error is logged and the next gauge is processed.
     * Gauges without datums are skipped.
     *
     * @param context the sync context.
     * @param gauges  the gauges to create, keyed by their official number.
     * @return true if at least one gauge was committed.
     * @throws SQLException if a database operation outside the handled
     *                      per-gauge transaction body fails.
     */
    protected boolean createGauges(
        SyncContext          context,
        Map<Long, DIPSGauge> gauges
    )
    throws SQLException
    {
        ConnectedStatements flysStatements = context.getFlysStatements();

        SymbolicStatement.Instance nextId =
            flysStatements.getStatement("next.gauge.id");

        SymbolicStatement.Instance insertStmnt =
            flysStatements.getStatement("insert.gauge");

        boolean modified = false;

        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
            Long      officialNumber = entry.getKey();
            DIPSGauge gauge          = entry.getValue();

            log.info("Gauge '" + gauge.getAftName() +
                "' not in FLYS but in AFT/DIPS. -> Create");

            if (!gauge.hasDatums()) {
                log.warn("FLYS: Gauge '" + gauge.getAftName() +
                    "' has no datum. Ignored.");
                continue;
            }

            ResultSet rs = null;
            flysStatements.beginTransaction();
            try {
                rs = nextId.executeQuery();
                rs.next();
                int gaugeId = rs.getInt("gauge_id");
                rs.close(); rs = null;

                // NOTE(review): official_number is a Long here but is
                // bound via setDouble; confirm whether an integral
                // setter would be more appropriate.
                insertStmnt
                    .clearParameters()
                    .setInt("id", gaugeId)
                    .setString("name", gauge.getAftName())
                    .setInt("river_id", id1)
                    .setDouble("station", gauge.getStation())
                    .setDouble("aeo", gauge.getAeo())
                    .setDouble("official_number", officialNumber)
                    .setDouble("datum", gauge.getLatestDatum().getValue());

                insertStmnt.execute();

                log.info("FLYS: Created gauge '" + gauge.getAftName() +
                    "' with id " + gaugeId + ".");

                gauge.setFlysId(gaugeId);
                createDischargeTables(context, officialNumber, gauge);
                flysStatements.commitTransaction();
                modified = true;
            }
            catch (SQLException sqle) {
                flysStatements.rollbackTransaction();
                log.error(sqle, sqle);
            }
            finally {
                if (rs != null) {
                    rs.close();
                }
            }
        }

        return modified;
    }

    /**
     * Creates the discharge tables of a gauge in FLYS: loads them from
     * AFT, persists their time intervals and the tables themselves and
     * copies the W/Q values.
     *
     * @param context        the sync context.
     * @param officialNumber the official number of the gauge.
     * @param gauge          the gauge the discharge tables belong to.
     * @throws SQLException if a database operation fails.
     */
    protected void createDischargeTables(
        SyncContext context,
        Long        officialNumber,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        log.info("create discharge tables");

        // Load the discharge tables from AFT.
        List<DischargeTable> dts = loadAftDischargeTables(
            context, officialNumber, gauge);

        // Persist the time intervals.
        persistFlysTimeIntervals(context, dts);

        // Persist the discharge tables
        int [] flysDTIds = persistFlysDischargeTables(context, dts);

        // Copy over the W/Q values
        copyWQsFromAftToFlys(context, dts, flysDTIds);
    }

    /**
     * Loads the AFT discharge tables for the given official number,
     * passing the FLYS id of the gauge along.
     *
     * @param context        the sync context.
     * @param officialNumber the official number of the gauge.
     * @param gauge          the gauge providing the FLYS id.
     * @return the discharge tables loaded from AFT.
     * @throws SQLException if a database operation fails.
     */
    protected List<DischargeTable> loadAftDischargeTables(
        SyncContext context,
        Long        officialNumber,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        return DischargeTable.loadAftDischargeTables(
            context, officialNumber, gauge.getFlysId());
    }

    /**
     * Replaces the time interval of each discharge table that has one
     * with the interval fetched or created via the context.
     *
     * @param context the sync context.
     * @param dts     the discharge tables to process.
     * @throws SQLException if a database operation fails.
     */
    protected void persistFlysTimeIntervals(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        for (DischargeTable dt: dts) {
            TimeInterval timeInterval = dt.getTimeInterval();
            if (timeInterval != null) {
                dt.setTimeInterval(
                    context.fetchOrCreateFLYSTimeInterval(timeInterval));
            }
        }
    }

    /**
     * Inserts the given discharge tables into FLYS, fetching a new id
     * for each of them.
     *
     * @param context the sync context.
     * @param dts     the discharge tables to insert.
     * @return the new FLYS ids, in the order of <code>dts</code>.
     * @throws SQLException if a database operation fails.
     */
    protected int [] persistFlysDischargeTables(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        boolean debug = log.isDebugEnabled();

        int [] flysDTIds = new int[dts.size()];

        ResultSet rs = null;
        try {
            ConnectedStatements flysStatements =
                context.getFlysStatements();

            SymbolicStatement.Instance nextId =
                flysStatements.getStatement("next.discharge.id");

            SymbolicStatement.Instance insertDT =
                flysStatements.getStatement("insert.dischargetable");

            for (int i = 0; i < flysDTIds.length; ++i) {
                rs = nextId.executeQuery();
                rs.next();
                int id = rs.getInt("discharge_table_id");
                flysDTIds[i] = id;
                rs.close(); rs = null;

                DischargeTable dt = dts.get(i);
                insertDT.clearParameters()
                    .setInt("id", id)
                    .setInt("gauge_id", dt.getGaugeId())
                    .setString("description", dt.getDescription());

                // A table without time interval stores SQL NULL.
                TimeInterval timeInterval = dt.getTimeInterval();
                if (timeInterval != null) {
                    insertDT.setInt(
                        "time_interval_id", timeInterval.getId());
                }
                else {
                    insertDT.setNull(
                        "time_interval_id", Types.INTEGER);
                }

                insertDT.execute();
                if (debug) {
                    log.debug("FLYS: Created discharge table id: " + id);
                }
            }
        }
        finally {
            if (rs != null) {
                rs.close();
                rs = null;
            }
        }

        return flysDTIds;
    }

    /**
     * Loads the AFT values of each discharge table and stores them in
     * FLYS under the corresponding id.
     *
     * @param context   the sync context.
     * @param dts       the discharge tables to copy.
     * @param flysDTIds the FLYS ids, index-aligned with <code>dts</code>.
     * @throws SQLException if a database operation fails.
     */
    protected void copyWQsFromAftToFlys(
        SyncContext          context,
        List<DischargeTable> dts,
        int []               flysDTIds
    )
    throws SQLException
    {
        for (int i = 0; i < flysDTIds.length; ++i) {
            DischargeTable dt = dts.get(i);
            dt.loadAftValues(context);
            dt.storeFlysValues(context, flysDTIds[i]);
            dt.clearValues(); // To save memory.
        }
    }

    @Override
    public String toString() {
        return "[River: name=" + name + ", " + super.toString() + "]";
    }
}
// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :