view flys-aft/src/main/java/de/intevation/aft/River.java @ 4255:670e98f5a441

Fixed leak while merging facets. The ThemeList that is used by OutputHelper to sort the Facets for an Output now uses a list to store the ManagedFacets. The correct order is established by sorting the list with the JDK's Collections.sort() method. Therefore, the ManagedFacet class implements the Comparable interface. The return value of its compareTo(other) method depends on the value of the 'position' field.
author Ingo Weinzierl <weinzierl.ingo@googlemail.com>
date Thu, 25 Oct 2012 14:01:46 +0200
parents 06891562e633
children 79bb64f66c74
line wrap: on
line source
package de.intevation.aft;

import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.log4j.Logger;

import de.intevation.db.ConnectedStatements;
import de.intevation.db.SymbolicStatement;

public class River
extends      IdPair
{
    private static Logger log = Logger.getLogger(River.class);

    protected String name;

    public River() {
    }

    public River(int id1, int id2, String name) {
        super(id1, id2);
        this.name = name;
    }

    public String getName() {
        return name;
    }


    public boolean sync(SyncContext context) throws SQLException {
        log.info("sync river: " + this);

        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges();

        ConnectedStatements flysStatements = context.getFlysStatements();
        ConnectedStatements aftStatements  = context.getAftStatements();

        ResultSet messstellenRs = aftStatements
            .getStatement("select.messstelle")
            .clearParameters()
            .setInt("GEWAESSER_NR", id2).executeQuery();

        String riverName = getName();

        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();

        while (messstellenRs.next()) {
            String name = messstellenRs.getString("NAME");
            String num  = messstellenRs.getString("MESSSTELLE_NR");
            Long number = SyncContext.numberToLong(num);
            if (number == null) {
                log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
                continue;
            }
            DIPSGauge dipsGauge = dipsGauges.get(number);
            if (dipsGauge == null) {
                log.warn(
                    "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " +
                    "Gauge number used for lookup: " + number);
                continue;
            }
            String gaugeRiver = dipsGauge.getRiverName();
            if (!gaugeRiver.equalsIgnoreCase(riverName)) {
                log.warn(
                    "DIPS: MESSSTELLE '" + name + 
                    "' is assigned to river '" + gaugeRiver + 
                    "'. Needs to be on '" + riverName + "'.");
                continue;
            }
            dipsGauge.setAftName(name);
            dipsGauge.setOfficialNumber(number);
            aftDIPSGauges.put(number, dipsGauge);
        }

        messstellenRs.close();

        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();

        ResultSet gaugesRs = flysStatements
            .getStatement("select.gauges")
            .clearParameters()
            .setInt("river_id", id1).executeQuery();

        while (gaugesRs.next()) {
            int gaugeId = gaugesRs.getInt("id");
            String name = gaugesRs.getString("name");
            long   number = gaugesRs.getLong("official_number");
            if (gaugesRs.wasNull()) {
                log.warn("FLYS: Gauge '" + name + 
                    "' has no official number. Ignored.");
                continue;
            }
            Long key = Long.valueOf(number);
            DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
            if (aftDIPSGauge == null) {
                log.warn("FLYS: Gauge '" + name + "' number " + number +
                    " is not found in AFT/DIPS.");
                continue;
            }
            aftDIPSGauge.setFlysId(gaugeId);
            log.info("Gauge '" + name +
                "' found in FLYS, AFT and DIPS. -> Update");
            updateGauges.add(aftDIPSGauge);
        }
        gaugesRs.close();

        boolean modified = createGauges(context, aftDIPSGauges);

        modified |= updateGauges(context, updateGauges);

        return modified;
    }

    protected boolean updateGauges(
        SyncContext     context,
        List<DIPSGauge> gauges
    )
    throws SQLException
    {
        boolean modified = false;

        for (DIPSGauge gauge: gauges) {
            modified |= updateGauge(context, gauge);
        }

        return modified;
    }

    protected boolean updateGauge(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'.");
        // We need to load all discharge tables from both database
        // of the gauge and do some pairing based on their descriptions.

        boolean modified = false;

        ConnectedStatements flysStatements = context.getFlysStatements();

        flysStatements.beginTransaction();
        try {
            List<DischargeTable> flysDTs =
                DischargeTable.loadFlysDischargeTables(
                    context, gauge.getFlysId());

            List<DischargeTable> aftDTs =
                DischargeTable.loadAftDischargeTables(
                    context, gauge.getOfficialNumber());

            Map<String, DischargeTable> desc2FlysDT =
                new HashMap<String, DischargeTable>();

            for (DischargeTable dt: flysDTs) {
                String description = dt.getDescription();
                if (description == null) {
                    log.warn("FLYS: discharge table " + dt.getId() 
                        + " has no description. Ignored.");
                    continue;
                }
                desc2FlysDT.put(description, dt);
            }

            List<DischargeTable> createDTs = new ArrayList<DischargeTable>();

            for (DischargeTable aftDT: aftDTs) {
                String description = aftDT.getDescription();
                DischargeTable flysDT = desc2FlysDT.remove(description);
                if (flysDT != null) {
                    // Found in AFT and FLYS.
                    log.info("FLYS: Discharge table '" + description
                        + "' found in AFT and FLYS. -> update");
                    // Create the W/Q diff.
                    modified |= writeWQChanges(context, flysDT, aftDT);
                }
                else {
                    log.info("FLYS: Discharge table '" + description
                        + "' not found in FLYS. -> create");
                    createDTs.add(aftDT);
                }
            }

            for (String description: desc2FlysDT.keySet()) {
                log.info("FLYS: Discharge table '" + description
                    + "' found in FLYS but not in AFT. -> ignore");
            }

            log.info("FLYS: Copy " + createDTs.size() +
                " discharge tables over from AFT.");

            // Create the new discharge tables.
            for (DischargeTable aftDT: createDTs) {
                createDischargeTable(context, aftDT, gauge.getFlysId());
                modified = true;
            }

            flysStatements.commitTransaction();
        }
        catch (SQLException sqle) {
            flysStatements.rollbackTransaction();
            log.error(sqle, sqle);
            modified = false;
        }

        return modified;
    }

    protected boolean writeWQChanges(
        SyncContext    context,
        DischargeTable flysDT,
        DischargeTable aftDT
    )
    throws SQLException
    {
        flysDT.loadFlysValues(context);
        aftDT.loadAftValues(context);
        WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
        if (diff.hasChanges()) {
            diff.writeChanges(context, flysDT.getId());
            return true;
        }
        return false;
    }

    protected boolean createGauges(
        SyncContext          context,
        Map<Long, DIPSGauge> gauges
    )
    throws SQLException
    {
        ConnectedStatements flysStatements = context.getFlysStatements();

        SymbolicStatement.Instance nextId =
            flysStatements.getStatement("next.gauge.id");

        SymbolicStatement.Instance insertStmnt =
            flysStatements.getStatement("insert.gauge");

        boolean modified = false;

        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
            Long      officialNumber = entry.getKey();
            DIPSGauge gauge          = entry.getValue();

            log.info("Gauge '" + gauge.getAftName() +
                "' not in FLYS but in AFT/DIPS. -> Create");

            if (!gauge.hasDatums()) {
                log.warn("DIPS: Gauge '" + 
                    gauge.getAftName() + "' has no datum. Ignored.");
                continue;
            }

            ResultSet rs = null;
            flysStatements.beginTransaction();
            try {
                (rs = nextId.executeQuery()).next();
                int gaugeId = rs.getInt("gauge_id");
                rs.close(); rs = null;

                insertStmnt
                    .clearParameters()
                    .setInt("id", gaugeId)
                    .setString("name", gauge.getAftName())
                    .setInt("river_id", id1)
                    .setDouble("station", gauge.getStation())
                    .setDouble("aeo", gauge.getAeo())
                    .setLong("official_number", officialNumber)
                    .setDouble("datum", gauge.getLatestDatum().getValue());

                insertStmnt.execute();

                log.info("FLYS: Created gauge '" + gauge.getAftName() + 
                    "' with id " + gaugeId + ".");

                gauge.setFlysId(gaugeId);
                createDischargeTables(context, gauge);
                flysStatements.commitTransaction();
                modified = true;
            }
            catch (SQLException sqle) {
                flysStatements.rollbackTransaction();
                log.error(sqle, sqle);
            }
            finally {
                if (rs != null) {
                    rs.close();
                }
            }
        }

        return modified;
    }

    protected void createDischargeTable(
        SyncContext    context,
        DischargeTable aftDT,
        int            flysGaugeId
    )
    throws SQLException
    {
        aftDT.persistFlysTimeInterval(context);
        int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId);

        aftDT.loadAftValues(context);
        aftDT.storeFlysValues(context, flysId);
    }

    protected void createDischargeTables(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        log.info("FLYS: Create discharge tables for '" +
            gauge.getAftName() + "'.");

        // Load the discharge tables from AFT.
        List<DischargeTable> dts = loadAftDischargeTables(
            context, gauge);

        // Persist the time intervals.
        persistFlysTimeIntervals(context, dts);

        // Persist the discharge tables
        int [] flysDTIds = persistFlysDischargeTables(
            context, dts, gauge.getFlysId());

        // Copy over the W/Q values
        copyWQsFromAftToFlys(context, dts, flysDTIds);
    }

    protected List<DischargeTable> loadAftDischargeTables(
        SyncContext context,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        return DischargeTable.loadAftDischargeTables(
            context, gauge.getOfficialNumber(), gauge.getFlysId());
    }

    protected void persistFlysTimeIntervals(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        for (DischargeTable dt: dts) {
            dt.persistFlysTimeInterval(context);
        }
    }

    protected int [] persistFlysDischargeTables(
        SyncContext          context,
        List<DischargeTable> dts,
        int                  flysGaugeId
    )
    throws SQLException
    {
        boolean debug = log.isDebugEnabled();

        int [] flysDTIds = new int[dts.size()];

        for (int i = 0; i < flysDTIds.length; ++i) {
            flysDTIds[i] = dts.get(i)
                .persistFlysDischargeTable(context, flysGaugeId);
        }

        return flysDTIds;
    }

    protected void copyWQsFromAftToFlys(
        SyncContext          context,
        List<DischargeTable> dts,
        int []               flysDTIds
    )
    throws SQLException
    {
        for (int i = 0; i < flysDTIds.length; ++i) {
            DischargeTable dt = dts.get(i);
            dt.loadAftValues(context);
            dt.storeFlysValues(context, flysDTIds[i]);
            dt.clearValues(); // To save memory.
        }
    }

    public String toString() {
        return "[River: name=" + name + ", " + super.toString() + "]";
    }
}
// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :

http://dive4elements.wald.intevation.org