view flys-aft/src/main/java/de/intevation/aft/River.java @ 4099:006e99437fb9

Store the W/Q differences of existing discharge tables to FLYS database. flys-aft/trunk@3621 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Sun, 08 Jan 2012 09:25:36 +0000
parents 9215253ad0be
children 981de0b77c6b
line wrap: on
line source
package de.intevation.aft;

import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;

import org.apache.log4j.Logger;

import de.intevation.db.ConnectedStatements;
import de.intevation.db.SymbolicStatement;

public class River
extends      IdPair
{
    private static Logger log = Logger.getLogger(River.class);

    protected String name;

    public River() {
    }

    public River(int id1, int id2, String name) {
        super(id1, id2);
        this.name = name;
    }

    public String getName() {
        return name;
    }


    public boolean sync(SyncContext context) throws SQLException {
        log.info("sync river: " + this);

        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges();

        ConnectedStatements flysStatements = context.getFlysStatements();
        ConnectedStatements aftStatements  = context.getAftStatements();

        ResultSet messstellenRs = aftStatements
            .getStatement("select.messstelle")
            .clearParameters()
            .setInt("GEWAESSER_NR", id2).executeQuery();

        String riverName = getName();

        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();

        while (messstellenRs.next()) {
            String name = messstellenRs.getString("NAME");
            String num  = messstellenRs.getString("MESSSTELLE_NR");
            Long number = SyncContext.numberToLong(num);
            if (number == null) {
                log.warn("Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
                continue;
            }
            DIPSGauge dipsGauge = dipsGauges.get(number);
            if (dipsGauge == null) {
                log.warn(
                    "MESSSTELLE '" + name + "' not found in DIPS. " +
                    "Gauge number used for lookup: " + number);
                continue;
            }
            String gaugeRiver = dipsGauge.getRiverName();
            if (!gaugeRiver.equalsIgnoreCase(riverName)) {
                log.warn(
                    "MESSSTELLE '" + name + 
                    "' is assigned to river '" + gaugeRiver + 
                    "'. Needs to be on '" + riverName + "'.");
                continue;
            }
            dipsGauge.setAftName(name);
            dipsGauge.setOfficialNumber(number);
            aftDIPSGauges.put(number, dipsGauge);
        }

        messstellenRs.close();

        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();

        ResultSet gaugesRs = flysStatements
            .getStatement("select.gauges")
            .clearParameters()
            .setInt("river_id", id1).executeQuery();

        while (gaugesRs.next()) {
            int gaugeId = gaugesRs.getInt("id");
            String name = gaugesRs.getString("name");
            long   number = gaugesRs.getLong("official_number");
            if (gaugesRs.wasNull()) {
                log.warn("FLYS: Gauge '" + name + 
                    "' has no official number. Ignored.");
                continue;
            }
            Long key = Long.valueOf(number);
            DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
            if (aftDIPSGauge == null) {
                log.warn("FLYS: Gauge '" + name + "' number " + number +
                    " is not found in AFT/DIPS.");
                continue;
            }
            aftDIPSGauge.setFlysId(gaugeId);
            log.info("Gauge '" + name +
                "' found in FLYS, AFT and DIPS. -> Update");
            updateGauges.add(aftDIPSGauge);
        }
        gaugesRs.close();

        boolean modified = createGauges(context, aftDIPSGauges);

        modified |= updateGauges(context, updateGauges);

        return modified;
    }

    protected boolean updateGauges(
        SyncContext     context,
        List<DIPSGauge> gauges
    )
    throws SQLException
    {
        boolean modified = false;

        for (DIPSGauge gauge: gauges) {
            modified |= updateGauge(context, gauge);
        }

        return modified;
    }

    protected boolean updateGauge(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        // We need to load all discharge tables from both database
        // of the gauge and do some pairing based on their descriptions.

        boolean modified = false;

        List<DischargeTable> flysDTs =
            DischargeTable.loadFlysDischargeTables(
                context, gauge.getFlysId());

        List<DischargeTable> aftDTs =
            DischargeTable.loadAftDischargeTables(
                context, gauge.getOfficialNumber());

        Map<String, DischargeTable> desc2FlysDT =
            new HashMap<String, DischargeTable>();

        for (DischargeTable dt: flysDTs) {
            String description = dt.getDescription();
            if (description == null) {
                log.warn("FLYS: discharge table " + dt.getId() 
                    + " has no description. Ignored.");
                continue;
            }
            desc2FlysDT.put(description, dt);
        }

        List<DischargeTable> createDTs = new ArrayList<DischargeTable>();

        for (DischargeTable aftDT: aftDTs) {
            String description = aftDT.getDescription();
            DischargeTable flysDT = desc2FlysDT.remove(description);
            if (flysDT != null) {
                // Found in AFT and FLYS.
                log.info("FLYS: Discharge table '" + description
                    + "' found in AFT and FLYS. -> update");
                // Create the W/Q diff.
                modified |= writeWQChanges(context, flysDT, aftDT);
            }
            else {
                log.info("FLYS: Discharge table '" + description
                    + "' not found in FLYS. -> create");
                createDTs.add(aftDT);
            }
        }

        for (String description: desc2FlysDT.keySet()) {
            log.info("FLYS: Discharge table '" + description
                + "' found in FLYS but not in AFT. -> ignore");
        }

        // TODO: Create the new discharge tables.

        return modified;
    }

    protected boolean writeWQChanges(
        SyncContext    context,
        DischargeTable flysDT,
        DischargeTable aftDT
    )
    throws SQLException
    {
        flysDT.loadFlysValues(context);
        aftDT.loadAftValues(context);
        WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
        if (diff.hasChanges()) {
            diff.writeChanges(context, flysDT.getId());
            return true;
        }
        return false;
    }

    protected boolean createGauges(
        SyncContext          context,
        Map<Long, DIPSGauge> gauges
    )
    throws SQLException
    {
        ConnectedStatements flysStatements = context.getFlysStatements();

        SymbolicStatement.Instance nextId =
            flysStatements.getStatement("next.gauge.id");

        SymbolicStatement.Instance insertStmnt =
            flysStatements.getStatement("insert.gauge");

        boolean modified = false;

        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
            Long      officialNumber = entry.getKey();
            DIPSGauge gauge          = entry.getValue();

            log.info("Gauge '" + gauge.getAftName() +
                "' not in FLYS but in AFT/DIPS. -> Create");

            if (!gauge.hasDatums()) {
                log.warn("FLYS: Gauge '" + 
                    gauge.getAftName() + "' has no datum. Ignored.");
                continue;
            }

            ResultSet rs = null;
            flysStatements.beginTransaction();
            try {
                (rs = nextId.executeQuery()).next();
                int gaugeId = rs.getInt("gauge_id");
                rs.close(); rs = null;

                insertStmnt
                    .clearParameters()
                    .setInt("id", gaugeId)
                    .setString("name", gauge.getAftName())
                    .setInt("river_id", id1)
                    .setDouble("station", gauge.getStation())
                    .setDouble("aeo", gauge.getAeo())
                    .setDouble("official_number", officialNumber)
                    .setDouble("datum", gauge.getLatestDatum().getValue());

                insertStmnt.execute();

                log.info("FLYS: Created gauge '" + gauge.getAftName() + 
                    "' with id " + gaugeId + ".");

                gauge.setFlysId(gaugeId);
                createDischargeTables(context, officialNumber, gauge);
                flysStatements.commitTransaction();
                modified = true;
            }
            catch (SQLException sqle) {
                flysStatements.rollbackTransaction();
                log.error(sqle, sqle);
            }
            finally {
                if (rs != null) {
                    rs.close();
                }
            }
        }

        return modified;
    }

    protected void createDischargeTables(
        SyncContext context,
        Long        officialNumber,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        log.info("create discharge tables");

        // Load the discharge tables from AFT.
        List<DischargeTable> dts = loadAftDischargeTables(
            context, officialNumber, gauge);

        // Persist the time intervals.
        persistFlysTimeIntervals(context, dts);

        // Persist the discharge tables
        int [] flysDTIds = persistFlysDischargeTables(context, dts);

        // Copy over the W/Q values
        copyWQsFromAftToFlys(context, dts, flysDTIds);
    }

    protected List<DischargeTable> loadAftDischargeTables(
        SyncContext context,
        Long        officialNumber,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        return DischargeTable.loadAftDischargeTables(
            context, officialNumber, gauge.getFlysId());
    }

    protected void persistFlysTimeIntervals(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        for (DischargeTable dt: dts) {
            TimeInterval timeInterval = dt.getTimeInterval();
            if (timeInterval != null) {
                dt.setTimeInterval(
                    context.fetchOrCreateFLYSTimeInterval(timeInterval));
            }
        }
    }

    protected int [] persistFlysDischargeTables(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        boolean debug = log.isDebugEnabled();

        int [] flysDTIds = new int[dts.size()];

        ResultSet rs = null;
        try {
            ConnectedStatements flysStatements =
                context.getFlysStatements();

            SymbolicStatement.Instance nextId =
                flysStatements.getStatement("next.discharge.id");

            SymbolicStatement.Instance insertDT =
                flysStatements.getStatement("insert.dischargetable");

            for (int i = 0; i < flysDTIds.length; ++i) {
                rs = nextId.executeQuery();
                rs.next();
                int id = rs.getInt("discharge_table_id");
                flysDTIds[i] = id;
                rs.close(); rs = null;

                DischargeTable dt = dts.get(i);
                insertDT.clearParameters()
                    .setInt("id", id)
                    .setInt("gauge_id", dt.getGaugeId())
                    .setString("description", dt.getDescription());

                TimeInterval timeInterval = dt.getTimeInterval();
                if (timeInterval != null) {
                    insertDT.setInt("time_interval_id", timeInterval.getId());
                }
                else {
                    insertDT.setNull("time_interval_id", Types.INTEGER);
                }

                insertDT.execute();
                if (debug) {
                    log.debug("FLYS: Created discharge table id: " + id);
                }
            }
        }
        finally {
            if (rs != null) {
                rs.close();
                rs = null;
            }
        }

        return flysDTIds;
    }

    protected void copyWQsFromAftToFlys(
        SyncContext          context,
        List<DischargeTable> dts,
        int []               flysDTIds
    )
    throws SQLException
    {
        for (int i = 0; i < flysDTIds.length; ++i) {
            DischargeTable dt = dts.get(i);
            dt.loadAftValues(context);
            dt.storeFlysValues(context, flysDTIds[i]);
            dt.clearValues(); // To save memory.
        }
    }

    public String toString() {
        return "[River: name=" + name + ", " + super.toString() + "]";
    }

}
// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :

http://dive4elements.wald.intevation.org