sascha@4075: package de.intevation.aft; sascha@4075: sascha@4085: import java.util.List; sascha@4085: import java.util.ArrayList; sascha@4085: import java.util.HashMap; sascha@4078: import java.util.Map; sascha@4078: sascha@4076: import java.sql.ResultSet; sascha@4075: import java.sql.SQLException; sascha@4075: sascha@4075: import org.apache.log4j.Logger; sascha@4075: sascha@4075: import de.intevation.db.ConnectedStatements; sascha@4087: import de.intevation.db.SymbolicStatement; sascha@4075: sascha@4075: public class River sascha@4075: extends IdPair sascha@4075: { sascha@4075: private static Logger log = Logger.getLogger(River.class); sascha@4075: sascha@4075: protected String name; sascha@4075: sascha@4075: public River() { sascha@4075: } sascha@4075: sascha@4075: public River(int id1, int id2, String name) { sascha@4075: super(id1, id2); sascha@4075: this.name = name; sascha@4075: } sascha@4075: sascha@4075: public String getName() { sascha@4075: return name; sascha@4075: } sascha@4075: sascha@4078: sascha@4094: public boolean sync(SyncContext context) throws SQLException { sascha@4075: log.info("sync river: " + this); sascha@4077: sascha@4084: Map dipsGauges = context.getDIPSGauges(); sascha@4081: sascha@4077: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4077: ConnectedStatements aftStatements = context.getAftStatements(); sascha@4077: sascha@4076: ResultSet messstellenRs = aftStatements sascha@4076: .getStatement("select.messstelle") sascha@4076: .clearParameters() sascha@4076: .setInt("GEWAESSER_NR", id2).executeQuery(); sascha@4076: sascha@4085: String riverName = getName(); sascha@4085: sascha@4085: Map aftDIPSGauges = new HashMap(); sascha@4085: sascha@4076: while (messstellenRs.next()) { sascha@4076: String name = messstellenRs.getString("NAME"); sascha@4076: String num = messstellenRs.getString("MESSSTELLE_NR"); sascha@4084: Long number = SyncContext.numberToLong(num); sascha@4081: if (number == null) { sascha@4081: log.warn("Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); sascha@4081: continue; sascha@4081: } sascha@4081: DIPSGauge dipsGauge = dipsGauges.get(number); sascha@4085: if (dipsGauge == null) { sascha@4085: log.warn( sascha@4085: "MESSSTELLE '" + name + "' not found in DIPS. " + sascha@4085: "Gauge number used for lookup: " + number); sascha@4081: continue; sascha@4081: } sascha@4085: String gaugeRiver = dipsGauge.getRiverName(); sascha@4085: if (!gaugeRiver.equalsIgnoreCase(riverName)) { sascha@4085: log.warn( sascha@4085: "MESSSTELLE '" + name + sascha@4085: "' is assigned to river '" + gaugeRiver + sascha@4085: "'. Needs to be on '" + riverName + "'."); sascha@4085: continue; sascha@4085: } sascha@4085: dipsGauge.setAftName(name); sascha@4097: dipsGauge.setOfficialNumber(number); sascha@4085: aftDIPSGauges.put(number, dipsGauge); sascha@4076: } sascha@4076: sascha@4076: messstellenRs.close(); sascha@4076: sascha@4085: List updateGauges = new ArrayList(); sascha@4085: sascha@4076: ResultSet gaugesRs = flysStatements sascha@4076: .getStatement("select.gauges") sascha@4076: .clearParameters() sascha@4076: .setInt("river_id", id1).executeQuery(); sascha@4076: sascha@4076: while (gaugesRs.next()) { sascha@4076: int gaugeId = gaugesRs.getInt("id"); sascha@4076: String name = gaugesRs.getString("name"); sascha@4085: long number = gaugesRs.getLong("official_number"); sascha@4085: if (gaugesRs.wasNull()) { sascha@4085: log.warn("FLYS: Gauge '" + name + sascha@4085: "' has no official number. Ignored."); sascha@4085: continue; sascha@4085: } sascha@4085: Long key = Long.valueOf(number); sascha@4085: DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); sascha@4085: if (aftDIPSGauge == null) { sascha@4085: log.warn("FLYS: Gauge '" + name + "' number " + number + sascha@4085: " is not found in AFT/DIPS."); sascha@4085: continue; sascha@4085: } sascha@4085: aftDIPSGauge.setFlysId(gaugeId); sascha@4085: log.info("Gauge '" + name + sascha@4085: "' found in FLYS, AFT and DIPS. -> Update"); sascha@4085: updateGauges.add(aftDIPSGauge); sascha@4076: } sascha@4094: gaugesRs.close(); sascha@4076: sascha@4094: boolean modified = createGauges(context, aftDIPSGauges); sascha@4087: sascha@4097: modified |= updateGauges(context, updateGauges); sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: sascha@4097: protected boolean updateGauges( sascha@4097: SyncContext context, sascha@4097: List gauges sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4097: boolean modified = false; sascha@4097: sascha@4097: for (DIPSGauge gauge: gauges) { sascha@4097: modified |= updateGauge(context, gauge); sascha@4097: } sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: sascha@4097: protected boolean updateGauge( sascha@4097: SyncContext context, sascha@4097: DIPSGauge gauge sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4102: log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'."); sascha@4097: // We need to load all discharge tables from both database sascha@4097: // of the gauge and do some pairing based on their descriptions. sascha@4097: sascha@4097: boolean modified = false; sascha@4097: sascha@4104: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4098: sascha@4104: flysStatements.beginTransaction(); sascha@4104: try { sascha@4104: List flysDTs = sascha@4104: DischargeTable.loadFlysDischargeTables( sascha@4104: context, gauge.getFlysId()); sascha@4104: sascha@4104: List aftDTs = sascha@4104: DischargeTable.loadAftDischargeTables( sascha@4104: context, gauge.getOfficialNumber()); sascha@4104: sascha@4104: Map desc2FlysDT = sascha@4104: new HashMap(); sascha@4104: sascha@4104: for (DischargeTable dt: flysDTs) { sascha@4104: String description = dt.getDescription(); sascha@4104: if (description == null) { sascha@4104: log.warn("FLYS: discharge table " + dt.getId() sascha@4104: + " has no description. Ignored."); sascha@4104: continue; sascha@4104: } sascha@4104: desc2FlysDT.put(description, dt); sascha@4098: } sascha@4104: sascha@4104: List createDTs = new ArrayList(); sascha@4104: sascha@4104: for (DischargeTable aftDT: aftDTs) { sascha@4104: String description = aftDT.getDescription(); sascha@4104: DischargeTable flysDT = desc2FlysDT.remove(description); sascha@4104: if (flysDT != null) { sascha@4104: // Found in AFT and FLYS. sascha@4104: log.info("FLYS: Discharge table '" + description sascha@4104: + "' found in AFT and FLYS. -> update"); sascha@4104: // Create the W/Q diff. sascha@4104: modified |= writeWQChanges(context, flysDT, aftDT); sascha@4104: } sascha@4104: else { sascha@4104: log.info("FLYS: Discharge table '" + description sascha@4104: + "' not found in FLYS. -> create"); sascha@4104: createDTs.add(aftDT); sascha@4104: } sascha@4104: } sascha@4104: sascha@4104: for (String description: desc2FlysDT.keySet()) { sascha@4098: log.info("FLYS: Discharge table '" + description sascha@4104: + "' found in FLYS but not in AFT. -> ignore"); sascha@4098: } sascha@4098: sascha@4104: log.info("FLYS: Copy " + createDTs.size() + sascha@4104: " discharge tables over from AFT."); sascha@4098: sascha@4104: // Create the new discharge tables. sascha@4104: for (DischargeTable aftDT: createDTs) { sascha@4104: createDischargeTable(context, aftDT, gauge.getFlysId()); sascha@4104: modified = true; sascha@4104: } sascha@4102: sascha@4104: flysStatements.commitTransaction(); sascha@4104: } sascha@4104: catch (SQLException sqle) { sascha@4104: flysStatements.rollbackTransaction(); sascha@4104: log.error(sqle, sqle); sascha@4104: modified = false; sascha@4100: } sascha@4097: sascha@4094: return modified; sascha@4087: } sascha@4079: sascha@4099: protected boolean writeWQChanges( sascha@4099: SyncContext context, sascha@4099: DischargeTable flysDT, sascha@4099: DischargeTable aftDT sascha@4099: ) sascha@4099: throws SQLException sascha@4099: { sascha@4099: flysDT.loadFlysValues(context); sascha@4099: aftDT.loadAftValues(context); sascha@4099: WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues()); sascha@4099: if (diff.hasChanges()) { sascha@4099: diff.writeChanges(context, flysDT.getId()); sascha@4099: return true; sascha@4099: } sascha@4099: return false; sascha@4099: } sascha@4099: sascha@4094: protected boolean createGauges( sascha@4087: SyncContext context, sascha@4087: Map gauges sascha@4087: ) sascha@4087: throws SQLException sascha@4087: { sascha@4087: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4087: sascha@4087: SymbolicStatement.Instance nextId = sascha@4087: flysStatements.getStatement("next.gauge.id"); sascha@4087: sascha@4087: SymbolicStatement.Instance insertStmnt = sascha@4087: flysStatements.getStatement("insert.gauge"); sascha@4087: sascha@4094: boolean modified = false; sascha@4094: sascha@4087: for (Map.Entry entry: gauges.entrySet()) { sascha@4087: Long officialNumber = entry.getKey(); sascha@4087: DIPSGauge gauge = entry.getValue(); sascha@4087: sascha@4085: log.info("Gauge '" + gauge.getAftName() + sascha@4085: "' not in FLYS but in AFT/DIPS. -> Create"); sascha@4087: sascha@4087: if (!gauge.hasDatums()) { sascha@4087: log.warn("FLYS: Gauge '" + sascha@4087: gauge.getAftName() + "' has no datum. Ignored."); sascha@4087: continue; sascha@4087: } sascha@4087: sascha@4087: ResultSet rs = null; sascha@4087: flysStatements.beginTransaction(); sascha@4087: try { sascha@4087: (rs = nextId.executeQuery()).next(); sascha@4087: int gaugeId = rs.getInt("gauge_id"); sascha@4087: rs.close(); rs = null; sascha@4087: sascha@4087: insertStmnt sascha@4087: .clearParameters() sascha@4087: .setInt("id", gaugeId) sascha@4087: .setString("name", gauge.getAftName()) sascha@4087: .setInt("river_id", id1) sascha@4087: .setDouble("station", gauge.getStation()) sascha@4087: .setDouble("aeo", gauge.getAeo()) sascha@4100: .setLong("official_number", officialNumber) sascha@4087: .setDouble("datum", gauge.getLatestDatum().getValue()); sascha@4087: sascha@4087: insertStmnt.execute(); sascha@4087: sascha@4087: log.info("FLYS: Created gauge '" + gauge.getAftName() + sascha@4087: "' with id " + gaugeId + "."); sascha@4087: sascha@4088: gauge.setFlysId(gaugeId); sascha@4100: createDischargeTables(context, gauge); sascha@4087: flysStatements.commitTransaction(); sascha@4094: modified = true; sascha@4087: } sascha@4087: catch (SQLException sqle) { sascha@4087: flysStatements.rollbackTransaction(); sascha@4094: log.error(sqle, sqle); sascha@4087: } sascha@4087: finally { sascha@4087: if (rs != null) { sascha@4087: rs.close(); sascha@4087: } sascha@4087: } sascha@4085: } sascha@4094: sascha@4094: return modified; sascha@4075: } sascha@4075: sascha@4102: protected void createDischargeTable( sascha@4102: SyncContext context, sascha@4102: DischargeTable aftDT, sascha@4102: int flysGaugeId sascha@4102: ) sascha@4102: throws SQLException sascha@4102: { sascha@4102: aftDT.persistFlysTimeInterval(context); sascha@4102: int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId); sascha@4102: sascha@4102: aftDT.loadAftValues(context); sascha@4102: aftDT.storeFlysValues(context, flysId); sascha@4102: } sascha@4102: sascha@4088: protected void createDischargeTables( sascha@4088: SyncContext context, sascha@4088: DIPSGauge gauge sascha@4088: ) sascha@4088: throws SQLException sascha@4088: { sascha@4102: log.info("FLYS: Create discharge tables for '" + sascha@4102: gauge.getAftName() + "'."); sascha@4088: sascha@4093: // Load the discharge tables from AFT. sascha@4093: List dts = loadAftDischargeTables( sascha@4100: context, gauge); sascha@4088: sascha@4093: // Persist the time intervals. sascha@4093: persistFlysTimeIntervals(context, dts); sascha@4093: sascha@4093: // Persist the discharge tables sascha@4102: int [] flysDTIds = persistFlysDischargeTables( sascha@4102: context, dts, gauge.getFlysId()); sascha@4093: sascha@4093: // Copy over the W/Q values sascha@4093: copyWQsFromAftToFlys(context, dts, flysDTIds); sascha@4093: } sascha@4093: sascha@4093: protected List loadAftDischargeTables( sascha@4093: SyncContext context, sascha@4093: DIPSGauge gauge sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4097: return DischargeTable.loadAftDischargeTables( sascha@4100: context, gauge.getOfficialNumber(), gauge.getFlysId()); sascha@4093: } sascha@4093: sascha@4093: protected void persistFlysTimeIntervals( sascha@4093: SyncContext context, sascha@4093: List dts sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4090: for (DischargeTable dt: dts) { sascha@4102: dt.persistFlysTimeInterval(context); sascha@4090: } sascha@4093: } sascha@4090: sascha@4093: protected int [] persistFlysDischargeTables( sascha@4093: SyncContext context, sascha@4102: List dts, sascha@4102: int flysGaugeId sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: boolean debug = log.isDebugEnabled(); sascha@4093: sascha@4093: int [] flysDTIds = new int[dts.size()]; sascha@4093: sascha@4102: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4102: flysDTIds[i] = dts.get(i) sascha@4102: .persistFlysDischargeTable(context, flysGaugeId); sascha@4088: } sascha@4093: sascha@4093: return flysDTIds; sascha@4093: } sascha@4093: sascha@4093: protected void copyWQsFromAftToFlys( sascha@4093: SyncContext context, sascha@4093: List dts, sascha@4093: int [] flysDTIds sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4093: DischargeTable dt = dts.get(i); sascha@4093: dt.loadAftValues(context); sascha@4093: dt.storeFlysValues(context, flysDTIds[i]); sascha@4093: dt.clearValues(); // To save memory. sascha@4093: } sascha@4088: } sascha@4088: sascha@4075: public String toString() { sascha@4075: return "[River: name=" + name + ", " + super.toString() + "]"; sascha@4075: } sascha@4075: } sascha@4075: // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :