sascha@4075: package de.intevation.aft;
sascha@4075: 
sascha@4085: import java.util.List;
sascha@4085: import java.util.ArrayList;
sascha@4085: import java.util.HashMap;
sascha@4078: import java.util.Map;
sascha@4078: 
sascha@4076: import java.sql.ResultSet;
sascha@4075: import java.sql.SQLException;
sascha@4075: 
sascha@4075: import org.apache.log4j.Logger;
sascha@4075: 
sascha@4075: import de.intevation.db.ConnectedStatements;
sascha@4087: import de.intevation.db.SymbolicStatement;
sascha@4075: 
sascha@4075: public class River
sascha@4075: extends      IdPair
sascha@4075: {
sascha@4075:     private static Logger log = Logger.getLogger(River.class);
sascha@4075: 
sascha@4075:     protected String name;
sascha@4075: 
sascha@4075:     public River() {
sascha@4075:     }
sascha@4075: 
sascha@4075:     public River(int id1, int id2, String name) {
sascha@4075:         super(id1, id2);
sascha@4075:         this.name = name;
sascha@4075:     }
sascha@4075: 
sascha@4075:     public String getName() {
sascha@4075:         return name;
sascha@4075:     }
sascha@4075: 
sascha@4078: 
sascha@4094:     public boolean sync(SyncContext context) throws SQLException {
sascha@4075:         log.info("sync river: " + this);
sascha@4077: 
sascha@4084:         Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges();
sascha@4081: 
sascha@4077:         ConnectedStatements flysStatements = context.getFlysStatements();
sascha@4077:         ConnectedStatements aftStatements  = context.getAftStatements();
sascha@4077: 
sascha@4076:         ResultSet messstellenRs = aftStatements
sascha@4076:             .getStatement("select.messstelle")
sascha@4076:             .clearParameters()
sascha@4076:             .setInt("GEWAESSER_NR", id2).executeQuery();
sascha@4076: 
sascha@4085:         String riverName = getName();
sascha@4085: 
sascha@4085:         Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();
sascha@4085: 
sascha@4076:         while (messstellenRs.next()) {
sascha@4076:             String name = messstellenRs.getString("NAME");
sascha@4076:             String num  = messstellenRs.getString("MESSSTELLE_NR");
sascha@4084:             Long number = SyncContext.numberToLong(num);
sascha@4081:             if (number == null) {
sascha@4110:                 log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
sascha@4081:                 continue;
sascha@4081:             }
sascha@4081:             DIPSGauge dipsGauge = dipsGauges.get(number);
sascha@4085:             if (dipsGauge == null) {
sascha@4085:                 log.warn(
sascha@4110:                     "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " +
sascha@4085:                     "Gauge number used for lookup: " + number);
sascha@4081:                 continue;
sascha@4081:             }
sascha@4085:             String gaugeRiver = dipsGauge.getRiverName();
sascha@4085:             if (!gaugeRiver.equalsIgnoreCase(riverName)) {
sascha@4085:                 log.warn(
sascha@4110:                     "DIPS: MESSSTELLE '" + name + 
sascha@4085:                     "' is assigned to river '" + gaugeRiver + 
sascha@4085:                     "'. Needs to be on '" + riverName + "'.");
sascha@4085:                 continue;
sascha@4085:             }
sascha@4085:             dipsGauge.setAftName(name);
sascha@4097:             dipsGauge.setOfficialNumber(number);
sascha@4085:             aftDIPSGauges.put(number, dipsGauge);
sascha@4076:         }
sascha@4076: 
sascha@4076:         messstellenRs.close();
sascha@4076: 
sascha@4085:         List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();
sascha@4085: 
sascha@4076:         ResultSet gaugesRs = flysStatements
sascha@4076:             .getStatement("select.gauges")
sascha@4076:             .clearParameters()
sascha@4076:             .setInt("river_id", id1).executeQuery();
sascha@4076: 
sascha@4076:         while (gaugesRs.next()) {
sascha@4076:             int gaugeId = gaugesRs.getInt("id");
sascha@4076:             String name = gaugesRs.getString("name");
sascha@4085:             long   number = gaugesRs.getLong("official_number");
sascha@4085:             if (gaugesRs.wasNull()) {
sascha@4085:                 log.warn("FLYS: Gauge '" + name + 
sascha@4085:                     "' has no official number. Ignored.");
sascha@4085:                 continue;
sascha@4085:             }
sascha@4085:             Long key = Long.valueOf(number);
sascha@4085:             DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
sascha@4085:             if (aftDIPSGauge == null) {
sascha@4085:                 log.warn("FLYS: Gauge '" + name + "' number " + number +
sascha@4085:                     " is not found in AFT/DIPS.");
sascha@4085:                 continue;
sascha@4085:             }
sascha@4085:             aftDIPSGauge.setFlysId(gaugeId);
sascha@4085:             log.info("Gauge '" + name +
sascha@4085:                 "' found in FLYS, AFT and DIPS. -> Update");
sascha@4085:             updateGauges.add(aftDIPSGauge);
sascha@4076:         }
sascha@4094:         gaugesRs.close();
sascha@4076: 
sascha@4094:         boolean modified = createGauges(context, aftDIPSGauges);
sascha@4087: 
sascha@4097:         modified |= updateGauges(context, updateGauges);
sascha@4097: 
sascha@4097:         return modified;
sascha@4097:     }
sascha@4097: 
sascha@4097:     protected boolean updateGauges(
sascha@4097:         SyncContext     context,
sascha@4097:         List<DIPSGauge> gauges
sascha@4097:     )
sascha@4097:     throws SQLException
sascha@4097:     {
sascha@4097:         boolean modified = false;
sascha@4097: 
sascha@4097:         for (DIPSGauge gauge: gauges) {
sascha@4097:             modified |= updateGauge(context, gauge);
sascha@4097:         }
sascha@4097: 
sascha@4097:         return modified;
sascha@4097:     }
sascha@4097: 
sascha@4097:     protected boolean updateGauge(
sascha@4097:         SyncContext context,
sascha@4097:         DIPSGauge   gauge
sascha@4097:     )
sascha@4097:     throws SQLException
sascha@4097:     {
sascha@4102:         log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'.");
sascha@4097:         // We need to load all discharge tables from both database
sascha@4097:         // of the gauge and do some pairing based on their descriptions.
sascha@4097: 
sascha@4097:         boolean modified = false;
sascha@4097: 
sascha@4104:         ConnectedStatements flysStatements = context.getFlysStatements();
sascha@4098: 
sascha@4104:         flysStatements.beginTransaction();
sascha@4104:         try {
sascha@4104:             List<DischargeTable> flysDTs =
sascha@4104:                 DischargeTable.loadFlysDischargeTables(
sascha@4104:                     context, gauge.getFlysId());
sascha@4104: 
sascha@4104:             List<DischargeTable> aftDTs =
sascha@4104:                 DischargeTable.loadAftDischargeTables(
sascha@4104:                     context, gauge.getOfficialNumber());
sascha@4104: 
sascha@4104:             Map<String, DischargeTable> desc2FlysDT =
sascha@4104:                 new HashMap<String, DischargeTable>();
sascha@4104: 
sascha@4104:             for (DischargeTable dt: flysDTs) {
sascha@4104:                 String description = dt.getDescription();
sascha@4104:                 if (description == null) {
sascha@4104:                     log.warn("FLYS: discharge table " + dt.getId() 
sascha@4104:                         + " has no description. Ignored.");
sascha@4104:                     continue;
sascha@4104:                 }
sascha@4104:                 desc2FlysDT.put(description, dt);
sascha@4098:             }
sascha@4104: 
sascha@4104:             List<DischargeTable> createDTs = new ArrayList<DischargeTable>();
sascha@4104: 
sascha@4104:             for (DischargeTable aftDT: aftDTs) {
sascha@4104:                 String description = aftDT.getDescription();
sascha@4104:                 DischargeTable flysDT = desc2FlysDT.remove(description);
sascha@4104:                 if (flysDT != null) {
sascha@4104:                     // Found in AFT and FLYS.
sascha@4104:                     log.info("FLYS: Discharge table '" + description
sascha@4104:                         + "' found in AFT and FLYS. -> update");
sascha@4104:                     // Create the W/Q diff.
sascha@4104:                     modified |= writeWQChanges(context, flysDT, aftDT);
sascha@4104:                 }
sascha@4104:                 else {
sascha@4104:                     log.info("FLYS: Discharge table '" + description
sascha@4104:                         + "' not found in FLYS. -> create");
sascha@4104:                     createDTs.add(aftDT);
sascha@4104:                 }
sascha@4104:             }
sascha@4104: 
sascha@4104:             for (String description: desc2FlysDT.keySet()) {
sascha@4098:                 log.info("FLYS: Discharge table '" + description
sascha@4104:                     + "' found in FLYS but not in AFT. -> ignore");
sascha@4098:             }
sascha@4098: 
sascha@4104:             log.info("FLYS: Copy " + createDTs.size() +
sascha@4104:                 " discharge tables over from AFT.");
sascha@4098: 
sascha@4104:             // Create the new discharge tables.
sascha@4104:             for (DischargeTable aftDT: createDTs) {
sascha@4104:                 createDischargeTable(context, aftDT, gauge.getFlysId());
sascha@4104:                 modified = true;
sascha@4104:             }
sascha@4102: 
sascha@4104:             flysStatements.commitTransaction();
sascha@4104:         }
sascha@4104:         catch (SQLException sqle) {
sascha@4104:             flysStatements.rollbackTransaction();
sascha@4104:             log.error(sqle, sqle);
sascha@4104:             modified = false;
sascha@4100:         }
sascha@4097: 
sascha@4094:         return modified;
sascha@4087:     }
sascha@4079: 
sascha@4099:     protected boolean writeWQChanges(
sascha@4099:         SyncContext    context,
sascha@4099:         DischargeTable flysDT,
sascha@4099:         DischargeTable aftDT
sascha@4099:     )
sascha@4099:     throws SQLException
sascha@4099:     {
sascha@4099:         flysDT.loadFlysValues(context);
sascha@4099:         aftDT.loadAftValues(context);
sascha@4099:         WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
sascha@4099:         if (diff.hasChanges()) {
sascha@4099:             diff.writeChanges(context, flysDT.getId());
sascha@4099:             return true;
sascha@4099:         }
sascha@4099:         return false;
sascha@4099:     }
sascha@4099: 
sascha@4094:     protected boolean createGauges(
sascha@4087:         SyncContext          context,
sascha@4087:         Map<Long, DIPSGauge> gauges
sascha@4087:     )
sascha@4087:     throws SQLException
sascha@4087:     {
sascha@4087:         ConnectedStatements flysStatements = context.getFlysStatements();
sascha@4087: 
sascha@4087:         SymbolicStatement.Instance nextId =
sascha@4087:             flysStatements.getStatement("next.gauge.id");
sascha@4087: 
sascha@4087:         SymbolicStatement.Instance insertStmnt =
sascha@4087:             flysStatements.getStatement("insert.gauge");
sascha@4087: 
sascha@4094:         boolean modified = false;
sascha@4094: 
sascha@4087:         for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
sascha@4087:             Long      officialNumber = entry.getKey();
sascha@4087:             DIPSGauge gauge          = entry.getValue();
sascha@4087: 
sascha@4085:             log.info("Gauge '" + gauge.getAftName() +
sascha@4085:                 "' not in FLYS but in AFT/DIPS. -> Create");
sascha@4087: 
sascha@4087:             if (!gauge.hasDatums()) {
sascha@4110:                 log.warn("DIPS: Gauge '" + 
sascha@4087:                     gauge.getAftName() + "' has no datum. Ignored.");
sascha@4087:                 continue;
sascha@4087:             }
sascha@4087: 
sascha@4087:             ResultSet rs = null;
sascha@4087:             flysStatements.beginTransaction();
sascha@4087:             try {
sascha@4087:                 (rs = nextId.executeQuery()).next();
sascha@4087:                 int gaugeId = rs.getInt("gauge_id");
sascha@4087:                 rs.close(); rs = null;
sascha@4087: 
sascha@4087:                 insertStmnt
sascha@4087:                     .clearParameters()
sascha@4087:                     .setInt("id", gaugeId)
sascha@4087:                     .setString("name", gauge.getAftName())
sascha@4087:                     .setInt("river_id", id1)
sascha@4087:                     .setDouble("station", gauge.getStation())
sascha@4087:                     .setDouble("aeo", gauge.getAeo())
sascha@4100:                     .setLong("official_number", officialNumber)
sascha@4087:                     .setDouble("datum", gauge.getLatestDatum().getValue());
sascha@4087: 
sascha@4087:                 insertStmnt.execute();
sascha@4087: 
sascha@4087:                 log.info("FLYS: Created gauge '" + gauge.getAftName() + 
sascha@4087:                     "' with id " + gaugeId + ".");
sascha@4087: 
sascha@4088:                 gauge.setFlysId(gaugeId);
sascha@4100:                 createDischargeTables(context, gauge);
sascha@4087:                 flysStatements.commitTransaction();
sascha@4094:                 modified = true;
sascha@4087:             }
sascha@4087:             catch (SQLException sqle) {
sascha@4087:                 flysStatements.rollbackTransaction();
sascha@4094:                 log.error(sqle, sqle);
sascha@4087:             }
sascha@4087:             finally {
sascha@4087:                 if (rs != null) {
sascha@4087:                     rs.close();
sascha@4087:                 }
sascha@4087:             }
sascha@4085:         }
sascha@4094: 
sascha@4094:         return modified;
sascha@4075:     }
sascha@4075: 
sascha@4102:     protected void createDischargeTable(
sascha@4102:         SyncContext    context,
sascha@4102:         DischargeTable aftDT,
sascha@4102:         int            flysGaugeId
sascha@4102:     )
sascha@4102:     throws SQLException
sascha@4102:     {
sascha@4102:         aftDT.persistFlysTimeInterval(context);
sascha@4102:         int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId);
sascha@4102: 
sascha@4102:         aftDT.loadAftValues(context);
sascha@4102:         aftDT.storeFlysValues(context, flysId);
sascha@4102:     }
sascha@4102: 
sascha@4088:     protected void createDischargeTables(
sascha@4088:         SyncContext context,
sascha@4088:         DIPSGauge   gauge
sascha@4088:     )
sascha@4088:     throws SQLException
sascha@4088:     {
sascha@4102:         log.info("FLYS: Create discharge tables for '" +
sascha@4102:             gauge.getAftName() + "'.");
sascha@4088: 
sascha@4093:         // Load the discharge tables from AFT.
sascha@4093:         List<DischargeTable> dts = loadAftDischargeTables(
sascha@4100:             context, gauge);
sascha@4088: 
sascha@4093:         // Persist the time intervals.
sascha@4093:         persistFlysTimeIntervals(context, dts);
sascha@4093: 
sascha@4093:         // Persist the discharge tables
sascha@4102:         int [] flysDTIds = persistFlysDischargeTables(
sascha@4102:             context, dts, gauge.getFlysId());
sascha@4093: 
sascha@4093:         // Copy over the W/Q values
sascha@4093:         copyWQsFromAftToFlys(context, dts, flysDTIds);
sascha@4093:     }
sascha@4093: 
sascha@4093:     protected List<DischargeTable> loadAftDischargeTables(
sascha@4093:         SyncContext context,
sascha@4093:         DIPSGauge   gauge
sascha@4093:     )
sascha@4093:     throws SQLException
sascha@4093:     {
sascha@4097:         return DischargeTable.loadAftDischargeTables(
sascha@4100:             context, gauge.getOfficialNumber(), gauge.getFlysId());
sascha@4093:     }
sascha@4093: 
sascha@4093:     protected void persistFlysTimeIntervals(
sascha@4093:         SyncContext          context,
sascha@4093:         List<DischargeTable> dts
sascha@4093:     )
sascha@4093:     throws SQLException
sascha@4093:     {
sascha@4090:         for (DischargeTable dt: dts) {
sascha@4102:             dt.persistFlysTimeInterval(context);
sascha@4090:         }
sascha@4093:     }
sascha@4090: 
sascha@4093:     protected int [] persistFlysDischargeTables(
sascha@4093:         SyncContext          context,
sascha@4102:         List<DischargeTable> dts,
sascha@4102:         int                  flysGaugeId
sascha@4093:     )
sascha@4093:     throws SQLException
sascha@4093:     {
sascha@4093:         int [] flysDTIds = new int[dts.size()];
sascha@4093: 
sascha@4102:         for (int i = 0; i < flysDTIds.length; ++i) {
sascha@4102:             flysDTIds[i] = dts.get(i)
sascha@4102:                 .persistFlysDischargeTable(context, flysGaugeId);
sascha@4088:         }
sascha@4093: 
sascha@4093:         return flysDTIds;
sascha@4093:     }
sascha@4093: 
sascha@4093:     protected void copyWQsFromAftToFlys(
sascha@4093:         SyncContext          context,
sascha@4093:         List<DischargeTable> dts,
sascha@4093:         int []               flysDTIds
sascha@4093:     )
sascha@4093:     throws SQLException
sascha@4093:     {
sascha@4093:         for (int i = 0; i < flysDTIds.length; ++i) {
sascha@4093:             DischargeTable dt = dts.get(i);
sascha@4093:             dt.loadAftValues(context);
sascha@4093:             dt.storeFlysValues(context, flysDTIds[i]);
sascha@4093:             dt.clearValues(); // To save memory.
sascha@4093:         }
sascha@4088:     }
sascha@4088: 
sascha@4075:     public String toString() {
sascha@4075:         return "[River: name=" + name + ", " + super.toString() + "]";
sascha@4075:     }
sascha@4075: }
sascha@4075: // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :