diff etl/src/main/java/org/dive4elements/river/etl/aft/River.java @ 5838:5aa05a7a34b7

Rename modules to more fitting names.
author Sascha L. Teichmann <teichmann@intevation.de>
date Thu, 25 Apr 2013 15:23:37 +0200
parents flys-aft/src/main/java/org/dive4elements/river/etl/aft/River.java@9438e9259213
children 8bd9b551456c
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/etl/src/main/java/org/dive4elements/river/etl/aft/River.java	Thu Apr 25 15:23:37 2013 +0200
@@ -0,0 +1,525 @@
+package org.dive4elements.river.etl.aft;
+
+import org.dive4elements.river.etl.db.ConnectedStatements;
+import org.dive4elements.river.etl.db.SymbolicStatement;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class River
+extends      IdPair
+{
+    private static Logger log = Logger.getLogger(River.class);
+
+    protected String name;
+
+    protected double from;
+    protected double to;
+
+    public River() {
+    }
+
+    public River(int id1, String name, double from, double to) {
+        super(id1);
+        this.name = name;
+        this.from = from;
+        this.to   = to;
+    }
+
+    public River(int id1, int id2, String name) {
+        super(id1, id2);
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public double getFrom() {
+        return from;
+    }
+
+    public void setFrom(double from) {
+        this.from = from;
+    }
+
+    public double getTo() {
+        return to;
+    }
+
+    public void setTo(double to) {
+        this.to = to;
+    }
+
+    public boolean inside(double x) {
+        return x >= from && x <= to;
+    }
+
+    public boolean sync(SyncContext context) throws SQLException {
+        log.info("sync river: " + this);
+
+        // Only take relevant gauges into account.
+        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to);
+
+        ConnectedStatements flysStatements = context.getFlysStatements();
+        ConnectedStatements aftStatements  = context.getAftStatements();
+
+        String riverName = getName();
+
+        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();
+
+        ResultSet messstellenRs = aftStatements
+            .getStatement("select.messstelle")
+            .clearParameters()
+            .setInt("GEWAESSER_NR", id2)
+            .executeQuery();
+
+        try {
+            while (messstellenRs.next()) {
+                String name = messstellenRs.getString("NAME");
+                String num  = messstellenRs.getString("MESSSTELLE_NR");
+                double station = messstellenRs.getDouble("STATIONIERUNG");
+
+                if (!messstellenRs.wasNull() && !inside(station)) {
+                    log.warn("Station found in AFT but in not range: " + station);
+                    continue;
+                }
+
+                Long number = SyncContext.numberToLong(num);
+                if (number == null) {
+                    log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
+                    continue;
+                }
+                DIPSGauge dipsGauge = dipsGauges.get(number);
+                if (dipsGauge == null) {
+                    log.warn(
+                        "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " +
+                        "Gauge number used for lookup: " + number);
+                    continue;
+                }
+                String gaugeRiver = dipsGauge.getRiverName();
+                if (!gaugeRiver.equalsIgnoreCase(riverName)) {
+                    log.warn(
+                        "DIPS: MESSSTELLE '" + name +
+                        "' is assigned to river '" + gaugeRiver +
+                        "'. Needs to be on '" + riverName + "'.");
+                    continue;
+                }
+                dipsGauge.setAftName(name);
+                dipsGauge.setOfficialNumber(number);
+                aftDIPSGauges.put(number, dipsGauge);
+            }
+        }
+        finally {
+            messstellenRs.close();
+        }
+
+        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();
+
+        ResultSet gaugesRs = flysStatements
+            .getStatement("select.gauges")
+            .clearParameters()
+            .setInt("river_id", id1).executeQuery();
+
+        try {
+            while (gaugesRs.next()) {
+                int gaugeId = gaugesRs.getInt("id");
+                String name = gaugesRs.getString("name");
+                long   number = gaugesRs.getLong("official_number");
+                if (gaugesRs.wasNull()) {
+                    log.warn("FLYS: Gauge '" + name +
+                        "' has no official number. Ignored.");
+                    continue;
+                }
+                Long key = Long.valueOf(number);
+                DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
+                if (aftDIPSGauge == null) {
+                    log.warn("FLYS: Gauge '" + name + "' number " + number +
+                        " is not found in AFT/DIPS.");
+                    continue;
+                }
+                aftDIPSGauge.setFlysId(gaugeId);
+                log.info("Gauge '" + name +
+                    "' found in FLYS, AFT and DIPS. -> Update");
+                updateGauges.add(aftDIPSGauge);
+            }
+        }
+        finally {
+            gaugesRs.close();
+        }
+
+        boolean modified = createGauges(context, aftDIPSGauges);
+
+        modified |= updateGauges(context, updateGauges);
+
+        return modified;
+    }
+
+    protected boolean updateGauges(
+        SyncContext     context,
+        List<DIPSGauge> gauges
+    )
+    throws SQLException
+    {
+        boolean modified = false;
+
+        for (DIPSGauge gauge: gauges) {
+            // XXX: Do dont modify the master AT.
+            // modified |= updateBfGIdOnMasterDischargeTable(context, gauge);
+            modified |= updateGauge(context, gauge);
+        }
+
+        return modified;
+    }
+
+    protected boolean updateBfGIdOnMasterDischargeTable(
+        SyncContext context,
+        DIPSGauge   gauge
+    ) throws SQLException {
+        log.info(
+            "FLYS: Updating master discharge table bfg_id for '" +
+            gauge.getAftName() + "'");
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        ResultSet rs = flysStatements
+            .getStatement("select.gauge.master.discharge.table")
+            .clearParameters()
+            .setInt("gauge_id", gauge.getFlysId())
+            .executeQuery();
+
+        int flysId;
+
+        try {
+            if (!rs.next()) {
+                log.error(
+                    "FLYS: No master discharge table found for gauge '" +
+                    gauge.getAftName() + "'");
+                return false;
+            }
+            String bfgId = rs.getString("bfg_id");
+            if (!rs.wasNull()) { // already has BFG_ID
+                return false;
+            }
+            flysId = rs.getInt("id");
+        } finally {
+            rs.close();
+        }
+
+        // We need to find out the BFG_ID of the current discharge table
+        // for this gauge in AFT.
+
+        ConnectedStatements aftStatements = context.getAftStatements();
+
+        rs = aftStatements
+            .getStatement("select.bfg.id.current")
+            .clearParameters()
+            .setString("number", "%" + gauge.getOfficialNumber())
+            .executeQuery();
+
+        String bfgId = null;
+
+        try {
+            if (rs.next()) {
+                bfgId = rs.getString("BFG_ID");
+            }
+        } finally {
+            rs.close();
+        }
+
+        if (bfgId == null) {
+            log.warn(
+                "No BFG_ID found for current discharge table of gauge '" +
+                gauge + "'");
+            return false;
+        }
+
+        // Set the BFG_ID in FLYS.
+        flysStatements.beginTransaction();
+        try {
+            flysStatements
+                .getStatement("update.bfg.id.discharge.table")
+                .clearParameters()
+                .setInt("id", flysId)
+                .setString("bfg_id", bfgId)
+                .executeUpdate();
+            flysStatements.commitTransaction();
+        } catch (SQLException sqle) {
+            flysStatements.rollbackTransaction();
+            log.error(sqle, sqle);
+            return false;
+        }
+
+        return true;
+    }
+
+    protected boolean updateGauge(
+        SyncContext context,
+        DIPSGauge   gauge
+    )
+    throws SQLException
+    {
+        log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'.");
+        // We need to load all discharge tables from both databases
+        // of the gauge and do some pairing based on their bfg_id.
+
+        boolean modified = false;
+
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        flysStatements.beginTransaction();
+        try {
+            List<DischargeTable> flysDTs =
+                DischargeTable.loadFlysDischargeTables(
+                    context, gauge.getFlysId());
+
+            List<DischargeTable> aftDTs =
+                DischargeTable.loadAftDischargeTables(
+                    context, gauge.getOfficialNumber());
+
+            Map<String, DischargeTable> bfgId2FlysDT =
+                new HashMap<String, DischargeTable>();
+
+            for (DischargeTable dt: flysDTs) {
+                String bfgId = dt.getBfgId();
+                if (bfgId == null) {
+                    log.warn("FLYS: discharge table " + dt.getId()
+                        + " has no bfg_id. Ignored.");
+                    continue;
+                }
+                bfgId2FlysDT.put(bfgId, dt);
+            }
+
+            List<DischargeTable> createDTs = new ArrayList<DischargeTable>();
+
+            for (DischargeTable aftDT: aftDTs) {
+                String bfgId = aftDT.getBfgId();
+                DischargeTable flysDT = bfgId2FlysDT.remove(bfgId);
+                if (flysDT != null) {
+                    // Found in AFT and FLYS.
+                    log.info("FLYS: Discharge table '" + bfgId
+                        + "' found in AFT and FLYS. -> update");
+                    // Create the W/Q diff.
+                    modified |= writeWQChanges(context, flysDT, aftDT);
+                }
+                else {
+                    log.info("FLYS: Discharge table '" + bfgId
+                        + "' not found in FLYS. -> create");
+                    createDTs.add(aftDT);
+                }
+            }
+
+            for (String bfgId: bfgId2FlysDT.keySet()) {
+                log.info("FLYS: Discharge table '" + bfgId
+                    + "' found in FLYS but not in AFT. -> ignore");
+            }
+
+            log.info("FLYS: Copy " + createDTs.size() +
+                " discharge tables over from AFT.");
+
+            // Create the new discharge tables.
+            for (DischargeTable aftDT: createDTs) {
+                createDischargeTable(context, aftDT, gauge.getFlysId());
+                modified = true;
+            }
+
+            flysStatements.commitTransaction();
+        }
+        catch (SQLException sqle) {
+            flysStatements.rollbackTransaction();
+            log.error(sqle, sqle);
+            modified = false;
+        }
+
+        return modified;
+    }
+
+    protected boolean writeWQChanges(
+        SyncContext    context,
+        DischargeTable flysDT,
+        DischargeTable aftDT
+    )
+    throws SQLException
+    {
+        flysDT.loadFlysValues(context);
+        aftDT.loadAftValues(context);
+        WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
+        if (diff.hasChanges()) {
+            diff.writeChanges(context, flysDT.getId());
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean createGauges(
+        SyncContext          context,
+        Map<Long, DIPSGauge> gauges
+    )
+    throws SQLException
+    {
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        SymbolicStatement.Instance nextId =
+            flysStatements.getStatement("next.gauge.id");
+
+        SymbolicStatement.Instance insertStmnt =
+            flysStatements.getStatement("insert.gauge");
+
+        boolean modified = false;
+
+        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
+            Long      officialNumber = entry.getKey();
+            DIPSGauge gauge          = entry.getValue();
+
+            log.info("Gauge '" + gauge.getAftName() +
+                "' not in FLYS but in AFT/DIPS. -> Create");
+
+            if (!gauge.hasDatums()) {
+                log.warn("DIPS: Gauge '" +
+                    gauge.getAftName() + "' has no datum. Ignored.");
+                continue;
+            }
+
+            ResultSet rs = null;
+            flysStatements.beginTransaction();
+            try {
+                (rs = nextId.executeQuery()).next();
+                int gaugeId = rs.getInt("gauge_id");
+                rs.close(); rs = null;
+
+                insertStmnt
+                    .clearParameters()
+                    .setInt("id", gaugeId)
+                    .setString("name", gauge.getAftName())
+                    .setInt("river_id", id1)
+                    .setDouble("station", gauge.getStation())
+                    .setDouble("aeo", gauge.getAeo())
+                    .setLong("official_number", officialNumber)
+                    .setDouble("datum", gauge.getLatestDatum().getValue());
+
+                insertStmnt.execute();
+
+                log.info("FLYS: Created gauge '" + gauge.getAftName() +
+                    "' with id " + gaugeId + ".");
+
+                gauge.setFlysId(gaugeId);
+                createDischargeTables(context, gauge);
+                flysStatements.commitTransaction();
+                modified = true;
+            }
+            catch (SQLException sqle) {
+                flysStatements.rollbackTransaction();
+                log.error(sqle, sqle);
+            }
+            finally {
+                if (rs != null) {
+                    rs.close();
+                }
+            }
+        }
+
+        return modified;
+    }
+
+    protected void createDischargeTable(
+        SyncContext    context,
+        DischargeTable aftDT,
+        int            flysGaugeId
+    )
+    throws SQLException
+    {
+        aftDT.persistFlysTimeInterval(context);
+        int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId);
+
+        aftDT.loadAftValues(context);
+        aftDT.storeFlysValues(context, flysId);
+    }
+
+    protected void createDischargeTables(
+        SyncContext context,
+        DIPSGauge   gauge
+    )
+    throws SQLException
+    {
+        log.info("FLYS: Create discharge tables for '" +
+            gauge.getAftName() + "'.");
+
+        // Load the discharge tables from AFT.
+        List<DischargeTable> dts = loadAftDischargeTables(
+            context, gauge);
+
+        // Persist the time intervals.
+        persistFlysTimeIntervals(context, dts);
+
+        // Persist the discharge tables
+        int [] flysDTIds = persistFlysDischargeTables(
+            context, dts, gauge.getFlysId());
+
+        // Copy over the W/Q values
+        copyWQsFromAftToFlys(context, dts, flysDTIds);
+    }
+
+    protected List<DischargeTable> loadAftDischargeTables(
+        SyncContext context,
+        DIPSGauge   gauge
+    )
+    throws SQLException
+    {
+        return DischargeTable.loadAftDischargeTables(
+            context, gauge.getOfficialNumber(), gauge.getFlysId());
+    }
+
+    protected void persistFlysTimeIntervals(
+        SyncContext          context,
+        List<DischargeTable> dts
+    )
+    throws SQLException
+    {
+        for (DischargeTable dt: dts) {
+            dt.persistFlysTimeInterval(context);
+        }
+    }
+
+    protected int [] persistFlysDischargeTables(
+        SyncContext          context,
+        List<DischargeTable> dts,
+        int                  flysGaugeId
+    )
+    throws SQLException
+    {
+        int [] flysDTIds = new int[dts.size()];
+
+        for (int i = 0; i < flysDTIds.length; ++i) {
+            flysDTIds[i] = dts.get(i)
+                .persistFlysDischargeTable(context, flysGaugeId);
+        }
+
+        return flysDTIds;
+    }
+
+    protected void copyWQsFromAftToFlys(
+        SyncContext          context,
+        List<DischargeTable> dts,
+        int []               flysDTIds
+    )
+    throws SQLException
+    {
+        for (int i = 0; i < flysDTIds.length; ++i) {
+            DischargeTable dt = dts.get(i);
+            dt.loadAftValues(context);
+            dt.storeFlysValues(context, flysDTIds[i]);
+            dt.clearValues(); // To save memory.
+        }
+    }
+
+    public String toString() {
+        return "[River: name=" + name + ", " + super.toString() + "]";
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :

http://dive4elements.wald.intevation.org