view flys-aft/src/main/java/de/intevation/aft/River.java @ 4523:504cd5801785

Added new column 'year' to sediment density values in postgresql and oracle schema.
author Raimund Renkert <rrenkert@intevation.de>
date Wed, 14 Nov 2012 17:19:54 +0100
parents 79bb64f66c74
children b195fede1c3b
line wrap: on
line source
package de.intevation.aft;

import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.log4j.Logger;

import de.intevation.db.ConnectedStatements;
import de.intevation.db.SymbolicStatement;

public class River
extends      IdPair
{
    private static Logger log = Logger.getLogger(River.class);

    protected String name;

    public River() {
    }

    public River(int id1, int id2, String name) {
        super(id1, id2);
        this.name = name;
    }

    public String getName() {
        return name;
    }


    public boolean sync(SyncContext context) throws SQLException {
        log.info("sync river: " + this);

        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges();

        ConnectedStatements flysStatements = context.getFlysStatements();
        ConnectedStatements aftStatements  = context.getAftStatements();

        ResultSet messstellenRs = aftStatements
            .getStatement("select.messstelle")
            .clearParameters()
            .setInt("GEWAESSER_NR", id2).executeQuery();

        String riverName = getName();

        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();

        while (messstellenRs.next()) {
            String name = messstellenRs.getString("NAME");
            String num  = messstellenRs.getString("MESSSTELLE_NR");
            Long number = SyncContext.numberToLong(num);
            if (number == null) {
                log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
                continue;
            }
            DIPSGauge dipsGauge = dipsGauges.get(number);
            if (dipsGauge == null) {
                log.warn(
                    "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " +
                    "Gauge number used for lookup: " + number);
                continue;
            }
            String gaugeRiver = dipsGauge.getRiverName();
            if (!gaugeRiver.equalsIgnoreCase(riverName)) {
                log.warn(
                    "DIPS: MESSSTELLE '" + name + 
                    "' is assigned to river '" + gaugeRiver + 
                    "'. Needs to be on '" + riverName + "'.");
                continue;
            }
            dipsGauge.setAftName(name);
            dipsGauge.setOfficialNumber(number);
            aftDIPSGauges.put(number, dipsGauge);
        }

        messstellenRs.close();

        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();

        ResultSet gaugesRs = flysStatements
            .getStatement("select.gauges")
            .clearParameters()
            .setInt("river_id", id1).executeQuery();

        while (gaugesRs.next()) {
            int gaugeId = gaugesRs.getInt("id");
            String name = gaugesRs.getString("name");
            long   number = gaugesRs.getLong("official_number");
            if (gaugesRs.wasNull()) {
                log.warn("FLYS: Gauge '" + name + 
                    "' has no official number. Ignored.");
                continue;
            }
            Long key = Long.valueOf(number);
            DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
            if (aftDIPSGauge == null) {
                log.warn("FLYS: Gauge '" + name + "' number " + number +
                    " is not found in AFT/DIPS.");
                continue;
            }
            aftDIPSGauge.setFlysId(gaugeId);
            log.info("Gauge '" + name +
                "' found in FLYS, AFT and DIPS. -> Update");
            updateGauges.add(aftDIPSGauge);
        }
        gaugesRs.close();

        boolean modified = createGauges(context, aftDIPSGauges);

        modified |= updateGauges(context, updateGauges);

        return modified;
    }

    protected boolean updateGauges(
        SyncContext     context,
        List<DIPSGauge> gauges
    )
    throws SQLException
    {
        boolean modified = false;

        for (DIPSGauge gauge: gauges) {
            modified |= updateGauge(context, gauge);
        }

        return modified;
    }

    protected boolean updateGauge(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'.");
        // We need to load all discharge tables from both database
        // of the gauge and do some pairing based on their descriptions.

        boolean modified = false;

        ConnectedStatements flysStatements = context.getFlysStatements();

        flysStatements.beginTransaction();
        try {
            List<DischargeTable> flysDTs =
                DischargeTable.loadFlysDischargeTables(
                    context, gauge.getFlysId());

            List<DischargeTable> aftDTs =
                DischargeTable.loadAftDischargeTables(
                    context, gauge.getOfficialNumber());

            Map<String, DischargeTable> desc2FlysDT =
                new HashMap<String, DischargeTable>();

            for (DischargeTable dt: flysDTs) {
                String description = dt.getDescription();
                if (description == null) {
                    log.warn("FLYS: discharge table " + dt.getId() 
                        + " has no description. Ignored.");
                    continue;
                }
                desc2FlysDT.put(description, dt);
            }

            List<DischargeTable> createDTs = new ArrayList<DischargeTable>();

            for (DischargeTable aftDT: aftDTs) {
                String description = aftDT.getDescription();
                DischargeTable flysDT = desc2FlysDT.remove(description);
                if (flysDT != null) {
                    // Found in AFT and FLYS.
                    log.info("FLYS: Discharge table '" + description
                        + "' found in AFT and FLYS. -> update");
                    // Create the W/Q diff.
                    modified |= writeWQChanges(context, flysDT, aftDT);
                }
                else {
                    log.info("FLYS: Discharge table '" + description
                        + "' not found in FLYS. -> create");
                    createDTs.add(aftDT);
                }
            }

            for (String description: desc2FlysDT.keySet()) {
                log.info("FLYS: Discharge table '" + description
                    + "' found in FLYS but not in AFT. -> ignore");
            }

            log.info("FLYS: Copy " + createDTs.size() +
                " discharge tables over from AFT.");

            // Create the new discharge tables.
            for (DischargeTable aftDT: createDTs) {
                createDischargeTable(context, aftDT, gauge.getFlysId());
                modified = true;
            }

            flysStatements.commitTransaction();
        }
        catch (SQLException sqle) {
            flysStatements.rollbackTransaction();
            log.error(sqle, sqle);
            modified = false;
        }

        return modified;
    }

    protected boolean writeWQChanges(
        SyncContext    context,
        DischargeTable flysDT,
        DischargeTable aftDT
    )
    throws SQLException
    {
        flysDT.loadFlysValues(context);
        aftDT.loadAftValues(context);
        WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
        if (diff.hasChanges()) {
            diff.writeChanges(context, flysDT.getId());
            return true;
        }
        return false;
    }

    protected boolean createGauges(
        SyncContext          context,
        Map<Long, DIPSGauge> gauges
    )
    throws SQLException
    {
        ConnectedStatements flysStatements = context.getFlysStatements();

        SymbolicStatement.Instance nextId =
            flysStatements.getStatement("next.gauge.id");

        SymbolicStatement.Instance insertStmnt =
            flysStatements.getStatement("insert.gauge");

        boolean modified = false;

        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
            Long      officialNumber = entry.getKey();
            DIPSGauge gauge          = entry.getValue();

            log.info("Gauge '" + gauge.getAftName() +
                "' not in FLYS but in AFT/DIPS. -> Create");

            if (!gauge.hasDatums()) {
                log.warn("DIPS: Gauge '" + 
                    gauge.getAftName() + "' has no datum. Ignored.");
                continue;
            }

            ResultSet rs = null;
            flysStatements.beginTransaction();
            try {
                (rs = nextId.executeQuery()).next();
                int gaugeId = rs.getInt("gauge_id");
                rs.close(); rs = null;

                insertStmnt
                    .clearParameters()
                    .setInt("id", gaugeId)
                    .setString("name", gauge.getAftName())
                    .setInt("river_id", id1)
                    .setDouble("station", gauge.getStation())
                    .setDouble("aeo", gauge.getAeo())
                    .setLong("official_number", officialNumber)
                    .setDouble("datum", gauge.getLatestDatum().getValue());

                insertStmnt.execute();

                log.info("FLYS: Created gauge '" + gauge.getAftName() + 
                    "' with id " + gaugeId + ".");

                gauge.setFlysId(gaugeId);
                createDischargeTables(context, gauge);
                flysStatements.commitTransaction();
                modified = true;
            }
            catch (SQLException sqle) {
                flysStatements.rollbackTransaction();
                log.error(sqle, sqle);
            }
            finally {
                if (rs != null) {
                    rs.close();
                }
            }
        }

        return modified;
    }

    protected void createDischargeTable(
        SyncContext    context,
        DischargeTable aftDT,
        int            flysGaugeId
    )
    throws SQLException
    {
        aftDT.persistFlysTimeInterval(context);
        int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId);

        aftDT.loadAftValues(context);
        aftDT.storeFlysValues(context, flysId);
    }

    protected void createDischargeTables(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        log.info("FLYS: Create discharge tables for '" +
            gauge.getAftName() + "'.");

        // Load the discharge tables from AFT.
        List<DischargeTable> dts = loadAftDischargeTables(
            context, gauge);

        // Persist the time intervals.
        persistFlysTimeIntervals(context, dts);

        // Persist the discharge tables
        int [] flysDTIds = persistFlysDischargeTables(
            context, dts, gauge.getFlysId());

        // Copy over the W/Q values
        copyWQsFromAftToFlys(context, dts, flysDTIds);
    }

    protected List<DischargeTable> loadAftDischargeTables(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        return DischargeTable.loadAftDischargeTables(
            context, gauge.getOfficialNumber(), gauge.getFlysId());
    }

    protected void persistFlysTimeIntervals(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        for (DischargeTable dt: dts) {
            dt.persistFlysTimeInterval(context);
        }
    }

    protected int [] persistFlysDischargeTables(
        SyncContext          context,
        List<DischargeTable> dts,
        int                  flysGaugeId
    )
    throws SQLException
    {
        int [] flysDTIds = new int[dts.size()];

        for (int i = 0; i < flysDTIds.length; ++i) {
            flysDTIds[i] = dts.get(i)
                .persistFlysDischargeTable(context, flysGaugeId);
        }

        return flysDTIds;
    }

    protected void copyWQsFromAftToFlys(
        SyncContext          context,
        List<DischargeTable> dts,
        int []               flysDTIds
    )
    throws SQLException
    {
        for (int i = 0; i < flysDTIds.length; ++i) {
            DischargeTable dt = dts.get(i);
            dt.loadAftValues(context);
            dt.storeFlysValues(context, flysDTIds[i]);
            dt.clearValues(); // To save memory.
        }
    }

    public String toString() {
        return "[River: name=" + name + ", " + super.toString() + "]";
    }
}
// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :

http://dive4elements.wald.intevation.org