sascha@4075: package de.intevation.aft; sascha@4075: teichmann@4772: import de.intevation.db.ConnectedStatements; teichmann@4772: import de.intevation.db.SymbolicStatement; sascha@4078: sascha@4076: import java.sql.ResultSet; sascha@4075: import java.sql.SQLException; sascha@4075: teichmann@4772: import java.util.ArrayList; teichmann@4772: import java.util.HashMap; teichmann@4772: import java.util.List; teichmann@4772: import java.util.Map; sascha@4075: teichmann@4772: import org.apache.log4j.Logger; sascha@4075: sascha@4075: public class River sascha@4075: extends IdPair sascha@4075: { sascha@4075: private static Logger log = Logger.getLogger(River.class); sascha@4075: sascha@4075: protected String name; sascha@4075: teichmann@4753: protected double from; teichmann@4753: protected double to; teichmann@4753: sascha@4075: public River() { sascha@4075: } sascha@4075: teichmann@4753: public River(int id1, String name, double from, double to) { teichmann@4753: super(id1); teichmann@4753: this.name = name; teichmann@4753: this.from = from; teichmann@4753: this.to = to; teichmann@4753: } teichmann@4753: sascha@4075: public River(int id1, int id2, String name) { sascha@4075: super(id1, id2); sascha@4075: this.name = name; sascha@4075: } sascha@4075: sascha@4075: public String getName() { sascha@4075: return name; sascha@4075: } sascha@4075: teichmann@4753: public double getFrom() { teichmann@4753: return from; teichmann@4753: } teichmann@4753: teichmann@4753: public void setFrom(double from) { teichmann@4753: this.from = from; teichmann@4753: } teichmann@4753: teichmann@4753: public double getTo() { teichmann@4753: return to; teichmann@4753: } teichmann@4753: teichmann@4753: public void setTo(double to) { teichmann@4753: this.to = to; teichmann@4753: } teichmann@4753: teichmann@4753: public boolean inside(double x) { teichmann@4753: return x >= from && x <= to; teichmann@4753: } sascha@4078: sascha@4094: public boolean sync(SyncContext context) throws SQLException { sascha@4075: log.info("sync river: " + this); sascha@4077: teichmann@5188: // Only take relevant gauges into account. teichmann@5188: Map dipsGauges = context.getDIPSGauges(name, from, to); sascha@4081: sascha@4077: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4077: ConnectedStatements aftStatements = context.getAftStatements(); sascha@4077: teichmann@4754: String riverName = getName(); teichmann@4754: teichmann@4754: Map aftDIPSGauges = new HashMap(); teichmann@4754: sascha@4076: ResultSet messstellenRs = aftStatements sascha@4076: .getStatement("select.messstelle") sascha@4076: .clearParameters() teichmann@4774: .setInt("GEWAESSER_NR", id2) teichmann@4774: .executeQuery(); sascha@4076: teichmann@4754: try { teichmann@4754: while (messstellenRs.next()) { teichmann@4774: String name = messstellenRs.getString("NAME"); teichmann@4774: String num = messstellenRs.getString("MESSSTELLE_NR"); teichmann@5188: double station = messstellenRs.getDouble("STATIONIERUNG"); teichmann@5188: teichmann@5258: if (!messstellenRs.wasNull() && !inside(station)) { teichmann@5258: log.warn("Station found in AFT but in not range: " + station); teichmann@5188: continue; teichmann@5188: } teichmann@4754: teichmann@4754: Long number = SyncContext.numberToLong(num); teichmann@4754: if (number == null) { teichmann@4754: log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); teichmann@4754: continue; teichmann@4754: } teichmann@4754: DIPSGauge dipsGauge = dipsGauges.get(number); teichmann@4754: if (dipsGauge == null) { teichmann@4754: log.warn( teichmann@4754: "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " + teichmann@4754: "Gauge number used for lookup: " + number); teichmann@4754: continue; teichmann@4754: } teichmann@4754: String gaugeRiver = dipsGauge.getRiverName(); teichmann@4754: if (!gaugeRiver.equalsIgnoreCase(riverName)) { teichmann@4754: log.warn( teichmann@4754: "DIPS: MESSSTELLE '" + name + teichmann@4754: "' is assigned to river '" + gaugeRiver + teichmann@4754: "'. Needs to be on '" + riverName + "'."); teichmann@4754: continue; teichmann@4754: } teichmann@4754: dipsGauge.setAftName(name); teichmann@4754: dipsGauge.setOfficialNumber(number); teichmann@4754: aftDIPSGauges.put(number, dipsGauge); sascha@4081: } sascha@4076: } teichmann@4754: finally { teichmann@4754: messstellenRs.close(); teichmann@4754: } sascha@4076: sascha@4085: List updateGauges = new ArrayList(); sascha@4085: sascha@4076: ResultSet gaugesRs = flysStatements sascha@4076: .getStatement("select.gauges") sascha@4076: .clearParameters() sascha@4076: .setInt("river_id", id1).executeQuery(); sascha@4076: teichmann@4754: try { teichmann@4754: while (gaugesRs.next()) { teichmann@4754: int gaugeId = gaugesRs.getInt("id"); teichmann@4754: String name = gaugesRs.getString("name"); teichmann@4754: long number = gaugesRs.getLong("official_number"); teichmann@4754: if (gaugesRs.wasNull()) { teichmann@4754: log.warn("FLYS: Gauge '" + name + teichmann@4754: "' has no official number. Ignored."); teichmann@4754: continue; teichmann@4754: } teichmann@4754: Long key = Long.valueOf(number); teichmann@4754: DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); teichmann@4754: if (aftDIPSGauge == null) { teichmann@4754: log.warn("FLYS: Gauge '" + name + "' number " + number + teichmann@4754: " is not found in AFT/DIPS."); teichmann@4754: continue; teichmann@4754: } teichmann@4754: aftDIPSGauge.setFlysId(gaugeId); teichmann@4754: log.info("Gauge '" + name + teichmann@4754: "' found in FLYS, AFT and DIPS. -> Update"); teichmann@4754: updateGauges.add(aftDIPSGauge); sascha@4085: } sascha@4076: } teichmann@4754: finally { teichmann@4754: gaugesRs.close(); teichmann@4754: } sascha@4076: sascha@4094: boolean modified = createGauges(context, aftDIPSGauges); sascha@4087: sascha@4097: modified |= updateGauges(context, updateGauges); sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: sascha@4097: protected boolean updateGauges( sascha@4097: SyncContext context, sascha@4097: List gauges sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4097: boolean modified = false; sascha@4097: sascha@4097: for (DIPSGauge gauge: gauges) { teichmann@5258: modified |= updateBfGIdOnMasterDischargeTable(context, gauge); sascha@4097: modified |= updateGauge(context, gauge); sascha@4097: } sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: teichmann@5258: protected boolean updateBfGIdOnMasterDischargeTable( teichmann@5258: SyncContext context, teichmann@5258: DIPSGauge gauge teichmann@5258: ) throws SQLException { teichmann@5258: log.info( teichmann@5258: "FLYS: Updating master discharge table bfg_id for '" + teichmann@5258: gauge.getAftName() + "'"); teichmann@5258: ConnectedStatements flysStatements = context.getFlysStatements(); teichmann@5258: teichmann@5258: ResultSet rs = flysStatements teichmann@5258: .getStatement("select.gauge.master.discharge.table") teichmann@5258: .clearParameters() teichmann@5258: .setInt("gauge_id", gauge.getFlysId()) teichmann@5258: .executeQuery(); teichmann@5258: teichmann@5258: int flysId; teichmann@5258: teichmann@5258: try { teichmann@5480: if (!rs.next()) { teichmann@5258: log.error( teichmann@5258: "FLYS: No master discharge table found for gauge '" + teichmann@5258: gauge.getAftName() + "'"); teichmann@5258: return false; teichmann@5258: } teichmann@5258: String bfgId = rs.getString("bfg_id"); teichmann@5258: if (!rs.wasNull()) { // already has BFG_ID teichmann@5258: return false; teichmann@5258: } teichmann@5258: flysId = rs.getInt("id"); teichmann@5258: } finally { teichmann@5258: rs.close(); teichmann@5258: } teichmann@5258: teichmann@5258: // We need to find out the BFG_ID of the current discharge table teichmann@5258: // for this gauge in AFT. teichmann@5258: teichmann@5258: ConnectedStatements aftStatements = context.getAftStatements(); teichmann@5258: teichmann@5258: rs = aftStatements teichmann@5258: .getStatement("select.bfg.id.current") teichmann@5258: .clearParameters() teichmann@5258: .setString("number", "%" + gauge.getOfficialNumber()) teichmann@5258: .executeQuery(); teichmann@5258: teichmann@5258: String bfgId = null; teichmann@5258: teichmann@5258: try { teichmann@5258: if (rs.next()) { teichmann@5258: bfgId = rs.getString("BFG_ID"); teichmann@5258: } teichmann@5258: } finally { teichmann@5258: rs.close(); teichmann@5258: } teichmann@5258: teichmann@5258: if (bfgId == null) { teichmann@5258: log.warn( teichmann@5258: "No BFG_ID found for current discharge table of gauge '" + teichmann@5258: gauge + "'"); teichmann@5258: return false; teichmann@5258: } teichmann@5258: teichmann@5258: // Set the BFG_ID in FLYS. teichmann@5258: flysStatements.beginTransaction(); teichmann@5258: try { teichmann@5258: flysStatements teichmann@5258: .getStatement("update.bfg.id.discharge.table") teichmann@5258: .clearParameters() teichmann@5258: .setInt("id", flysId) teichmann@5258: .setString("bfg_id", bfgId) teichmann@5258: .executeUpdate(); teichmann@5258: flysStatements.commitTransaction(); teichmann@5258: } catch (SQLException sqle) { teichmann@5258: flysStatements.rollbackTransaction(); teichmann@5258: log.error(sqle, sqle); teichmann@5258: return false; teichmann@5258: } teichmann@5258: teichmann@5258: return true; teichmann@5258: } teichmann@5258: sascha@4097: protected boolean updateGauge( sascha@4097: SyncContext context, sascha@4097: DIPSGauge gauge sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4102: log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'."); teichmann@4775: // We need to load all discharge tables from both databases teichmann@4775: // of the gauge and do some pairing based on their bfg_id. sascha@4097: sascha@4097: boolean modified = false; sascha@4097: sascha@4104: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4098: sascha@4104: flysStatements.beginTransaction(); sascha@4104: try { sascha@4104: List flysDTs = sascha@4104: DischargeTable.loadFlysDischargeTables( sascha@4104: context, gauge.getFlysId()); sascha@4104: sascha@4104: List aftDTs = sascha@4104: DischargeTable.loadAftDischargeTables( sascha@4104: context, gauge.getOfficialNumber()); sascha@4104: teichmann@4775: Map bfgId2FlysDT = sascha@4104: new HashMap(); sascha@4104: sascha@4104: for (DischargeTable dt: flysDTs) { teichmann@4775: String bfgId = dt.getBfgId(); teichmann@4775: if (bfgId == null) { teichmann@4736: log.warn("FLYS: discharge table " + dt.getId() teichmann@4775: + " has no bfg_id. Ignored."); sascha@4104: continue; sascha@4104: } teichmann@4775: bfgId2FlysDT.put(bfgId, dt); sascha@4098: } sascha@4104: sascha@4104: List createDTs = new ArrayList(); sascha@4104: sascha@4104: for (DischargeTable aftDT: aftDTs) { teichmann@4775: String bfgId = aftDT.getBfgId(); teichmann@4775: DischargeTable flysDT = bfgId2FlysDT.remove(bfgId); sascha@4104: if (flysDT != null) { sascha@4104: // Found in AFT and FLYS. teichmann@4775: log.info("FLYS: Discharge table '" + bfgId sascha@4104: + "' found in AFT and FLYS. -> update"); sascha@4104: // Create the W/Q diff. sascha@4104: modified |= writeWQChanges(context, flysDT, aftDT); sascha@4104: } sascha@4104: else { teichmann@4775: log.info("FLYS: Discharge table '" + bfgId sascha@4104: + "' not found in FLYS. -> create"); sascha@4104: createDTs.add(aftDT); sascha@4104: } sascha@4104: } sascha@4104: teichmann@4775: for (String bfgId: bfgId2FlysDT.keySet()) { teichmann@4775: log.info("FLYS: Discharge table '" + bfgId sascha@4104: + "' found in FLYS but not in AFT. -> ignore"); sascha@4098: } sascha@4098: sascha@4104: log.info("FLYS: Copy " + createDTs.size() + sascha@4104: " discharge tables over from AFT."); sascha@4098: sascha@4104: // Create the new discharge tables. sascha@4104: for (DischargeTable aftDT: createDTs) { sascha@4104: createDischargeTable(context, aftDT, gauge.getFlysId()); sascha@4104: modified = true; sascha@4104: } sascha@4102: sascha@4104: flysStatements.commitTransaction(); sascha@4104: } sascha@4104: catch (SQLException sqle) { sascha@4104: flysStatements.rollbackTransaction(); sascha@4104: log.error(sqle, sqle); sascha@4104: modified = false; sascha@4100: } sascha@4097: sascha@4094: return modified; sascha@4087: } sascha@4079: sascha@4099: protected boolean writeWQChanges( sascha@4099: SyncContext context, sascha@4099: DischargeTable flysDT, sascha@4099: DischargeTable aftDT sascha@4099: ) sascha@4099: throws SQLException sascha@4099: { sascha@4099: flysDT.loadFlysValues(context); sascha@4099: aftDT.loadAftValues(context); sascha@4099: WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues()); sascha@4099: if (diff.hasChanges()) { sascha@4099: diff.writeChanges(context, flysDT.getId()); sascha@4099: return true; sascha@4099: } sascha@4099: return false; sascha@4099: } sascha@4099: sascha@4094: protected boolean createGauges( sascha@4087: SyncContext context, sascha@4087: Map gauges sascha@4087: ) sascha@4087: throws SQLException sascha@4087: { sascha@4087: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4087: sascha@4087: SymbolicStatement.Instance nextId = sascha@4087: flysStatements.getStatement("next.gauge.id"); sascha@4087: sascha@4087: SymbolicStatement.Instance insertStmnt = sascha@4087: flysStatements.getStatement("insert.gauge"); sascha@4087: sascha@4094: boolean modified = false; sascha@4094: sascha@4087: for (Map.Entry entry: gauges.entrySet()) { sascha@4087: Long officialNumber = entry.getKey(); sascha@4087: DIPSGauge gauge = entry.getValue(); sascha@4087: sascha@4085: log.info("Gauge '" + gauge.getAftName() + sascha@4085: "' not in FLYS but in AFT/DIPS. -> Create"); sascha@4087: sascha@4087: if (!gauge.hasDatums()) { teichmann@4736: log.warn("DIPS: Gauge '" + sascha@4087: gauge.getAftName() + "' has no datum. Ignored."); sascha@4087: continue; sascha@4087: } sascha@4087: sascha@4087: ResultSet rs = null; sascha@4087: flysStatements.beginTransaction(); sascha@4087: try { sascha@4087: (rs = nextId.executeQuery()).next(); sascha@4087: int gaugeId = rs.getInt("gauge_id"); sascha@4087: rs.close(); rs = null; sascha@4087: sascha@4087: insertStmnt sascha@4087: .clearParameters() sascha@4087: .setInt("id", gaugeId) sascha@4087: .setString("name", gauge.getAftName()) sascha@4087: .setInt("river_id", id1) sascha@4087: .setDouble("station", gauge.getStation()) sascha@4087: .setDouble("aeo", gauge.getAeo()) sascha@4100: .setLong("official_number", officialNumber) sascha@4087: .setDouble("datum", gauge.getLatestDatum().getValue()); sascha@4087: sascha@4087: insertStmnt.execute(); sascha@4087: teichmann@4736: log.info("FLYS: Created gauge '" + gauge.getAftName() + sascha@4087: "' with id " + gaugeId + "."); sascha@4087: sascha@4088: gauge.setFlysId(gaugeId); sascha@4100: createDischargeTables(context, gauge); sascha@4087: flysStatements.commitTransaction(); sascha@4094: modified = true; sascha@4087: } sascha@4087: catch (SQLException sqle) { sascha@4087: flysStatements.rollbackTransaction(); sascha@4094: log.error(sqle, sqle); sascha@4087: } sascha@4087: finally { sascha@4087: if (rs != null) { sascha@4087: rs.close(); sascha@4087: } sascha@4087: } sascha@4085: } sascha@4094: sascha@4094: return modified; sascha@4075: } sascha@4075: sascha@4102: protected void createDischargeTable( sascha@4102: SyncContext context, sascha@4102: DischargeTable aftDT, sascha@4102: int flysGaugeId sascha@4102: ) sascha@4102: throws SQLException sascha@4102: { sascha@4102: aftDT.persistFlysTimeInterval(context); sascha@4102: int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId); sascha@4102: sascha@4102: aftDT.loadAftValues(context); sascha@4102: aftDT.storeFlysValues(context, flysId); sascha@4102: } sascha@4102: sascha@4088: protected void createDischargeTables( sascha@4088: SyncContext context, sascha@4088: DIPSGauge gauge sascha@4088: ) sascha@4088: throws SQLException sascha@4088: { sascha@4102: log.info("FLYS: Create discharge tables for '" + sascha@4102: gauge.getAftName() + "'."); sascha@4088: sascha@4093: // Load the discharge tables from AFT. sascha@4093: List dts = loadAftDischargeTables( sascha@4100: context, gauge); sascha@4088: sascha@4093: // Persist the time intervals. sascha@4093: persistFlysTimeIntervals(context, dts); sascha@4093: sascha@4093: // Persist the discharge tables sascha@4102: int [] flysDTIds = persistFlysDischargeTables( sascha@4102: context, dts, gauge.getFlysId()); sascha@4093: sascha@4093: // Copy over the W/Q values sascha@4093: copyWQsFromAftToFlys(context, dts, flysDTIds); sascha@4093: } sascha@4093: sascha@4093: protected List loadAftDischargeTables( sascha@4093: SyncContext context, sascha@4093: DIPSGauge gauge sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4097: return DischargeTable.loadAftDischargeTables( sascha@4100: context, gauge.getOfficialNumber(), gauge.getFlysId()); sascha@4093: } sascha@4093: sascha@4093: protected void persistFlysTimeIntervals( sascha@4093: SyncContext context, sascha@4093: List dts sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4090: for (DischargeTable dt: dts) { sascha@4102: dt.persistFlysTimeInterval(context); sascha@4090: } sascha@4093: } sascha@4090: sascha@4093: protected int [] persistFlysDischargeTables( sascha@4093: SyncContext context, sascha@4102: List dts, sascha@4102: int flysGaugeId sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: int [] flysDTIds = new int[dts.size()]; sascha@4093: sascha@4102: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4102: flysDTIds[i] = dts.get(i) sascha@4102: .persistFlysDischargeTable(context, flysGaugeId); sascha@4088: } sascha@4093: sascha@4093: return flysDTIds; sascha@4093: } sascha@4093: sascha@4093: protected void copyWQsFromAftToFlys( sascha@4093: SyncContext context, sascha@4093: List dts, sascha@4093: int [] flysDTIds sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4093: DischargeTable dt = dts.get(i); sascha@4093: dt.loadAftValues(context); sascha@4093: dt.storeFlysValues(context, flysDTIds[i]); sascha@4093: dt.clearValues(); // To save memory. sascha@4093: } sascha@4088: } sascha@4088: sascha@4075: public String toString() { sascha@4075: return "[River: name=" + name + ", " + super.toString() + "]"; sascha@4075: } sascha@4075: } sascha@4075: // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :