Mercurial > dive4elements > river
view flys-aft/src/main/java/de/intevation/aft/River.java @ 5475:b6a6d13eec41
backout r5460 to be compatible with r5471 and Oracle database
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 27 Mar 2013 16:26:50 +0100 |
parents | da1e897c7224 |
children | 7a68967ca72a |
line wrap: on
line source
package de.intevation.aft; import de.intevation.db.ConnectedStatements; import de.intevation.db.SymbolicStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.log4j.Logger; public class River extends IdPair { private static Logger log = Logger.getLogger(River.class); protected String name; protected double from; protected double to; public River() { } public River(int id1, String name, double from, double to) { super(id1); this.name = name; this.from = from; this.to = to; } public River(int id1, int id2, String name) { super(id1, id2); this.name = name; } public String getName() { return name; } public double getFrom() { return from; } public void setFrom(double from) { this.from = from; } public double getTo() { return to; } public void setTo(double to) { this.to = to; } public boolean inside(double x) { return x >= from && x <= to; } public boolean sync(SyncContext context) throws SQLException { log.info("sync river: " + this); // Only take relevant gauges into account. Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to); ConnectedStatements flysStatements = context.getFlysStatements(); ConnectedStatements aftStatements = context.getAftStatements(); String riverName = getName(); Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>(); ResultSet messstellenRs = aftStatements .getStatement("select.messstelle") .clearParameters() .setInt("GEWAESSER_NR", id2) .executeQuery(); try { while (messstellenRs.next()) { String name = messstellenRs.getString("NAME"); String num = messstellenRs.getString("MESSSTELLE_NR"); double station = messstellenRs.getDouble("STATIONIERUNG"); if (!messstellenRs.wasNull() && !inside(station)) { log.warn("Station found in AFT but in not range: " + station); continue; } Long number = SyncContext.numberToLong(num); if (number == null) { log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); continue; } DIPSGauge dipsGauge = dipsGauges.get(number); if (dipsGauge == null) { log.warn( "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " + "Gauge number used for lookup: " + number); continue; } String gaugeRiver = dipsGauge.getRiverName(); if (!gaugeRiver.equalsIgnoreCase(riverName)) { log.warn( "DIPS: MESSSTELLE '" + name + "' is assigned to river '" + gaugeRiver + "'. Needs to be on '" + riverName + "'."); continue; } dipsGauge.setAftName(name); dipsGauge.setOfficialNumber(number); aftDIPSGauges.put(number, dipsGauge); } } finally { messstellenRs.close(); } List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>(); ResultSet gaugesRs = flysStatements .getStatement("select.gauges") .clearParameters() .setInt("river_id", id1).executeQuery(); try { while (gaugesRs.next()) { int gaugeId = gaugesRs.getInt("id"); String name = gaugesRs.getString("name"); long number = gaugesRs.getLong("official_number"); if (gaugesRs.wasNull()) { log.warn("FLYS: Gauge '" + name + "' has no official number. Ignored."); continue; } Long key = Long.valueOf(number); DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); if (aftDIPSGauge == null) { log.warn("FLYS: Gauge '" + name + "' number " + number + " is not found in AFT/DIPS."); continue; } aftDIPSGauge.setFlysId(gaugeId); log.info("Gauge '" + name + "' found in FLYS, AFT and DIPS. -> Update"); updateGauges.add(aftDIPSGauge); } } finally { gaugesRs.close(); } boolean modified = createGauges(context, aftDIPSGauges); modified |= updateGauges(context, updateGauges); return modified; } protected boolean updateGauges( SyncContext context, List<DIPSGauge> gauges ) throws SQLException { boolean modified = false; for (DIPSGauge gauge: gauges) { modified |= updateBfGIdOnMasterDischargeTable(context, gauge); modified |= updateGauge(context, gauge); } return modified; } protected boolean updateBfGIdOnMasterDischargeTable( SyncContext context, DIPSGauge gauge ) throws SQLException { log.info( "FLYS: Updating master discharge table bfg_id for '" + gauge.getAftName() + "'"); ConnectedStatements flysStatements = context.getFlysStatements(); ResultSet rs = flysStatements .getStatement("select.gauge.master.discharge.table") .clearParameters() .setInt("gauge_id", gauge.getFlysId()) .executeQuery(); int flysId; try { if (rs.next()) { log.error( "FLYS: No master discharge table found for gauge '" + gauge.getAftName() + "'"); return false; } String bfgId = rs.getString("bfg_id"); if (!rs.wasNull()) { // already has BFG_ID return false; } flysId = rs.getInt("id"); } finally { rs.close(); } // We need to find out the BFG_ID of the current discharge table // for this gauge in AFT. ConnectedStatements aftStatements = context.getAftStatements(); rs = aftStatements .getStatement("select.bfg.id.current") .clearParameters() .setString("number", "%" + gauge.getOfficialNumber()) .executeQuery(); String bfgId = null; try { if (rs.next()) { bfgId = rs.getString("BFG_ID"); } } finally { rs.close(); } if (bfgId == null) { log.warn( "No BFG_ID found for current discharge table of gauge '" + gauge + "'"); return false; } // Set the BFG_ID in FLYS. flysStatements.beginTransaction(); try { flysStatements .getStatement("update.bfg.id.discharge.table") .clearParameters() .setInt("id", flysId) .setString("bfg_id", bfgId) .executeUpdate(); flysStatements.commitTransaction(); } catch (SQLException sqle) { flysStatements.rollbackTransaction(); log.error(sqle, sqle); return false; } return true; } protected boolean updateGauge( SyncContext context, DIPSGauge gauge ) throws SQLException { log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'."); // We need to load all discharge tables from both databases // of the gauge and do some pairing based on their bfg_id. boolean modified = false; ConnectedStatements flysStatements = context.getFlysStatements(); flysStatements.beginTransaction(); try { List<DischargeTable> flysDTs = DischargeTable.loadFlysDischargeTables( context, gauge.getFlysId()); List<DischargeTable> aftDTs = DischargeTable.loadAftDischargeTables( context, gauge.getOfficialNumber()); Map<String, DischargeTable> bfgId2FlysDT = new HashMap<String, DischargeTable>(); for (DischargeTable dt: flysDTs) { String bfgId = dt.getBfgId(); if (bfgId == null) { log.warn("FLYS: discharge table " + dt.getId() + " has no bfg_id. Ignored."); continue; } bfgId2FlysDT.put(bfgId, dt); } List<DischargeTable> createDTs = new ArrayList<DischargeTable>(); for (DischargeTable aftDT: aftDTs) { String bfgId = aftDT.getBfgId(); DischargeTable flysDT = bfgId2FlysDT.remove(bfgId); if (flysDT != null) { // Found in AFT and FLYS. log.info("FLYS: Discharge table '" + bfgId + "' found in AFT and FLYS. -> update"); // Create the W/Q diff. modified |= writeWQChanges(context, flysDT, aftDT); } else { log.info("FLYS: Discharge table '" + bfgId + "' not found in FLYS. -> create"); createDTs.add(aftDT); } } for (String bfgId: bfgId2FlysDT.keySet()) { log.info("FLYS: Discharge table '" + bfgId + "' found in FLYS but not in AFT. -> ignore"); } log.info("FLYS: Copy " + createDTs.size() + " discharge tables over from AFT."); // Create the new discharge tables. for (DischargeTable aftDT: createDTs) { createDischargeTable(context, aftDT, gauge.getFlysId()); modified = true; } flysStatements.commitTransaction(); } catch (SQLException sqle) { flysStatements.rollbackTransaction(); log.error(sqle, sqle); modified = false; } return modified; } protected boolean writeWQChanges( SyncContext context, DischargeTable flysDT, DischargeTable aftDT ) throws SQLException { flysDT.loadFlysValues(context); aftDT.loadAftValues(context); WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues()); if (diff.hasChanges()) { diff.writeChanges(context, flysDT.getId()); return true; } return false; } protected boolean createGauges( SyncContext context, Map<Long, DIPSGauge> gauges ) throws SQLException { ConnectedStatements flysStatements = context.getFlysStatements(); SymbolicStatement.Instance nextId = flysStatements.getStatement("next.gauge.id"); SymbolicStatement.Instance insertStmnt = flysStatements.getStatement("insert.gauge"); boolean modified = false; for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) { Long officialNumber = entry.getKey(); DIPSGauge gauge = entry.getValue(); log.info("Gauge '" + gauge.getAftName() + "' not in FLYS but in AFT/DIPS. -> Create"); if (!gauge.hasDatums()) { log.warn("DIPS: Gauge '" + gauge.getAftName() + "' has no datum. Ignored."); continue; } ResultSet rs = null; flysStatements.beginTransaction(); try { (rs = nextId.executeQuery()).next(); int gaugeId = rs.getInt("gauge_id"); rs.close(); rs = null; insertStmnt .clearParameters() .setInt("id", gaugeId) .setString("name", gauge.getAftName()) .setInt("river_id", id1) .setDouble("station", gauge.getStation()) .setDouble("aeo", gauge.getAeo()) .setLong("official_number", officialNumber) .setDouble("datum", gauge.getLatestDatum().getValue()); insertStmnt.execute(); log.info("FLYS: Created gauge '" + gauge.getAftName() + "' with id " + gaugeId + "."); gauge.setFlysId(gaugeId); createDischargeTables(context, gauge); flysStatements.commitTransaction(); modified = true; } catch (SQLException sqle) { flysStatements.rollbackTransaction(); log.error(sqle, sqle); } finally { if (rs != null) { rs.close(); } } } return modified; } protected void createDischargeTable( SyncContext context, DischargeTable aftDT, int flysGaugeId ) throws SQLException { aftDT.persistFlysTimeInterval(context); int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId); aftDT.loadAftValues(context); aftDT.storeFlysValues(context, flysId); } protected void createDischargeTables( SyncContext context, DIPSGauge gauge ) throws SQLException { log.info("FLYS: Create discharge tables for '" + gauge.getAftName() + "'."); // Load the discharge tables from AFT. List<DischargeTable> dts = loadAftDischargeTables( context, gauge); // Persist the time intervals. persistFlysTimeIntervals(context, dts); // Persist the discharge tables int [] flysDTIds = persistFlysDischargeTables( context, dts, gauge.getFlysId()); // Copy over the W/Q values copyWQsFromAftToFlys(context, dts, flysDTIds); } protected List<DischargeTable> loadAftDischargeTables( SyncContext context, DIPSGauge gauge ) throws SQLException { return DischargeTable.loadAftDischargeTables( context, gauge.getOfficialNumber(), gauge.getFlysId()); } protected void persistFlysTimeIntervals( SyncContext context, List<DischargeTable> dts ) throws SQLException { for (DischargeTable dt: dts) { dt.persistFlysTimeInterval(context); } } protected int [] persistFlysDischargeTables( SyncContext context, List<DischargeTable> dts, int flysGaugeId ) throws SQLException { int [] flysDTIds = new int[dts.size()]; for (int i = 0; i < flysDTIds.length; ++i) { flysDTIds[i] = dts.get(i) .persistFlysDischargeTable(context, flysGaugeId); } return flysDTIds; } protected void copyWQsFromAftToFlys( SyncContext context, List<DischargeTable> dts, int [] flysDTIds ) throws SQLException { for (int i = 0; i < flysDTIds.length; ++i) { DischargeTable dt = dts.get(i); dt.loadAftValues(context); dt.storeFlysValues(context, flysDTIds[i]); dt.clearValues(); // To save memory. } } public String toString() { return "[River: name=" + name + ", " + super.toString() + "]"; } } // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :