teichmann@5841: /* Copyright (C) 2011, 2012, 2013 by Bundesanstalt für Gewässerkunde teichmann@5841: * Software engineering by Intevation GmbH teichmann@5841: * teichmann@5991: * This file is Free Software under the GNU AGPL (>=v3) teichmann@5841: * and comes with ABSOLUTELY NO WARRANTY! Check out the teichmann@5991: * documentation coming with Dive4Elements River for details. teichmann@5841: */ teichmann@5841: teichmann@5826: package org.dive4elements.river.etl.aft; sascha@4075: teichmann@5826: import org.dive4elements.river.etl.db.ConnectedStatements; teichmann@5826: import org.dive4elements.river.etl.db.SymbolicStatement; sascha@4078: sascha@4076: import java.sql.ResultSet; sascha@4075: import java.sql.SQLException; sascha@4075: teichmann@4772: import java.util.ArrayList; teichmann@6895: import java.util.Comparator; teichmann@4772: import java.util.HashMap; teichmann@4772: import java.util.List; teichmann@4772: import java.util.Map; teichmann@6895: import java.util.TreeMap; sascha@4075: teichmann@4772: import org.apache.log4j.Logger; sascha@4075: sascha@4075: public class River sascha@4075: extends IdPair sascha@4075: { sascha@4075: private static Logger log = Logger.getLogger(River.class); sascha@4075: sascha@4075: protected String name; sascha@4075: teichmann@4753: protected double from; teichmann@4753: protected double to; teichmann@4753: sascha@4075: public River() { sascha@4075: } sascha@4075: teichmann@4753: public River(int id1, String name, double from, double to) { teichmann@4753: super(id1); teichmann@4753: this.name = name; teichmann@4753: this.from = from; teichmann@4753: this.to = to; teichmann@4753: } teichmann@4753: sascha@4075: public River(int id1, int id2, String name) { sascha@4075: super(id1, id2); sascha@4075: this.name = name; sascha@4075: } sascha@4075: sascha@4075: public String getName() { sascha@4075: return name; sascha@4075: } sascha@4075: teichmann@4753: public double getFrom() { teichmann@4753: return from; teichmann@4753: } teichmann@4753: teichmann@4753: public void setFrom(double from) { teichmann@4753: this.from = from; teichmann@4753: } teichmann@4753: teichmann@4753: public double getTo() { teichmann@4753: return to; teichmann@4753: } teichmann@4753: teichmann@4753: public void setTo(double to) { teichmann@4753: this.to = to; teichmann@4753: } teichmann@4753: teichmann@4753: public boolean inside(double x) { teichmann@4753: return x >= from && x <= to; teichmann@4753: } sascha@4078: sascha@4094: public boolean sync(SyncContext context) throws SQLException { sascha@4075: log.info("sync river: " + this); sascha@4077: teichmann@5188: // Only take relevant gauges into account. teichmann@5188: Map dipsGauges = context.getDIPSGauges(name, from, to); sascha@4081: sascha@4077: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4077: ConnectedStatements aftStatements = context.getAftStatements(); sascha@4077: teichmann@4754: String riverName = getName(); teichmann@7009: String lowerRiverName = riverName.toLowerCase(); teichmann@4754: teichmann@4754: Map aftDIPSGauges = new HashMap(); teichmann@4754: sascha@4076: ResultSet messstellenRs = aftStatements sascha@4076: .getStatement("select.messstelle") sascha@4076: .clearParameters() teichmann@4774: .setInt("GEWAESSER_NR", id2) teichmann@4774: .executeQuery(); sascha@4076: teichmann@4754: try { teichmann@4754: while (messstellenRs.next()) { teichmann@4774: String name = messstellenRs.getString("NAME"); teichmann@4774: String num = messstellenRs.getString("MESSSTELLE_NR"); teichmann@5188: double station = messstellenRs.getDouble("STATIONIERUNG"); teichmann@5188: teichmann@5258: if (!messstellenRs.wasNull() && !inside(station)) { teichmann@5258: log.warn("Station found in AFT but in not range: " + station); teichmann@5188: continue; teichmann@5188: } teichmann@4754: teichmann@4754: Long number = SyncContext.numberToLong(num); teichmann@4754: if (number == null) { teichmann@4754: log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); teichmann@4754: continue; teichmann@4754: } teichmann@4754: DIPSGauge dipsGauge = dipsGauges.get(number); teichmann@4754: if (dipsGauge == null) { teichmann@4754: log.warn( teichmann@4754: "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " + teichmann@4754: "Gauge number used for lookup: " + number); teichmann@4754: continue; teichmann@4754: } teichmann@4754: String gaugeRiver = dipsGauge.getRiverName(); teichmann@7010: if (!lowerRiverName.contains(gaugeRiver.toLowerCase())) { teichmann@4754: log.warn( teichmann@4754: "DIPS: MESSSTELLE '" + name + teichmann@4754: "' is assigned to river '" + gaugeRiver + teichmann@4754: "'. Needs to be on '" + riverName + "'."); teichmann@4754: continue; teichmann@4754: } teichmann@4754: dipsGauge.setAftName(name); teichmann@4754: dipsGauge.setOfficialNumber(number); teichmann@4754: aftDIPSGauges.put(number, dipsGauge); sascha@4081: } sascha@4076: } teichmann@4754: finally { teichmann@4754: messstellenRs.close(); teichmann@4754: } sascha@4076: sascha@4085: List updateGauges = new ArrayList(); sascha@4085: sascha@4076: ResultSet gaugesRs = flysStatements sascha@4076: .getStatement("select.gauges") sascha@4076: .clearParameters() sascha@4076: .setInt("river_id", id1).executeQuery(); sascha@4076: teichmann@6895: TreeMap station2gaugeName = new TreeMap( teichmann@6895: new Comparator() { teichmann@6895: @Override teichmann@6895: public int compare(Double a, Double b) { teichmann@6895: double diff = a - b; teichmann@6895: if (diff < -0.0001) return -1; teichmann@6895: if (diff > 0.0001) return +1; teichmann@6895: return 0; teichmann@6895: } teichmann@6895: }); teichmann@6895: teichmann@4754: try { teichmann@4754: while (gaugesRs.next()) { teichmann@4754: int gaugeId = gaugesRs.getInt("id"); teichmann@4754: String name = gaugesRs.getString("name"); teichmann@4754: long number = gaugesRs.getLong("official_number"); teichmann@6895: double station = gaugesRs.getDouble("station"); teichmann@6895: station2gaugeName.put(station, name); teichmann@6895: teichmann@4754: if (gaugesRs.wasNull()) { teichmann@4754: log.warn("FLYS: Gauge '" + name + teichmann@4754: "' has no official number. Ignored."); teichmann@4754: continue; teichmann@4754: } teichmann@4754: Long key = Long.valueOf(number); teichmann@4754: DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); teichmann@4754: if (aftDIPSGauge == null) { teichmann@4754: log.warn("FLYS: Gauge '" + name + "' number " + number + teichmann@4754: " is not found in AFT/DIPS."); teichmann@4754: continue; teichmann@4754: } teichmann@4754: aftDIPSGauge.setFlysId(gaugeId); teichmann@4754: log.info("Gauge '" + name + teichmann@4754: "' found in FLYS, AFT and DIPS. -> Update"); teichmann@4754: updateGauges.add(aftDIPSGauge); sascha@4085: } sascha@4076: } teichmann@4754: finally { teichmann@4754: gaugesRs.close(); teichmann@4754: } sascha@4076: teichmann@6895: boolean modified = createGauges( teichmann@6895: context, aftDIPSGauges, station2gaugeName); sascha@4087: sascha@4097: modified |= updateGauges(context, updateGauges); sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: sascha@4097: protected boolean updateGauges( sascha@4097: SyncContext context, sascha@4097: List gauges sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4097: boolean modified = false; sascha@4097: sascha@4097: for (DIPSGauge gauge: gauges) { teichmann@5616: // XXX: Do dont modify the master AT. teichmann@5616: // modified |= updateBfGIdOnMasterDischargeTable(context, gauge); sascha@4097: modified |= updateGauge(context, gauge); sascha@4097: } sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: teichmann@5258: protected boolean updateBfGIdOnMasterDischargeTable( teichmann@5258: SyncContext context, teichmann@5258: DIPSGauge gauge teichmann@5258: ) throws SQLException { teichmann@5258: log.info( teichmann@5258: "FLYS: Updating master discharge table bfg_id for '" + teichmann@5258: gauge.getAftName() + "'"); teichmann@5258: ConnectedStatements flysStatements = context.getFlysStatements(); teichmann@5258: teichmann@5258: ResultSet rs = flysStatements teichmann@5258: .getStatement("select.gauge.master.discharge.table") teichmann@5258: .clearParameters() teichmann@5258: .setInt("gauge_id", gauge.getFlysId()) teichmann@5258: .executeQuery(); teichmann@5258: teichmann@5258: int flysId; teichmann@5258: teichmann@5258: try { teichmann@5480: if (!rs.next()) { teichmann@5258: log.error( teichmann@5258: "FLYS: No master discharge table found for gauge '" + teichmann@5258: gauge.getAftName() + "'"); teichmann@5258: return false; teichmann@5258: } teichmann@5258: String bfgId = rs.getString("bfg_id"); teichmann@5258: if (!rs.wasNull()) { // already has BFG_ID teichmann@5258: return false; teichmann@5258: } teichmann@5258: flysId = rs.getInt("id"); teichmann@5258: } finally { teichmann@5258: rs.close(); teichmann@5258: } teichmann@5258: teichmann@5258: // We need to find out the BFG_ID of the current discharge table teichmann@5258: // for this gauge in AFT. teichmann@5258: teichmann@5258: ConnectedStatements aftStatements = context.getAftStatements(); teichmann@5258: teichmann@5258: rs = aftStatements teichmann@5258: .getStatement("select.bfg.id.current") teichmann@5258: .clearParameters() teichmann@5258: .setString("number", "%" + gauge.getOfficialNumber()) teichmann@5258: .executeQuery(); teichmann@5258: teichmann@5258: String bfgId = null; teichmann@5258: teichmann@5258: try { teichmann@5258: if (rs.next()) { teichmann@5258: bfgId = rs.getString("BFG_ID"); teichmann@5258: } teichmann@5258: } finally { teichmann@5258: rs.close(); teichmann@5258: } teichmann@5258: teichmann@5258: if (bfgId == null) { teichmann@5258: log.warn( teichmann@5258: "No BFG_ID found for current discharge table of gauge '" + teichmann@5258: gauge + "'"); teichmann@5258: return false; teichmann@5258: } teichmann@5258: teichmann@5258: // Set the BFG_ID in FLYS. teichmann@5258: flysStatements.beginTransaction(); teichmann@5258: try { teichmann@5258: flysStatements teichmann@5258: .getStatement("update.bfg.id.discharge.table") teichmann@5258: .clearParameters() teichmann@5258: .setInt("id", flysId) teichmann@5258: .setString("bfg_id", bfgId) teichmann@5258: .executeUpdate(); teichmann@5258: flysStatements.commitTransaction(); teichmann@5258: } catch (SQLException sqle) { teichmann@5258: flysStatements.rollbackTransaction(); teichmann@5258: log.error(sqle, sqle); teichmann@5258: return false; teichmann@5258: } teichmann@5258: teichmann@5258: return true; teichmann@5258: } teichmann@5258: sascha@4097: protected boolean updateGauge( sascha@4097: SyncContext context, sascha@4097: DIPSGauge gauge sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4102: log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'."); teichmann@4775: // We need to load all discharge tables from both databases teichmann@4775: // of the gauge and do some pairing based on their bfg_id. sascha@4097: sascha@4097: boolean modified = false; sascha@4097: sascha@4104: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4098: sascha@4104: flysStatements.beginTransaction(); sascha@4104: try { sascha@4104: List flysDTs = sascha@4104: DischargeTable.loadFlysDischargeTables( sascha@4104: context, gauge.getFlysId()); sascha@4104: sascha@4104: List aftDTs = sascha@4104: DischargeTable.loadAftDischargeTables( sascha@4104: context, gauge.getOfficialNumber()); sascha@4104: teichmann@4775: Map bfgId2FlysDT = sascha@4104: new HashMap(); sascha@4104: sascha@4104: for (DischargeTable dt: flysDTs) { teichmann@4775: String bfgId = dt.getBfgId(); teichmann@4775: if (bfgId == null) { teichmann@4736: log.warn("FLYS: discharge table " + dt.getId() teichmann@4775: + " has no bfg_id. Ignored."); sascha@4104: continue; sascha@4104: } teichmann@4775: bfgId2FlysDT.put(bfgId, dt); sascha@4098: } sascha@4104: sascha@4104: List createDTs = new ArrayList(); sascha@4104: sascha@4104: for (DischargeTable aftDT: aftDTs) { teichmann@4775: String bfgId = aftDT.getBfgId(); teichmann@4775: DischargeTable flysDT = bfgId2FlysDT.remove(bfgId); sascha@4104: if (flysDT != null) { sascha@4104: // Found in AFT and FLYS. teichmann@4775: log.info("FLYS: Discharge table '" + bfgId sascha@4104: + "' found in AFT and FLYS. -> update"); sascha@4104: // Create the W/Q diff. sascha@4104: modified |= writeWQChanges(context, flysDT, aftDT); sascha@4104: } sascha@4104: else { teichmann@4775: log.info("FLYS: Discharge table '" + bfgId sascha@4104: + "' not found in FLYS. -> create"); sascha@4104: createDTs.add(aftDT); sascha@4104: } sascha@4104: } sascha@4104: teichmann@5930: modified |= deleteDischargeTables(context, bfgId2FlysDT); teichmann@5930: sascha@4104: log.info("FLYS: Copy " + createDTs.size() + sascha@4104: " discharge tables over from AFT."); sascha@4098: sascha@4104: // Create the new discharge tables. sascha@4104: for (DischargeTable aftDT: createDTs) { sascha@4104: createDischargeTable(context, aftDT, gauge.getFlysId()); sascha@4104: modified = true; sascha@4104: } sascha@4102: sascha@4104: flysStatements.commitTransaction(); sascha@4104: } sascha@4104: catch (SQLException sqle) { sascha@4104: flysStatements.rollbackTransaction(); sascha@4104: log.error(sqle, sqle); sascha@4104: modified = false; sascha@4100: } sascha@4097: sascha@4094: return modified; sascha@4087: } sascha@4079: sascha@4099: protected boolean writeWQChanges( sascha@4099: SyncContext context, sascha@4099: DischargeTable flysDT, sascha@4099: DischargeTable aftDT sascha@4099: ) sascha@4099: throws SQLException sascha@4099: { sascha@4099: flysDT.loadFlysValues(context); sascha@4099: aftDT.loadAftValues(context); sascha@4099: WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues()); sascha@4099: if (diff.hasChanges()) { sascha@4099: diff.writeChanges(context, flysDT.getId()); sascha@4099: return true; sascha@4099: } sascha@4099: return false; sascha@4099: } sascha@4099: sascha@4094: protected boolean createGauges( sascha@4087: SyncContext context, teichmann@6895: Map gauges, teichmann@6895: Map station2gaugeName sascha@4087: ) sascha@4087: throws SQLException sascha@4087: { sascha@4087: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4087: sascha@4087: SymbolicStatement.Instance nextId = sascha@4087: flysStatements.getStatement("next.gauge.id"); sascha@4087: sascha@4087: SymbolicStatement.Instance insertStmnt = sascha@4087: flysStatements.getStatement("insert.gauge"); sascha@4087: sascha@4094: boolean modified = false; sascha@4094: sascha@4087: for (Map.Entry entry: gauges.entrySet()) { sascha@4087: Long officialNumber = entry.getKey(); sascha@4087: DIPSGauge gauge = entry.getValue(); sascha@4087: sascha@4085: log.info("Gauge '" + gauge.getAftName() + sascha@4085: "' not in FLYS but in AFT/DIPS. -> Create"); sascha@4087: teichmann@6895: String flysGaugeName = station2gaugeName.get(gauge.getStation()); teichmann@6895: if (flysGaugeName != null) { teichmann@6895: log.warn("FLYS: AFT gauge " + gauge.getName() + teichmann@6895: " has same station as FLYS gauge " + flysGaugeName + teichmann@6895: " -> ignored."); teichmann@6895: continue; teichmann@6895: } teichmann@6895: sascha@4087: if (!gauge.hasDatums()) { teichmann@4736: log.warn("DIPS: Gauge '" + sascha@4087: gauge.getAftName() + "' has no datum. Ignored."); sascha@4087: continue; sascha@4087: } sascha@4087: sascha@4087: ResultSet rs = null; sascha@4087: flysStatements.beginTransaction(); sascha@4087: try { sascha@4087: (rs = nextId.executeQuery()).next(); sascha@4087: int gaugeId = rs.getInt("gauge_id"); sascha@4087: rs.close(); rs = null; sascha@4087: sascha@4087: insertStmnt sascha@4087: .clearParameters() sascha@4087: .setInt("id", gaugeId) sascha@4087: .setString("name", gauge.getAftName()) sascha@4087: .setInt("river_id", id1) sascha@4087: .setDouble("station", gauge.getStation()) sascha@4087: .setDouble("aeo", gauge.getAeo()) sascha@4100: .setLong("official_number", officialNumber) sascha@4087: .setDouble("datum", gauge.getLatestDatum().getValue()); sascha@4087: sascha@4087: insertStmnt.execute(); sascha@4087: teichmann@4736: log.info("FLYS: Created gauge '" + gauge.getAftName() + sascha@4087: "' with id " + gaugeId + "."); sascha@4087: sascha@4088: gauge.setFlysId(gaugeId); sascha@4100: createDischargeTables(context, gauge); sascha@4087: flysStatements.commitTransaction(); sascha@4094: modified = true; sascha@4087: } sascha@4087: catch (SQLException sqle) { sascha@4087: flysStatements.rollbackTransaction(); sascha@4094: log.error(sqle, sqle); sascha@4087: } sascha@4087: finally { sascha@4087: if (rs != null) { sascha@4087: rs.close(); sascha@4087: } sascha@4087: } sascha@4085: } sascha@4094: sascha@4094: return modified; sascha@4075: } sascha@4075: sascha@4102: protected void createDischargeTable( sascha@4102: SyncContext context, sascha@4102: DischargeTable aftDT, sascha@4102: int flysGaugeId sascha@4102: ) sascha@4102: throws SQLException sascha@4102: { sascha@4102: aftDT.persistFlysTimeInterval(context); sascha@4102: int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId); sascha@4102: sascha@4102: aftDT.loadAftValues(context); sascha@4102: aftDT.storeFlysValues(context, flysId); sascha@4102: } sascha@4102: teichmann@5930: protected boolean deleteDischargeTables( teichmann@5930: SyncContext context, teichmann@5930: Map tables teichmann@5930: ) teichmann@5930: throws SQLException teichmann@5930: { teichmann@5930: ConnectedStatements flysStatements = context.getFlysStatements(); teichmann@5930: teichmann@5930: SymbolicStatement.Instance deleteDischargeTableValues = teichmann@5930: flysStatements.getStatement("delete.discharge.table.values"); teichmann@5930: teichmann@5930: SymbolicStatement.Instance deleteDischargeTable = teichmann@5930: flysStatements.getStatement("delete.discharge.table"); teichmann@5930: teichmann@5930: boolean modified = false; teichmann@5930: teichmann@5930: for (Map.Entry entry: tables.entrySet()) { teichmann@5930: log.info("FLYS: Discharge table '" + entry.getKey() teichmann@5930: + "' found in FLYS but not in AFT. -> delete"); teichmann@5930: int id = entry.getValue().getId(); teichmann@5930: teichmann@5930: deleteDischargeTableValues teichmann@5930: .clearParameters() teichmann@5930: .setInt("id", id); teichmann@5930: deleteDischargeTableValues.execute(); teichmann@5930: teichmann@5930: deleteDischargeTable teichmann@5930: .clearParameters() teichmann@5930: .setInt("id", id); teichmann@5930: deleteDischargeTable.execute(); teichmann@5930: teichmann@5930: modified = true; teichmann@5930: } teichmann@5930: return modified; teichmann@5930: } teichmann@5930: sascha@4088: protected void createDischargeTables( sascha@4088: SyncContext context, sascha@4088: DIPSGauge gauge sascha@4088: ) sascha@4088: throws SQLException sascha@4088: { sascha@4102: log.info("FLYS: Create discharge tables for '" + sascha@4102: gauge.getAftName() + "'."); sascha@4088: sascha@4093: // Load the discharge tables from AFT. sascha@4093: List dts = loadAftDischargeTables( sascha@4100: context, gauge); sascha@4088: sascha@4093: // Persist the time intervals. sascha@4093: persistFlysTimeIntervals(context, dts); sascha@4093: sascha@4093: // Persist the discharge tables sascha@4102: int [] flysDTIds = persistFlysDischargeTables( sascha@4102: context, dts, gauge.getFlysId()); sascha@4093: sascha@4093: // Copy over the W/Q values sascha@4093: copyWQsFromAftToFlys(context, dts, flysDTIds); sascha@4093: } sascha@4093: sascha@4093: protected List loadAftDischargeTables( sascha@4093: SyncContext context, sascha@4093: DIPSGauge gauge sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4097: return DischargeTable.loadAftDischargeTables( sascha@4100: context, gauge.getOfficialNumber(), gauge.getFlysId()); sascha@4093: } sascha@4093: sascha@4093: protected void persistFlysTimeIntervals( sascha@4093: SyncContext context, sascha@4093: List dts sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4090: for (DischargeTable dt: dts) { sascha@4102: dt.persistFlysTimeInterval(context); sascha@4090: } sascha@4093: } sascha@4090: sascha@4093: protected int [] persistFlysDischargeTables( sascha@4093: SyncContext context, sascha@4102: List dts, sascha@4102: int flysGaugeId sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: int [] flysDTIds = new int[dts.size()]; sascha@4093: sascha@4102: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4102: flysDTIds[i] = dts.get(i) sascha@4102: .persistFlysDischargeTable(context, flysGaugeId); sascha@4088: } sascha@4093: sascha@4093: return flysDTIds; sascha@4093: } sascha@4093: sascha@4093: protected void copyWQsFromAftToFlys( sascha@4093: SyncContext context, sascha@4093: List dts, sascha@4093: int [] flysDTIds sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4093: DischargeTable dt = dts.get(i); sascha@4093: dt.loadAftValues(context); sascha@4093: dt.storeFlysValues(context, flysDTIds[i]); sascha@4093: dt.clearValues(); // To save memory. sascha@4093: } sascha@4088: } sascha@4088: sascha@4075: public String toString() { sascha@4075: return "[River: name=" + name + ", " + super.toString() + "]"; sascha@4075: } sascha@4075: } sascha@4075: // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :