view flys-aft/src/main/java/de/intevation/aft/River.java @ 4093:7bddd4601707

Copy over W/Q values from AFT to FLYS for new discharge tables. flys-aft/trunk@3603 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 05 Jan 2012 17:46:18 +0000
parents d556e29592f5
children b20b710aa86f
line wrap: on
line source
package de.intevation.aft;

import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Date;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;

import org.apache.log4j.Logger;

import de.intevation.db.ConnectedStatements;
import de.intevation.db.SymbolicStatement;

public class River
extends      IdPair
{
    private static Logger log = Logger.getLogger(River.class);

    protected String name;

    /**
     * Creates a river without ids and without a name.
     */
    public River() {
        super();
    }

    /**
     * Creates a named river with its pair of ids.
     *
     * @param id1  first id; {@link #sync(SyncContext)} binds it as
     *             {@code river_id} when querying the FLYS gauges.
     * @param id2  second id; {@link #sync(SyncContext)} binds it as
     *             {@code GEWAESSER_NR} when querying AFT.
     * @param name the name of the river.
     */
    public River(int id1, int id2, String name) {
        super(id1, id2);
        this.name = name;
    }

    /**
     * Returns the name of this river.
     *
     * @return the name; {@code null} if none was set.
     */
    public String getName() {
        return this.name;
    }


    /**
     * Synchronizes the gauges of this river between AFT/DIPS and FLYS.
     *
     * <p>First the AFT "Messstellen" of this river (looked up by
     * {@code id2}) are matched against the DIPS gauges of the context.
     * Entries with an invalid number, entries unknown to DIPS and entries
     * assigned to a different river are logged and skipped. Afterwards the
     * FLYS gauges of this river (looked up by {@code id1}) are matched by
     * their official number. Gauges found on both sides are collected for
     * update; gauges only known to AFT/DIPS are handed to
     * {@link #createGauges(SyncContext, Map)}.</p>
     *
     * <p>Both result sets are closed in {@code finally} blocks, and the FLYS
     * result set is closed before new gauges are created, so that no
     * cursor stays open while further statements and transactions run.</p>
     *
     * @param context the sync context providing the DIPS gauges and the
     *                AFT and FLYS statements.
     * @throws SQLException if one of the database operations fails.
     */
    public void sync(SyncContext context) throws SQLException {
        log.info("sync river: " + this);

        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges();

        ConnectedStatements flysStatements = context.getFlysStatements();
        ConnectedStatements aftStatements  = context.getAftStatements();

        String riverName = getName();

        // AFT gauges of this river that are also known to DIPS,
        // keyed by their official gauge number.
        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();

        ResultSet messstellenRs = aftStatements
            .getStatement("select.messstelle")
            .clearParameters()
            .setInt("GEWAESSER_NR", id2).executeQuery();

        try {
            while (messstellenRs.next()) {
                String name = messstellenRs.getString("NAME");
                String num  = messstellenRs.getString("MESSSTELLE_NR");
                Long number = SyncContext.numberToLong(num);
                if (number == null) {
                    log.warn("Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
                    continue;
                }
                DIPSGauge dipsGauge = dipsGauges.get(number);
                if (dipsGauge == null) {
                    log.warn(
                        "MESSSTELLE '" + name + "' not found in DIPS. " +
                        "Gauge number used for lookup: " + number);
                    continue;
                }
                String gaugeRiver = dipsGauge.getRiverName();
                // A gauge without a river name cannot belong to this river;
                // skip it like any other mismatch instead of failing with
                // a NullPointerException.
                if (gaugeRiver == null
                || !gaugeRiver.equalsIgnoreCase(riverName)) {
                    log.warn(
                        "MESSSTELLE '" + name +
                        "' is assigned to river '" + gaugeRiver +
                        "'. Needs to be on '" + riverName + "'.");
                    continue;
                }
                dipsGauge.setAftName(name);
                aftDIPSGauges.put(number, dipsGauge);
            }
        }
        finally {
            messstellenRs.close();
        }

        // NOTE(review): filled here but not consumed anywhere in this
        // method -- presumably reserved for a later update step.
        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();

        ResultSet gaugesRs = flysStatements
            .getStatement("select.gauges")
            .clearParameters()
            .setInt("river_id", id1).executeQuery();

        try {
            while (gaugesRs.next()) {
                int gaugeId = gaugesRs.getInt("id");
                String name = gaugesRs.getString("name");
                long   number = gaugesRs.getLong("official_number");
                if (gaugesRs.wasNull()) {
                    log.warn("FLYS: Gauge '" + name +
                        "' has no official number. Ignored.");
                    continue;
                }
                Long key = Long.valueOf(number);
                // Removing the match leaves only the gauges that are
                // missing in FLYS in the map.
                DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
                if (aftDIPSGauge == null) {
                    log.warn("FLYS: Gauge '" + name + "' number " + number +
                        " is not found in AFT/DIPS.");
                    continue;
                }
                aftDIPSGauge.setFlysId(gaugeId);
                log.info("Gauge '" + name +
                    "' found in FLYS, AFT and DIPS. -> Update");
                updateGauges.add(aftDIPSGauge);
            }
        }
        finally {
            gaugesRs.close();
        }

        // Whatever is left in the map exists in AFT/DIPS only.
        createGauges(context, aftDIPSGauges);
    }

    /**
     * Creates the given gauges in FLYS together with their discharge
     * tables.
     *
     * <p>Gauges without datums are logged and skipped. Every remaining
     * gauge is written in its own transaction: a new id is fetched, the
     * gauge is inserted, its discharge tables are created and finally the
     * transaction is committed. If anything fails -- a
     * {@link SQLException} as well as an unchecked exception -- the
     * transaction is rolled back and the exception is rethrown, so no
     * transaction is left open.</p>
     *
     * @param context the sync context providing the FLYS statements.
     * @param gauges  the gauges to create, keyed by official number.
     * @throws SQLException if a database operation fails.
     */
    protected void createGauges(
        SyncContext          context,
        Map<Long, DIPSGauge> gauges
    )
    throws SQLException
    {
        ConnectedStatements flysStatements = context.getFlysStatements();

        SymbolicStatement.Instance nextId =
            flysStatements.getStatement("next.gauge.id");

        SymbolicStatement.Instance insertStmnt =
            flysStatements.getStatement("insert.gauge");

        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
            Long      officialNumber = entry.getKey();
            DIPSGauge gauge          = entry.getValue();

            log.info("Gauge '" + gauge.getAftName() +
                "' not in FLYS but in AFT/DIPS. -> Create");

            if (!gauge.hasDatums()) {
                log.warn("FLYS: Gauge '" +
                    gauge.getAftName() + "' has no datum. Ignored.");
                continue;
            }

            ResultSet rs = null;
            flysStatements.beginTransaction();
            try {
                (rs = nextId.executeQuery()).next();
                int gaugeId = rs.getInt("gauge_id");
                rs.close(); rs = null;

                // NOTE(review): official_number is bound as a double here
                // while sync() reads the column with getLong() -- confirm
                // that this is intended.
                insertStmnt
                    .clearParameters()
                    .setInt("id", gaugeId)
                    .setString("name", gauge.getAftName())
                    .setInt("river_id", id1)
                    .setDouble("station", gauge.getStation())
                    .setDouble("aeo", gauge.getAeo())
                    .setDouble("official_number", officialNumber)
                    .setDouble("datum", gauge.getLatestDatum().getValue());

                insertStmnt.execute();

                log.info("FLYS: Created gauge '" + gauge.getAftName() +
                    "' with id " + gaugeId + ".");

                // The discharge tables need the new FLYS id of the gauge.
                gauge.setFlysId(gaugeId);
                createDischargeTables(context, officialNumber, gauge);

                flysStatements.commitTransaction();
            }
            catch (SQLException sqle) {
                flysStatements.rollbackTransaction();
                throw sqle;
            }
            catch (RuntimeException re) {
                // Unchecked failures must not leave the transaction open.
                flysStatements.rollbackTransaction();
                throw re;
            }
            finally {
                if (rs != null) {
                    rs.close();
                }
            }
        }
    }

    /**
     * Creates the discharge tables of a gauge in FLYS from the data
     * found in AFT.
     *
     * <p>The steps run in a fixed order: load the tables from AFT,
     * persist their time intervals, persist the tables themselves and
     * finally copy over the W/Q values.</p>
     *
     * @param context        the sync context.
     * @param officialNumber the official number of the gauge.
     * @param gauge          the gauge the tables belong to.
     * @throws SQLException if a database operation fails.
     */
    protected void createDischargeTables(
        SyncContext context,
        Long        officialNumber,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        log.info("create discharge tables");

        // Step 1: fetch what AFT knows about this gauge.
        List<DischargeTable> aftTables =
            loadAftDischargeTables(context, officialNumber, gauge);

        // Step 2: the tables reference time intervals, so these go first.
        persistFlysTimeIntervals(context, aftTables);

        // Steps 3 and 4: store the tables and fill them with the
        // W/Q values using the ids FLYS handed out for them.
        copyWQsFromAftToFlys(
            context,
            aftTables,
            persistFlysDischargeTables(context, aftTables));
    }

    /**
     * Loads the discharge tables of a gauge from AFT.
     *
     * <p>The query {@code select.abflusstafel} is run with the pattern
     * {@code "%" + officialNumber}. A missing description is replaced by
     * the official number. A table without a start date gets no time
     * interval. The datum column is read but only written to the debug
     * log.</p>
     *
     * @param context        the sync context providing the AFT statements.
     * @param officialNumber the official number of the gauge.
     * @param gauge          the gauge whose FLYS id is stored in the
     *                       created tables.
     * @return the discharge tables found; never {@code null}.
     * @throws SQLException if the query fails.
     */
    protected List<DischargeTable> loadAftDischargeTables(
        SyncContext context,
        Long        officialNumber,
        DIPSGauge   gauge
    )
    throws SQLException
    {
        final boolean debugEnabled = log.isDebugEnabled();

        List<DischargeTable> tables = new ArrayList<DischargeTable>();

        ResultSet result = null;
        try {
            result = context
                .getAftStatements()
                .getStatement("select.abflusstafel")
                .clearParameters()
                .setString("number", "%" + officialNumber)
                .executeQuery();

            while (result.next()) {
                int  aftId     = result.getInt("ABFLUSSTAFEL_NR");
                Date validFrom = result.getDate("GUELTIG_VON");
                Date validTo   = result.getDate("GUELTIG_BIS");

                // Fall back to the gauge number if AFT has no description.
                String description = result.getString("ABFLUSSTAFEL_BEZ");
                if (description == null) {
                    description = String.valueOf(officialNumber);
                }

                // wasNull() refers to the column read directly before it.
                double rawDatum = result.getDouble("PEGELNULLPUNKT");
                Double datum = null;
                if (!result.wasNull()) {
                    datum = Double.valueOf(rawDatum);
                }

                if (debugEnabled) {
                    log.debug("id:          " + aftId);
                    log.debug("valid from:  " + validFrom);
                    log.debug("valid to:    " + validTo);
                    log.debug("datum:       " + datum);
                    log.debug("description: " + description);
                }

                // Without a start date there is no usable interval.
                TimeInterval validity = null;
                if (validFrom != null) {
                    validity = new TimeInterval(validFrom, validTo);
                }

                tables.add(new DischargeTable(
                    aftId,
                    gauge.getFlysId(),
                    validity,
                    description));
            }
        }
        finally {
            if (result != null) {
                result.close();
            }
        }

        return tables;
    }

    /**
     * Replaces the time interval of every discharge table by the one
     * returned from
     * {@code SyncContext.fetchOrCreateFLYSTimeInterval(TimeInterval)}.
     * Tables without a time interval are left untouched.
     *
     * @param context the sync context used to fetch or create intervals.
     * @param dts     the discharge tables to process.
     * @throws SQLException if a database operation fails.
     */
    protected void persistFlysTimeIntervals(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        for (DischargeTable table: dts) {
            TimeInterval interval = table.getTimeInterval();
            if (interval == null) {
                // Nothing to persist for this table.
                continue;
            }
            table.setTimeInterval(
                context.fetchOrCreateFLYSTimeInterval(interval));
        }
    }

    /**
     * Stores the given discharge tables in FLYS.
     *
     * <p>For each table a new id is fetched with
     * {@code next.discharge.id} and the table is inserted with
     * {@code insert.dischargetable}. Tables without a time interval are
     * stored with a {@code NULL} {@code time_interval_id}.</p>
     *
     * @param context the sync context providing the FLYS statements.
     * @param dts     the discharge tables to store.
     * @return the new FLYS ids, in the same order as {@code dts}.
     * @throws SQLException if a database operation fails.
     */
    protected int [] persistFlysDischargeTables(
        SyncContext          context,
        List<DischargeTable> dts
    )
    throws SQLException
    {
        final boolean debugEnabled = log.isDebugEnabled();

        final int count = dts.size();
        int [] newIds = new int[count];

        ResultSet idResult = null;
        try {
            ConnectedStatements flys = context.getFlysStatements();

            SymbolicStatement.Instance idQuery =
                flys.getStatement("next.discharge.id");

            SymbolicStatement.Instance insert =
                flys.getStatement("insert.dischargetable");

            for (int idx = 0; idx < count; ++idx) {
                // Fetch the id first and release the cursor right away.
                idResult = idQuery.executeQuery();
                idResult.next();
                int newId = idResult.getInt("discharge_table_id");
                newIds[idx] = newId;
                idResult.close();
                idResult = null;

                DischargeTable table    = dts.get(idx);
                TimeInterval   interval = table.getTimeInterval();

                insert.clearParameters()
                    .setInt("id", newId)
                    .setInt("gauge_id", table.getGaugeId())
                    .setString("description", table.getDescription());

                if (interval == null) {
                    insert.setNull("time_interval_id", Types.INTEGER);
                }
                else {
                    insert.setInt("time_interval_id", interval.getId());
                }

                insert.execute();

                if (debugEnabled) {
                    log.debug("FLYS: Created discharge table id: " + newId);
                }
            }
        }
        finally {
            // Only non-null if fetching an id failed half way.
            if (idResult != null) {
                idResult.close();
            }
        }

        return newIds;
    }

    /**
     * Copies the W/Q values of the discharge tables from AFT to FLYS.
     *
     * <p>For every entry of {@code flysDTIds} the values of the table at
     * the same position in {@code dts} are loaded from AFT, stored in
     * FLYS under that id and then dropped from the table again.</p>
     *
     * @param context   the sync context.
     * @param dts       the discharge tables.
     * @param flysDTIds the FLYS ids of the tables, parallel to
     *                  {@code dts}.
     * @throws SQLException if a database operation fails.
     */
    protected void copyWQsFromAftToFlys(
        SyncContext          context,
        List<DischargeTable> dts,
        int []               flysDTIds
    )
    throws SQLException
    {
        int pos = 0;
        for (int flysId: flysDTIds) {
            DischargeTable table = dts.get(pos++);
            table.loadAftValues(context);
            table.storeFlysValues(context, flysId);
            // Values are no longer needed; release them to save memory.
            table.clearValues();
        }
    }

    /**
     * Returns a textual representation consisting of the river name and
     * the representation of the superclass.
     *
     * @return the string {@code [River: name=<name>, <super>]}.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("[River: name=");
        sb.append(name)
          .append(", ")
          .append(super.toString())
          .append("]");
        return sb.toString();
    }

}
// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :

http://dive4elements.wald.intevation.org