sascha@4075: package de.intevation.aft; sascha@4075: sascha@4085: import java.util.List; sascha@4085: import java.util.ArrayList; sascha@4085: import java.util.HashMap; sascha@4078: import java.util.Map; sascha@4078: sascha@4076: import java.sql.ResultSet; sascha@4075: import java.sql.SQLException; sascha@4090: import java.sql.Types; sascha@4075: sascha@4075: import org.apache.log4j.Logger; sascha@4075: sascha@4075: import de.intevation.db.ConnectedStatements; sascha@4087: import de.intevation.db.SymbolicStatement; sascha@4075: sascha@4075: public class River sascha@4075: extends IdPair sascha@4075: { sascha@4075: private static Logger log = Logger.getLogger(River.class); sascha@4075: sascha@4075: protected String name; sascha@4075: sascha@4075: public River() { sascha@4075: } sascha@4075: sascha@4075: public River(int id1, int id2, String name) { sascha@4075: super(id1, id2); sascha@4075: this.name = name; sascha@4075: } sascha@4075: sascha@4075: public String getName() { sascha@4075: return name; sascha@4075: } sascha@4075: sascha@4078: sascha@4094: public boolean sync(SyncContext context) throws SQLException { sascha@4075: log.info("sync river: " + this); sascha@4077: sascha@4084: Map dipsGauges = context.getDIPSGauges(); sascha@4081: sascha@4077: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4077: ConnectedStatements aftStatements = context.getAftStatements(); sascha@4077: sascha@4076: ResultSet messstellenRs = aftStatements sascha@4076: .getStatement("select.messstelle") sascha@4076: .clearParameters() sascha@4076: .setInt("GEWAESSER_NR", id2).executeQuery(); sascha@4076: sascha@4085: String riverName = getName(); sascha@4085: sascha@4085: Map aftDIPSGauges = new HashMap(); sascha@4085: sascha@4076: while (messstellenRs.next()) { sascha@4076: String name = messstellenRs.getString("NAME"); sascha@4076: String num = messstellenRs.getString("MESSSTELLE_NR"); sascha@4084: Long number = SyncContext.numberToLong(num); sascha@4081: if (number == null) { sascha@4081: log.warn("Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); sascha@4081: continue; sascha@4081: } sascha@4081: DIPSGauge dipsGauge = dipsGauges.get(number); sascha@4085: if (dipsGauge == null) { sascha@4085: log.warn( sascha@4085: "MESSSTELLE '" + name + "' not found in DIPS. " + sascha@4085: "Gauge number used for lookup: " + number); sascha@4081: continue; sascha@4081: } sascha@4085: String gaugeRiver = dipsGauge.getRiverName(); sascha@4085: if (!gaugeRiver.equalsIgnoreCase(riverName)) { sascha@4085: log.warn( sascha@4085: "MESSSTELLE '" + name + sascha@4085: "' is assigned to river '" + gaugeRiver + sascha@4085: "'. Needs to be on '" + riverName + "'."); sascha@4085: continue; sascha@4085: } sascha@4085: dipsGauge.setAftName(name); sascha@4097: dipsGauge.setOfficialNumber(number); sascha@4085: aftDIPSGauges.put(number, dipsGauge); sascha@4076: } sascha@4076: sascha@4076: messstellenRs.close(); sascha@4076: sascha@4085: List updateGauges = new ArrayList(); sascha@4085: sascha@4076: ResultSet gaugesRs = flysStatements sascha@4076: .getStatement("select.gauges") sascha@4076: .clearParameters() sascha@4076: .setInt("river_id", id1).executeQuery(); sascha@4076: sascha@4076: while (gaugesRs.next()) { sascha@4076: int gaugeId = gaugesRs.getInt("id"); sascha@4076: String name = gaugesRs.getString("name"); sascha@4085: long number = gaugesRs.getLong("official_number"); sascha@4085: if (gaugesRs.wasNull()) { sascha@4085: log.warn("FLYS: Gauge '" + name + sascha@4085: "' has no official number. Ignored."); sascha@4085: continue; sascha@4085: } sascha@4085: Long key = Long.valueOf(number); sascha@4085: DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); sascha@4085: if (aftDIPSGauge == null) { sascha@4085: log.warn("FLYS: Gauge '" + name + "' number " + number + sascha@4085: " is not found in AFT/DIPS."); sascha@4085: continue; sascha@4085: } sascha@4085: aftDIPSGauge.setFlysId(gaugeId); sascha@4085: log.info("Gauge '" + name + sascha@4085: "' found in FLYS, AFT and DIPS. -> Update"); sascha@4085: updateGauges.add(aftDIPSGauge); sascha@4076: } sascha@4094: gaugesRs.close(); sascha@4076: sascha@4094: boolean modified = createGauges(context, aftDIPSGauges); sascha@4087: sascha@4097: modified |= updateGauges(context, updateGauges); sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: sascha@4097: protected boolean updateGauges( sascha@4097: SyncContext context, sascha@4097: List gauges sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4097: boolean modified = false; sascha@4097: sascha@4097: for (DIPSGauge gauge: gauges) { sascha@4097: modified |= updateGauge(context, gauge); sascha@4097: } sascha@4097: sascha@4097: return modified; sascha@4097: } sascha@4097: sascha@4097: protected boolean updateGauge( sascha@4097: SyncContext context, sascha@4097: DIPSGauge gauge sascha@4097: ) sascha@4097: throws SQLException sascha@4097: { sascha@4097: // We need to load all discharge tables from both database sascha@4097: // of the gauge and do some pairing based on their descriptions. sascha@4097: sascha@4097: boolean modified = false; sascha@4097: sascha@4097: List flysDTs = sascha@4097: DischargeTable.loadFlysDischargeTables( sascha@4097: context, gauge.getFlysId()); sascha@4097: sascha@4097: List aftDTs = sascha@4097: DischargeTable.loadAftDischargeTables( sascha@4097: context, gauge.getOfficialNumber()); sascha@4097: sascha@4097: // TODO: Do pairing sascha@4097: sascha@4094: return modified; sascha@4087: } sascha@4079: sascha@4094: protected boolean createGauges( sascha@4087: SyncContext context, sascha@4087: Map gauges sascha@4087: ) sascha@4087: throws SQLException sascha@4087: { sascha@4087: ConnectedStatements flysStatements = context.getFlysStatements(); sascha@4087: sascha@4087: SymbolicStatement.Instance nextId = sascha@4087: flysStatements.getStatement("next.gauge.id"); sascha@4087: sascha@4087: SymbolicStatement.Instance insertStmnt = sascha@4087: flysStatements.getStatement("insert.gauge"); sascha@4087: sascha@4094: boolean modified = false; sascha@4094: sascha@4087: for (Map.Entry entry: gauges.entrySet()) { sascha@4087: Long officialNumber = entry.getKey(); sascha@4087: DIPSGauge gauge = entry.getValue(); sascha@4087: sascha@4085: log.info("Gauge '" + gauge.getAftName() + sascha@4085: "' not in FLYS but in AFT/DIPS. -> Create"); sascha@4087: sascha@4087: if (!gauge.hasDatums()) { sascha@4087: log.warn("FLYS: Gauge '" + sascha@4087: gauge.getAftName() + "' has no datum. Ignored."); sascha@4087: continue; sascha@4087: } sascha@4087: sascha@4087: ResultSet rs = null; sascha@4087: flysStatements.beginTransaction(); sascha@4087: try { sascha@4087: (rs = nextId.executeQuery()).next(); sascha@4087: int gaugeId = rs.getInt("gauge_id"); sascha@4087: rs.close(); rs = null; sascha@4087: sascha@4087: insertStmnt sascha@4087: .clearParameters() sascha@4087: .setInt("id", gaugeId) sascha@4087: .setString("name", gauge.getAftName()) sascha@4087: .setInt("river_id", id1) sascha@4087: .setDouble("station", gauge.getStation()) sascha@4087: .setDouble("aeo", gauge.getAeo()) sascha@4087: .setDouble("official_number", officialNumber) sascha@4087: .setDouble("datum", gauge.getLatestDatum().getValue()); sascha@4087: sascha@4087: insertStmnt.execute(); sascha@4087: sascha@4087: log.info("FLYS: Created gauge '" + gauge.getAftName() + sascha@4087: "' with id " + gaugeId + "."); sascha@4087: sascha@4088: gauge.setFlysId(gaugeId); sascha@4088: createDischargeTables(context, officialNumber, gauge); sascha@4087: flysStatements.commitTransaction(); sascha@4094: modified = true; sascha@4087: } sascha@4087: catch (SQLException sqle) { sascha@4087: flysStatements.rollbackTransaction(); sascha@4094: log.error(sqle, sqle); sascha@4087: } sascha@4087: finally { sascha@4087: if (rs != null) { sascha@4087: rs.close(); sascha@4087: } sascha@4087: } sascha@4085: } sascha@4094: sascha@4094: return modified; sascha@4075: } sascha@4075: sascha@4088: protected void createDischargeTables( sascha@4088: SyncContext context, sascha@4088: Long officialNumber, sascha@4088: DIPSGauge gauge sascha@4088: ) sascha@4088: throws SQLException sascha@4088: { sascha@4088: log.info("create discharge tables"); sascha@4088: sascha@4093: // Load the discharge tables from AFT. sascha@4093: List dts = loadAftDischargeTables( sascha@4093: context, officialNumber, gauge); sascha@4088: sascha@4093: // Persist the time intervals. sascha@4093: persistFlysTimeIntervals(context, dts); sascha@4093: sascha@4093: // Persist the discharge tables sascha@4093: int [] flysDTIds = persistFlysDischargeTables(context, dts); sascha@4093: sascha@4093: // Copy over the W/Q values sascha@4093: copyWQsFromAftToFlys(context, dts, flysDTIds); sascha@4093: } sascha@4093: sascha@4093: protected List loadAftDischargeTables( sascha@4093: SyncContext context, sascha@4093: Long officialNumber, sascha@4093: DIPSGauge gauge sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4097: return DischargeTable.loadAftDischargeTables( sascha@4097: context, officialNumber, gauge.getFlysId()); sascha@4093: } sascha@4093: sascha@4093: protected void persistFlysTimeIntervals( sascha@4093: SyncContext context, sascha@4093: List dts sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4090: for (DischargeTable dt: dts) { sascha@4090: TimeInterval timeInterval = dt.getTimeInterval(); sascha@4090: if (timeInterval != null) { sascha@4090: dt.setTimeInterval( sascha@4090: context.fetchOrCreateFLYSTimeInterval(timeInterval)); sascha@4090: } sascha@4090: } sascha@4093: } sascha@4090: sascha@4093: protected int [] persistFlysDischargeTables( sascha@4093: SyncContext context, sascha@4093: List dts sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: boolean debug = log.isDebugEnabled(); sascha@4093: sascha@4093: int [] flysDTIds = new int[dts.size()]; sascha@4093: sascha@4093: ResultSet rs = null; sascha@4090: try { sascha@4093: ConnectedStatements flysStatements = sascha@4093: context.getFlysStatements(); sascha@4093: sascha@4090: SymbolicStatement.Instance nextId = sascha@4090: flysStatements.getStatement("next.discharge.id"); sascha@4090: sascha@4090: SymbolicStatement.Instance insertDT = sascha@4090: flysStatements.getStatement("insert.dischargetable"); sascha@4090: sascha@4093: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4090: rs = nextId.executeQuery(); sascha@4090: rs.next(); sascha@4090: int id = rs.getInt("discharge_table_id"); sascha@4093: flysDTIds[i] = id; sascha@4090: rs.close(); rs = null; sascha@4090: sascha@4093: DischargeTable dt = dts.get(i); sascha@4090: insertDT.clearParameters() sascha@4090: .setInt("id", id) sascha@4090: .setInt("gauge_id", dt.getGaugeId()) sascha@4090: .setString("description", dt.getDescription()); sascha@4090: sascha@4090: TimeInterval timeInterval = dt.getTimeInterval(); sascha@4090: if (timeInterval != null) { sascha@4090: insertDT.setInt("time_interval_id", timeInterval.getId()); sascha@4090: } sascha@4090: else { sascha@4090: insertDT.setNull("time_interval_id", Types.INTEGER); sascha@4090: } sascha@4090: sascha@4090: insertDT.execute(); sascha@4090: if (debug) { sascha@4090: log.debug("FLYS: Created discharge table id: " + id); sascha@4088: } sascha@4088: } sascha@4088: } sascha@4088: finally { sascha@4088: if (rs != null) { sascha@4088: rs.close(); sascha@4090: rs = null; sascha@4088: } sascha@4088: } sascha@4093: sascha@4093: return flysDTIds; sascha@4093: } sascha@4093: sascha@4093: protected void copyWQsFromAftToFlys( sascha@4093: SyncContext context, sascha@4093: List dts, sascha@4093: int [] flysDTIds sascha@4093: ) sascha@4093: throws SQLException sascha@4093: { sascha@4093: for (int i = 0; i < flysDTIds.length; ++i) { sascha@4093: DischargeTable dt = dts.get(i); sascha@4093: dt.loadAftValues(context); sascha@4093: dt.storeFlysValues(context, flysDTIds[i]); sascha@4093: dt.clearValues(); // To save memory. sascha@4093: } sascha@4088: } sascha@4088: sascha@4075: public String toString() { sascha@4075: return "[River: name=" + name + ", " + super.toString() + "]"; sascha@4075: } sascha@4075: sascha@4075: } sascha@4075: // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :