changeset 5825:f529495f901d

moved directories to org.dive4elements.river.etl
author Sascha L. Teichmann <teichmann@intevation.de>
date Thu, 25 Apr 2013 11:42:54 +0200
parents 06643e440d1e
children 9438e9259213
files flys-aft/src/main/java/org/dive4elements/etl/aft/DIPSGauge.java flys-aft/src/main/java/org/dive4elements/etl/aft/DischargeTable.java flys-aft/src/main/java/org/dive4elements/etl/aft/IdPair.java flys-aft/src/main/java/org/dive4elements/etl/aft/Notification.java flys-aft/src/main/java/org/dive4elements/etl/aft/River.java flys-aft/src/main/java/org/dive4elements/etl/aft/Rivers.java flys-aft/src/main/java/org/dive4elements/etl/aft/Sync.java flys-aft/src/main/java/org/dive4elements/etl/aft/SyncContext.java flys-aft/src/main/java/org/dive4elements/etl/aft/TimeInterval.java flys-aft/src/main/java/org/dive4elements/etl/aft/WQ.java flys-aft/src/main/java/org/dive4elements/etl/aft/WQDiff.java flys-aft/src/main/java/org/dive4elements/etl/db/ConnectedStatements.java flys-aft/src/main/java/org/dive4elements/etl/db/ConnectionBuilder.java flys-aft/src/main/java/org/dive4elements/etl/db/Statements.java flys-aft/src/main/java/org/dive4elements/etl/db/SymbolicStatement.java flys-aft/src/main/java/org/dive4elements/etl/utils/XML.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/DIPSGauge.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/DischargeTable.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/IdPair.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/Notification.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/River.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/Rivers.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/Sync.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/SyncContext.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/TimeInterval.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQ.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQDiff.java flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectedStatements.java flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectionBuilder.java flys-aft/src/main/java/org/dive4elements/river/etl/db/Statements.java flys-aft/src/main/java/org/dive4elements/river/etl/db/SymbolicStatement.java flys-aft/src/main/java/org/dive4elements/river/etl/utils/XML.java
diffstat 32 files changed, 2836 insertions(+), 2836 deletions(-) [+]
line wrap: on
line diff
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/DIPSGauge.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,193 +0,0 @@
-package de.intevation.aft;
-
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.log4j.Logger;
-
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-public class DIPSGauge
-{
-    private static Logger log = Logger.getLogger(DIPSGauge.class);
-
-    public static final Pattern DATE_PATTERN = Pattern.compile(
-        "(\\d{4})-(\\d{2})-(\\d{2})\\s+(\\d{2}):(\\d{2}):(\\d{2})");
-
-    public static final Comparator<Datum> DATE_CMP = new Comparator<Datum>() {
-        public int compare(Datum a, Datum b) {
-            return a.date.compareTo(b.date);
-        }
-    };
-
-    public static class Datum {
-
-        protected double value;
-        protected Date   date;
-
-        public Datum() {
-        }
-
-        public Datum(Element element) {
-            value = Double.parseDouble(element.getAttribute("WERT"));
-            String dateString = element.getAttribute("GUELTIGAB");
-            if (dateString.length() == 0) {
-                throw
-                    new IllegalArgumentException("missing GUELTIGAB attribute");
-            }
-            Matcher m = DATE_PATTERN.matcher(dateString);
-            if (!m.matches()) {
-                throw
-                    new IllegalArgumentException("GUELTIGAB does not match");
-            }
-
-            int year  = Integer.parseInt(m.group(1));
-            int month = Integer.parseInt(m.group(2));
-            int day   = Integer.parseInt(m.group(3));
-            int hours = Integer.parseInt(m.group(4));
-            int mins  = Integer.parseInt(m.group(5));
-            int secs  = Integer.parseInt(m.group(6));
-
-            Calendar cal = Calendar.getInstance();
-            cal.set(year, month, day, hours, mins, secs);
-
-            date = cal.getTime();
-        }
-
-        public double getValue() {
-            return value;
-        }
-
-        public void setValue(double value) {
-            this.value = value;
-        }
-
-        public Date getDate() {
-            return date;
-        }
-
-        public void setDate(Date date) {
-            this.date = date;
-        }
-    } // class datum
-
-    protected double aeo;
-
-    protected double station;
-
-    protected String name;
-
-    protected String riverName;
-
-    protected List<Datum> datums;
-
-    protected int flysId;
-
-    protected String aftName;
-
-    protected Long   officialNumber;
-
-    public DIPSGauge() {
-    }
-
-    public DIPSGauge(Element element) {
-
-        name          = element.getAttribute("NAME");
-        riverName     = element.getAttribute("GEWAESSER");
-
-        String aeoString = element.getAttribute("EINZUGSGEBIET_AEO");
-        if (aeoString.length() == 0) {
-            log.warn("DIPS: Setting AEO of gauge '" + name + "' to zero.");
-            aeoString = "0";
-        }
-        aeo = Double.parseDouble(aeoString);
-
-        String stationString = element.getAttribute("STATIONIERUNG");
-        if (stationString.length() == 0) {
-            log.warn("DIPS: Setting station of gauge '" + name + "' to zero.");
-            stationString = "-99999";
-        }
-        station = Double.parseDouble(stationString);
-        if (station == 0d) {
-            log.warn("DIPS: Station of gauge '" + name + "' is zero.");
-        }
-
-        datums = new ArrayList<Datum>();
-        NodeList nodes = element.getElementsByTagName("PNP");
-        for (int i = 0, N = nodes.getLength(); i < N; ++i) {
-            Element e = (Element)nodes.item(i);
-            Datum datum = new Datum(e);
-            datums.add(datum);
-        }
-        Collections.sort(datums, DATE_CMP);
-    }
-
-    public List<Datum> getDatums() {
-        return datums;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public String getRiverName() {
-        return riverName;
-    }
-
-    public int getFlysId() {
-        return flysId;
-    }
-
-    public void setFlysId(int flysId) {
-        this.flysId = flysId;
-    }
-
-    public String getAftName() {
-        return aftName != null ? aftName : name;
-    }
-
-    public void setAftName(String aftName) {
-        this.aftName = aftName;
-    }
-
-    public double getStation() {
-        return station;
-    }
-
-    public double getAeo() {
-        return aeo;
-    }
-
-    public void setAeo(double aeo) {
-        this.aeo = aeo;
-    }
-
-    public void setStation(double station) {
-        this.station = station;
-    }
-
-    public boolean hasDatums() {
-        return !datums.isEmpty();
-    }
-
-    public Datum getLatestDatum() {
-        return datums.get(datums.size()-1);
-    }
-
-    public Long getOfficialNumber() {
-        return officialNumber;
-    }
-
-    public void setOfficialNumber(Long officialNumber) {
-        this.officialNumber = officialNumber;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/DischargeTable.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,372 +0,0 @@
-package de.intevation.aft;
-
-import de.intevation.db.ConnectedStatements;
-import de.intevation.db.SymbolicStatement;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.log4j.Logger;
-
-/** A Discharge Table. */
-public class DischargeTable
-{
-    private static Logger log = Logger.getLogger(DischargeTable.class);
-
-    protected int          id;
-    protected int          gaugeId;
-    protected TimeInterval timeInterval;
-    protected String       description;
-    protected String       bfgId;
-    protected Set<WQ>      values;
-
-    public DischargeTable() {
-    }
-
-    public DischargeTable(
-        int          gaugeId,
-        TimeInterval timeInterval,
-        String       description,
-        String       bfgId
-    ) {
-        this.gaugeId      = gaugeId;
-        this.timeInterval = timeInterval;
-        this.description  = description;
-        this.bfgId        = bfgId;
-        values = new TreeSet<WQ>(WQ.EPS_CMP);
-    }
-
-    public DischargeTable(
-        int          id,
-        int          gaugeId,
-        TimeInterval timeInterval,
-        String       description,
-        String       bfgId
-    ) {
-        this(gaugeId, timeInterval, description, bfgId);
-        this.id = id;
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    public void setId(int id) {
-        this.id = id;
-    }
-
-    public int getGaugeId() {
-        return gaugeId;
-    }
-
-    public void setGaugeId(int gaugeId) {
-        this.gaugeId = gaugeId;
-    }
-
-    public TimeInterval getTimeInterval() {
-        return timeInterval;
-    }
-
-    public void setTimeInterval(TimeInterval timeInterval) {
-        this.timeInterval = timeInterval;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public String getBfgId() {
-        return bfgId;
-    }
-
-    public void setBfgId(String bfgId) {
-        this.bfgId = bfgId;
-    }
-
-
-    public void clearValues() {
-        values.clear();
-    }
-
-    public Set<WQ> getValues() {
-        return values;
-    }
-
-    public void setValues(Set<WQ> values) {
-        this.values = values;
-    }
-
-
-    protected void loadValues(SymbolicStatement.Instance query)
-    throws SQLException
-    {
-        ResultSet rs = query.executeQuery();
-        while (rs.next()) {
-            int    id = rs.getInt("id");
-            double w  = rs.getDouble("w");
-            double q  = rs.getDouble("q");
-            if (!values.add(new WQ(id, w, q))) {
-                log.warn("FLYS/AFT: Value duplication w="+w+" q="+q+". -> ignore.");
-            }
-        }
-        rs.close();
-    }
-
-    public void loadAftValues(SyncContext context) throws SQLException {
-        loadValues(context.getAftStatements()
-            .getStatement("select.tafelwert")
-            .clearParameters()
-            .setInt("number", getId()));
-    }
-
-    public void loadFlysValues(SyncContext context) throws SQLException {
-        loadValues(context.getFlysStatements()
-            .getStatement("select.discharge.table.values")
-            .clearParameters()
-            .setInt("table_id", getId()));
-    }
-
-    public void storeFlysValues(
-        SyncContext context,
-        int         dischargeTableId
-    )
-    throws SQLException
-    {
-        ConnectedStatements flysStatements = context.getFlysStatements();
-
-        // Create the ids.
-        SymbolicStatement.Instance nextId = flysStatements
-            .getStatement("next.discharge.table.values.id");
-
-        // Insert the values.
-        SymbolicStatement.Instance insertDTV = flysStatements
-            .getStatement("insert.discharge.table.value");
-
-        for (WQ wq: values) {
-            int wqId;
-            ResultSet rs = nextId.executeQuery();
-            try {
-                rs.next();
-                wqId = rs.getInt("discharge_table_values_id");
-            }
-            finally {
-                rs.close();
-            }
-
-            insertDTV
-                .clearParameters()
-                .setInt("id", wqId)
-                .setInt("table_id", dischargeTableId)
-                .setDouble("w", wq.getW())
-                .setDouble("q", wq.getQ())
-                .execute();
-        }
-    }
-
-    public static List<DischargeTable> loadFlysDischargeTables(
-        SyncContext context,
-        int         gaugeId
-    )
-    throws SQLException
-    {
-        List<DischargeTable> dts = new ArrayList<DischargeTable>();
-
-        ResultSet rs = context
-            .getFlysStatements()
-            .getStatement("select.gauge.discharge.tables")
-            .clearParameters()
-            .setInt("gauge_id", gaugeId)
-            .executeQuery();
-        try {
-            OUTER: while (rs.next()) {
-                int    id          = rs.getInt("id");
-                String description = rs.getString("description");
-                String bfgId       = rs.getString("bfg_id");
-                if (description == null) {
-                    description = "";
-                }
-                if (bfgId == null) {
-                    bfgId = "";
-                }
-                for (DischargeTable dt: dts) {
-                    if (dt.getBfgId().equals(bfgId)) {
-                        log.warn("FLYS: Found discharge table '" +
-                            bfgId + "' with same bfg_id. -> ignore");
-                        continue OUTER;
-                    }
-                }
-                Date startTime = rs.getDate("start_time");
-                Date stopTime  = rs.getDate("stop_time");
-                TimeInterval ti = startTime == null
-                    ? null
-                    : new TimeInterval(startTime, stopTime);
-
-                DischargeTable dt = new DischargeTable(
-                    id, gaugeId, ti, description, bfgId);
-                dts.add(dt);
-            }
-        }
-        finally {
-            rs.close();
-        }
-
-        return dts;
-    }
-
-    public static List<DischargeTable> loadAftDischargeTables(
-        SyncContext context,
-        Long        officialNumber
-    )
-    throws SQLException
-    {
-        return loadAftDischargeTables(context, officialNumber, 0);
-    }
-
-    public static List<DischargeTable> loadAftDischargeTables(
-        SyncContext context,
-        Long        officialNumber,
-        int         flysGaugeId
-    )
-    throws SQLException
-    {
-        List<DischargeTable> dts = new ArrayList<DischargeTable>();
-
-        ResultSet rs = context
-            .getAftStatements()
-            .getStatement("select.abflusstafel")
-            .clearParameters()
-            .setString("number", "%" + officialNumber)
-            .executeQuery();
-        try {
-            OUTER: while (rs.next()) {
-                int  dtId = rs.getInt("ABFLUSSTAFEL_NR");
-                Date from = rs.getDate("GUELTIG_VON");
-                Date to   = rs.getDate("GUELTIG_BIS");
-
-                if (from == null) {
-                    log.warn("AFT: ABFLUSSTAFEL_NR = "
-                        + dtId + ": GUELTIG_VON = NULL -> ignored.");
-                }
-
-                if (to == null) {
-                    log.warn("AFT: ABFLUSSTAFEL_NR = "
-                        + dtId + ": GUELTIG_BIS = NULL -> ignored.");
-                }
-
-                if (from == null || to == null) {
-                    continue;
-                }
-
-                if (from.compareTo(to) > 0) {
-                        log.warn("AFT: ABFLUSSTAFEL_NR = "
-                        + dtId + ": " + from + " > " + to + ". -> swap");
-                    Date temp = from;
-                    from = to;
-                    to = temp;
-                }
-
-                String description = rs.getString("ABFLUSSTAFEL_BEZ");
-                if (description == null) {
-                    description = String.valueOf(officialNumber);
-                }
-
-                String bfgId = rs.getString("BFG_ID");
-                if (bfgId == null) {
-                    bfgId = "";
-                }
-
-                for (DischargeTable dt: dts) {
-                    if (dt.getBfgId().equals(bfgId)) {
-                        log.warn("AFT: Found discharge table '" +
-                            bfgId + "' with same bfg_id. -> ignore.");
-                        continue OUTER;
-                    }
-                }
-
-                TimeInterval timeInterval = new TimeInterval(from, to);
-
-                DischargeTable dt = new DischargeTable(
-                    dtId,
-                    flysGaugeId,
-                    timeInterval,
-                    description,
-                    bfgId);
-                dts.add(dt);
-            }
-        }
-        finally {
-            rs.close();
-        }
-
-        return dts;
-    }
-
-    public void persistFlysTimeInterval(
-        SyncContext context
-    )
-    throws SQLException
-    {
-        if (timeInterval != null) {
-            timeInterval = context.fetchOrCreateFLYSTimeInterval(
-                timeInterval);
-        }
-    }
-
-    public int persistFlysDischargeTable(
-        SyncContext context,
-        int         gaugeId
-    )
-    throws SQLException
-    {
-        ConnectedStatements flysStatements =
-            context.getFlysStatements();
-
-        int flysId;
-
-        ResultSet rs = flysStatements
-            .getStatement("next.discharge.id")
-            .executeQuery();
-        try {
-            rs.next();
-            flysId = rs.getInt("discharge_table_id");
-        }
-        finally {
-            rs.close();
-        }
-
-        SymbolicStatement.Instance insertDT = flysStatements
-            .getStatement("insert.dischargetable")
-            .clearParameters()
-            .setInt("id", flysId)
-            .setInt("gauge_id", gaugeId)
-            .setString("description", description)
-            .setString("bfg_id", bfgId);
-
-        if (timeInterval != null) {
-            insertDT.setInt("time_interval_id", timeInterval.getId());
-        }
-        else {
-            insertDT.setNull("time_interval_id", Types.INTEGER);
-        }
-
-        insertDT.execute();
-
-        if (log.isDebugEnabled()) {
-            log.debug("FLYS: Created discharge table id: " + id);
-        }
-
-        return flysId;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/IdPair.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,40 +0,0 @@
-package de.intevation.aft;
-
-public class IdPair
-{
-    protected int id1;
-    protected int id2;
-
-    public IdPair() {
-    }
-
-    public IdPair(int id1) {
-        this.id1 = id1;
-    }
-
-    public IdPair(int id1, int id2) {
-        this(id1);
-        this.id2 = id2;
-    }
-
-    public int getId1() {
-        return id1;
-    }
-
-    public void setId1(int id1) {
-        this.id1 = id1;
-    }
-
-    public int getId2() {
-        return id2;
-    }
-
-    public void setId2(int id2) {
-        this.id2 = id2;
-    }
-
-    public String toString() {
-        return "[IdPair: id1=" + id1 + ", id2=" + id2 + "]";
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/Notification.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,101 +0,0 @@
-package de.intevation.aft;
-
-import de.intevation.utils.XML;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLConnection;
-
-import org.apache.log4j.Logger;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-
-public class Notification
-{
-    private static Logger log = Logger.getLogger(Notification.class);
-
-    protected Document message;
-
-    public Notification() {
-    }
-
-    public Notification(Document message) {
-        this.message = message;
-    }
-
-    public Notification(Node message) {
-        this(wrap(message));
-    }
-
-    public static Document wrap(Node node) {
-        Document document = XML.newDocument();
-
-        // Send first element as message.
-        // Fall back to root node.
-        Node toImport = node;
-
-        NodeList children = node.getChildNodes();
-        for (int i = 0, N = children.getLength(); i < N; ++i) {
-            Node child = children.item(i);
-            if (child.getNodeType() == Node.ELEMENT_NODE) {
-                toImport = child;
-                break;
-            }
-        }
-
-        toImport = document.importNode(toImport, true);
-        document.appendChild(toImport);
-        document.normalizeDocument();
-        return document;
-    }
-
-    public Document sendPOST(URL url) {
-
-        OutputStream out    = null;
-        InputStream  in     = null;
-        Document     result = null;
-
-        try {
-            URLConnection ucon = url.openConnection();
-
-            if (!(ucon instanceof HttpURLConnection)) {
-                log.warn("NOTIFY: '" + url + "' is not an HTTP(S) connection.");
-                return null;
-            }
-
-            HttpURLConnection con = (HttpURLConnection)ucon;
-
-            con.setRequestMethod("POST");
-            con.setDoInput(true);
-            con.setDoOutput(true);
-            con.setUseCaches(false);
-            con.setRequestProperty("Content-Type", "text/xml");
-
-            out = con.getOutputStream();
-            XML.toStream(message, out);
-            out.flush();
-            in = con.getInputStream();
-            result = XML.parseDocument(in);
-        }
-        catch (IOException ioe) {
-            log.error("NOTIFY: Sending message to '" + url + "' failed.", ioe);
-        }
-        finally {
-            if (out != null) {
-                try { out.close(); } catch (IOException ioe) {}
-            }
-            if (in != null) {
-                try { in.close(); } catch (IOException ioe) {}
-            }
-        }
-
-        return result;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/River.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,525 +0,0 @@
-package de.intevation.aft;
-
-import de.intevation.db.ConnectedStatements;
-import de.intevation.db.SymbolicStatement;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class River
-extends      IdPair
-{
-    private static Logger log = Logger.getLogger(River.class);
-
-    protected String name;
-
-    protected double from;
-    protected double to;
-
-    public River() {
-    }
-
-    public River(int id1, String name, double from, double to) {
-        super(id1);
-        this.name = name;
-        this.from = from;
-        this.to   = to;
-    }
-
-    public River(int id1, int id2, String name) {
-        super(id1, id2);
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public double getFrom() {
-        return from;
-    }
-
-    public void setFrom(double from) {
-        this.from = from;
-    }
-
-    public double getTo() {
-        return to;
-    }
-
-    public void setTo(double to) {
-        this.to = to;
-    }
-
-    public boolean inside(double x) {
-        return x >= from && x <= to;
-    }
-
-    public boolean sync(SyncContext context) throws SQLException {
-        log.info("sync river: " + this);
-
-        // Only take relevant gauges into account.
-        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to);
-
-        ConnectedStatements flysStatements = context.getFlysStatements();
-        ConnectedStatements aftStatements  = context.getAftStatements();
-
-        String riverName = getName();
-
-        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();
-
-        ResultSet messstellenRs = aftStatements
-            .getStatement("select.messstelle")
-            .clearParameters()
-            .setInt("GEWAESSER_NR", id2)
-            .executeQuery();
-
-        try {
-            while (messstellenRs.next()) {
-                String name = messstellenRs.getString("NAME");
-                String num  = messstellenRs.getString("MESSSTELLE_NR");
-                double station = messstellenRs.getDouble("STATIONIERUNG");
-
-                if (!messstellenRs.wasNull() && !inside(station)) {
-                    log.warn("Station found in AFT but in not range: " + station);
-                    continue;
-                }
-
-                Long number = SyncContext.numberToLong(num);
-                if (number == null) {
-                    log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
-                    continue;
-                }
-                DIPSGauge dipsGauge = dipsGauges.get(number);
-                if (dipsGauge == null) {
-                    log.warn(
-                        "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " +
-                        "Gauge number used for lookup: " + number);
-                    continue;
-                }
-                String gaugeRiver = dipsGauge.getRiverName();
-                if (!gaugeRiver.equalsIgnoreCase(riverName)) {
-                    log.warn(
-                        "DIPS: MESSSTELLE '" + name +
-                        "' is assigned to river '" + gaugeRiver +
-                        "'. Needs to be on '" + riverName + "'.");
-                    continue;
-                }
-                dipsGauge.setAftName(name);
-                dipsGauge.setOfficialNumber(number);
-                aftDIPSGauges.put(number, dipsGauge);
-            }
-        }
-        finally {
-            messstellenRs.close();
-        }
-
-        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();
-
-        ResultSet gaugesRs = flysStatements
-            .getStatement("select.gauges")
-            .clearParameters()
-            .setInt("river_id", id1).executeQuery();
-
-        try {
-            while (gaugesRs.next()) {
-                int gaugeId = gaugesRs.getInt("id");
-                String name = gaugesRs.getString("name");
-                long   number = gaugesRs.getLong("official_number");
-                if (gaugesRs.wasNull()) {
-                    log.warn("FLYS: Gauge '" + name +
-                        "' has no official number. Ignored.");
-                    continue;
-                }
-                Long key = Long.valueOf(number);
-                DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
-                if (aftDIPSGauge == null) {
-                    log.warn("FLYS: Gauge '" + name + "' number " + number +
-                        " is not found in AFT/DIPS.");
-                    continue;
-                }
-                aftDIPSGauge.setFlysId(gaugeId);
-                log.info("Gauge '" + name +
-                    "' found in FLYS, AFT and DIPS. -> Update");
-                updateGauges.add(aftDIPSGauge);
-            }
-        }
-        finally {
-            gaugesRs.close();
-        }
-
-        boolean modified = createGauges(context, aftDIPSGauges);
-
-        modified |= updateGauges(context, updateGauges);
-
-        return modified;
-    }
-
-    protected boolean updateGauges(
-        SyncContext     context,
-        List<DIPSGauge> gauges
-    )
-    throws SQLException
-    {
-        boolean modified = false;
-
-        for (DIPSGauge gauge: gauges) {
-            // XXX: Do dont modify the master AT.
-            // modified |= updateBfGIdOnMasterDischargeTable(context, gauge);
-            modified |= updateGauge(context, gauge);
-        }
-
-        return modified;
-    }
-
-    protected boolean updateBfGIdOnMasterDischargeTable(
-        SyncContext context,
-        DIPSGauge   gauge
-    ) throws SQLException {
-        log.info(
-            "FLYS: Updating master discharge table bfg_id for '" +
-            gauge.getAftName() + "'");
-        ConnectedStatements flysStatements = context.getFlysStatements();
-
-        ResultSet rs = flysStatements
-            .getStatement("select.gauge.master.discharge.table")
-            .clearParameters()
-            .setInt("gauge_id", gauge.getFlysId())
-            .executeQuery();
-
-        int flysId;
-
-        try {
-            if (!rs.next()) {
-                log.error(
-                    "FLYS: No master discharge table found for gauge '" +
-                    gauge.getAftName() + "'");
-                return false;
-            }
-            String bfgId = rs.getString("bfg_id");
-            if (!rs.wasNull()) { // already has BFG_ID
-                return false;
-            }
-            flysId = rs.getInt("id");
-        } finally {
-            rs.close();
-        }
-
-        // We need to find out the BFG_ID of the current discharge table
-        // for this gauge in AFT.
-
-        ConnectedStatements aftStatements = context.getAftStatements();
-
-        rs = aftStatements
-            .getStatement("select.bfg.id.current")
-            .clearParameters()
-            .setString("number", "%" + gauge.getOfficialNumber())
-            .executeQuery();
-
-        String bfgId = null;
-
-        try {
-            if (rs.next()) {
-                bfgId = rs.getString("BFG_ID");
-            }
-        } finally {
-            rs.close();
-        }
-
-        if (bfgId == null) {
-            log.warn(
-                "No BFG_ID found for current discharge table of gauge '" +
-                gauge + "'");
-            return false;
-        }
-
-        // Set the BFG_ID in FLYS.
-        flysStatements.beginTransaction();
-        try {
-            flysStatements
-                .getStatement("update.bfg.id.discharge.table")
-                .clearParameters()
-                .setInt("id", flysId)
-                .setString("bfg_id", bfgId)
-                .executeUpdate();
-            flysStatements.commitTransaction();
-        } catch (SQLException sqle) {
-            flysStatements.rollbackTransaction();
-            log.error(sqle, sqle);
-            return false;
-        }
-
-        return true;
-    }
-
-    protected boolean updateGauge(
-        SyncContext context,
-        DIPSGauge   gauge
-    )
-    throws SQLException
-    {
-        log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'.");
-        // We need to load all discharge tables from both databases
-        // of the gauge and do some pairing based on their bfg_id.
-
-        boolean modified = false;
-
-        ConnectedStatements flysStatements = context.getFlysStatements();
-
-        flysStatements.beginTransaction();
-        try {
-            List<DischargeTable> flysDTs =
-                DischargeTable.loadFlysDischargeTables(
-                    context, gauge.getFlysId());
-
-            List<DischargeTable> aftDTs =
-                DischargeTable.loadAftDischargeTables(
-                    context, gauge.getOfficialNumber());
-
-            Map<String, DischargeTable> bfgId2FlysDT =
-                new HashMap<String, DischargeTable>();
-
-            for (DischargeTable dt: flysDTs) {
-                String bfgId = dt.getBfgId();
-                if (bfgId == null) {
-                    log.warn("FLYS: discharge table " + dt.getId()
-                        + " has no bfg_id. Ignored.");
-                    continue;
-                }
-                bfgId2FlysDT.put(bfgId, dt);
-            }
-
-            List<DischargeTable> createDTs = new ArrayList<DischargeTable>();
-
-            for (DischargeTable aftDT: aftDTs) {
-                String bfgId = aftDT.getBfgId();
-                DischargeTable flysDT = bfgId2FlysDT.remove(bfgId);
-                if (flysDT != null) {
-                    // Found in AFT and FLYS.
-                    log.info("FLYS: Discharge table '" + bfgId
-                        + "' found in AFT and FLYS. -> update");
-                    // Create the W/Q diff.
-                    modified |= writeWQChanges(context, flysDT, aftDT);
-                }
-                else {
-                    log.info("FLYS: Discharge table '" + bfgId
-                        + "' not found in FLYS. -> create");
-                    createDTs.add(aftDT);
-                }
-            }
-
-            for (String bfgId: bfgId2FlysDT.keySet()) {
-                log.info("FLYS: Discharge table '" + bfgId
-                    + "' found in FLYS but not in AFT. -> ignore");
-            }
-
-            log.info("FLYS: Copy " + createDTs.size() +
-                " discharge tables over from AFT.");
-
-            // Create the new discharge tables.
-            for (DischargeTable aftDT: createDTs) {
-                createDischargeTable(context, aftDT, gauge.getFlysId());
-                modified = true;
-            }
-
-            flysStatements.commitTransaction();
-        }
-        catch (SQLException sqle) {
-            flysStatements.rollbackTransaction();
-            log.error(sqle, sqle);
-            modified = false;
-        }
-
-        return modified;
-    }
-
-    protected boolean writeWQChanges(
-        SyncContext    context,
-        DischargeTable flysDT,
-        DischargeTable aftDT
-    )
-    throws SQLException
-    {
-        flysDT.loadFlysValues(context);
-        aftDT.loadAftValues(context);
-        WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
-        if (diff.hasChanges()) {
-            diff.writeChanges(context, flysDT.getId());
-            return true;
-        }
-        return false;
-    }
-
-    protected boolean createGauges(
-        SyncContext          context,
-        Map<Long, DIPSGauge> gauges
-    )
-    throws SQLException
-    {
-        ConnectedStatements flysStatements = context.getFlysStatements();
-
-        SymbolicStatement.Instance nextId =
-            flysStatements.getStatement("next.gauge.id");
-
-        SymbolicStatement.Instance insertStmnt =
-            flysStatements.getStatement("insert.gauge");
-
-        boolean modified = false;
-
-        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
-            Long      officialNumber = entry.getKey();
-            DIPSGauge gauge          = entry.getValue();
-
-            log.info("Gauge '" + gauge.getAftName() +
-                "' not in FLYS but in AFT/DIPS. -> Create");
-
-            if (!gauge.hasDatums()) {
-                log.warn("DIPS: Gauge '" +
-                    gauge.getAftName() + "' has no datum. Ignored.");
-                continue;
-            }
-
-            ResultSet rs = null;
-            flysStatements.beginTransaction();
-            try {
-                (rs = nextId.executeQuery()).next();
-                int gaugeId = rs.getInt("gauge_id");
-                rs.close(); rs = null;
-
-                insertStmnt
-                    .clearParameters()
-                    .setInt("id", gaugeId)
-                    .setString("name", gauge.getAftName())
-                    .setInt("river_id", id1)
-                    .setDouble("station", gauge.getStation())
-                    .setDouble("aeo", gauge.getAeo())
-                    .setLong("official_number", officialNumber)
-                    .setDouble("datum", gauge.getLatestDatum().getValue());
-
-                insertStmnt.execute();
-
-                log.info("FLYS: Created gauge '" + gauge.getAftName() +
-                    "' with id " + gaugeId + ".");
-
-                gauge.setFlysId(gaugeId);
-                createDischargeTables(context, gauge);
-                flysStatements.commitTransaction();
-                modified = true;
-            }
-            catch (SQLException sqle) {
-                flysStatements.rollbackTransaction();
-                log.error(sqle, sqle);
-            }
-            finally {
-                if (rs != null) {
-                    rs.close();
-                }
-            }
-        }
-
-        return modified;
-    }
-
-    protected void createDischargeTable(
-        SyncContext    context,
-        DischargeTable aftDT,
-        int            flysGaugeId
-    )
-    throws SQLException
-    {
-        aftDT.persistFlysTimeInterval(context);
-        int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId);
-
-        aftDT.loadAftValues(context);
-        aftDT.storeFlysValues(context, flysId);
-    }
-
-    protected void createDischargeTables(
-        SyncContext context,
-        DIPSGauge   gauge
-    )
-    throws SQLException
-    {
-        log.info("FLYS: Create discharge tables for '" +
-            gauge.getAftName() + "'.");
-
-        // Load the discharge tables from AFT.
-        List<DischargeTable> dts = loadAftDischargeTables(
-            context, gauge);
-
-        // Persist the time intervals.
-        persistFlysTimeIntervals(context, dts);
-
-        // Persist the discharge tables
-        int [] flysDTIds = persistFlysDischargeTables(
-            context, dts, gauge.getFlysId());
-
-        // Copy over the W/Q values
-        copyWQsFromAftToFlys(context, dts, flysDTIds);
-    }
-
-    protected List<DischargeTable> loadAftDischargeTables(
-        SyncContext context,
-        DIPSGauge   gauge
-    )
-    throws SQLException
-    {
-        return DischargeTable.loadAftDischargeTables(
-            context, gauge.getOfficialNumber(), gauge.getFlysId());
-    }
-
-    protected void persistFlysTimeIntervals(
-        SyncContext          context,
-        List<DischargeTable> dts
-    )
-    throws SQLException
-    {
-        for (DischargeTable dt: dts) {
-            dt.persistFlysTimeInterval(context);
-        }
-    }
-
-    protected int [] persistFlysDischargeTables(
-        SyncContext          context,
-        List<DischargeTable> dts,
-        int                  flysGaugeId
-    )
-    throws SQLException
-    {
-        int [] flysDTIds = new int[dts.size()];
-
-        for (int i = 0; i < flysDTIds.length; ++i) {
-            flysDTIds[i] = dts.get(i)
-                .persistFlysDischargeTable(context, flysGaugeId);
-        }
-
-        return flysDTIds;
-    }
-
-    protected void copyWQsFromAftToFlys(
-        SyncContext          context,
-        List<DischargeTable> dts,
-        int []               flysDTIds
-    )
-    throws SQLException
-    {
-        for (int i = 0; i < flysDTIds.length; ++i) {
-            DischargeTable dt = dts.get(i);
-            dt.loadAftValues(context);
-            dt.storeFlysValues(context, flysDTIds[i]);
-            dt.clearValues(); // To save memory.
-        }
-    }
-
-    public String toString() {
-        return "[River: name=" + name + ", " + super.toString() + "]";
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/Rivers.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,77 +0,0 @@
-package de.intevation.aft;
-
-import de.intevation.db.ConnectedStatements;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class Rivers
-{
-    private static Logger log = Logger.getLogger(Rivers.class);
-
-    public Rivers() {
-    }
-
-    public boolean sync(SyncContext context) throws SQLException {
-
-        log.info("sync: rivers");
-
-        ConnectedStatements flysStatements = context.getFlysStatements();
-        ConnectedStatements aftStatements  = context.getAftStatements();
-
-        Map<String, River> flysRivers = new HashMap<String, River>();
-
-        ResultSet flysRs = flysStatements
-            .getStatement("select.rivers").executeQuery();
-
-        try {
-            while (flysRs.next()) {
-                int    id   = flysRs.getInt("id");
-                String name = flysRs.getString("name");
-                double from = flysRs.getDouble("min_km");
-                double to   = flysRs.getDouble("max_km");
-                flysRivers.put(name.toLowerCase(), new River(id, name, from, to));
-            }
-        }
-        finally {
-            flysRs.close();
-        }
-
-        List<River> commonRivers = new ArrayList<River>();
-
-        ResultSet aftRs = aftStatements
-            .getStatement("select.gewaesser").executeQuery();
-
-        try {
-            while (aftRs.next()) {
-                String name = aftRs.getString("NAME");
-                River river = flysRivers.get(name.toLowerCase());
-                if (river != null) {
-                    int id2 = aftRs.getInt("GEWAESSER_NR");
-                    river.setId2(id2);
-                    commonRivers.add(river);
-                }
-            }
-        }
-        finally {
-            aftRs.close();
-        }
-
-
-        boolean modified = false;
-
-        for (River river: commonRivers) {
-            modified |= river.sync(context);
-        }
-
-        return modified;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/Sync.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,170 +0,0 @@
-package de.intevation.aft;
-
-import de.intevation.db.ConnectionBuilder;
-
-import de.intevation.utils.XML;
-
-import java.io.File;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-
-import java.sql.SQLException;
-
-import javax.xml.xpath.XPathConstants;
-
-import org.apache.log4j.Logger;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-public class Sync
-{
-    private static Logger log = Logger.getLogger(Sync.class);
-
-    public static final String FLYS = "flys";
-    public static final String AFT  = "aft";
-
-    public static final String XPATH_DIPS   = "/sync/dips/file/text()";
-    public static final String XPATH_REPAIR = "/sync/dips/repair/text()";
-
-    public static final String XPATH_NOTIFICATIONS =
-        "/sync/notifications/notification";
-
-    public static final String CONFIG_FILE =
-        System.getProperty("config.file", "config.xml");
-
-    public static void sendNotifications(Document config) {
-        NodeList notifications = (NodeList)XML.xpath(
-            config, XPATH_NOTIFICATIONS, XPathConstants.NODESET, null, null);
-
-        if (notifications == null) {
-            return;
-        }
-
-        for (int i = 0, N = notifications.getLength(); i < N; ++i) {
-            Element notification = (Element)notifications.item(i);
-            String urlString = notification.getAttribute("url");
-
-            URL url;
-            try {
-                url = new URL(urlString);
-            }
-            catch (MalformedURLException mfue) {
-                log.warn("NOTIFY: Invalid URL '" + urlString + "'. Ignored.", mfue);
-                continue;
-            }
-
-            Notification n = new Notification(notification);
-
-            Document result = n.sendPOST(url);
-
-            if (result != null) {
-                log.info("Send notifcation to '" + urlString + "'.");
-                log.info(XML.toString(result));
-            }
-        }
-    }
-
-    public static void main(String [] args) {
-
-        File configFile = new File(CONFIG_FILE);
-
-        if (!configFile.isFile() || !configFile.canRead()) {
-            log.error("cannot read config file");
-            System.exit(1);
-        }
-
-        Document config = XML.parseDocument(configFile, Boolean.FALSE);
-
-        if (config == null) {
-            log.error("Cannot load config file.");
-            System.exit(1);
-        }
-
-        String dipsF = (String)XML.xpath(
-            config, XPATH_DIPS, XPathConstants.STRING, null, null);
-
-        if (dipsF == null || dipsF.length() == 0) {
-            log.error("Cannot find path to DIPS XML in config.");
-            System.exit(1);
-        }
-
-        File dipsFile = new File(dipsF);
-
-        if (!dipsFile.isFile() || !dipsFile.canRead()) {
-            log.error("DIPS: Cannot find '" + dipsF + "'.");
-            System.exit(1);
-        }
-
-        Document dips = XML.parseDocument(dipsFile, Boolean.FALSE);
-
-        if (dips == null) {
-            log.error("DIPS: Cannot load DIPS document.");
-            System.exit(1);
-        }
-
-        String repairF = (String)XML.xpath(
-            config, XPATH_REPAIR, XPathConstants.STRING, null, null);
-
-        if (repairF != null && repairF.length() > 0) {
-            File repairFile = new File(repairF);
-            if (!repairFile.isFile() || !repairFile.canRead()) {
-                log.warn("REPAIR: Cannot open DIPS repair XSLT file.");
-            }
-            else {
-                Document fixed = XML.transform(dips, repairFile);
-                if (fixed == null) {
-                    log.warn("REPAIR: Fixing DIPS failed.");
-                }
-                else {
-                    dips = fixed;
-                }
-            }
-        }
-
-        int exitCode = 0;
-
-        ConnectionBuilder aftConnectionBuilder =
-            new ConnectionBuilder(AFT, config);
-
-        ConnectionBuilder flysConnectionBuilder =
-            new ConnectionBuilder(FLYS, config);
-
-        SyncContext syncContext = null;
-
-        boolean modified = false;
-        try {
-            syncContext = new SyncContext(
-                aftConnectionBuilder.getConnectedStatements(),
-                flysConnectionBuilder.getConnectedStatements(),
-                dips);
-            syncContext.init();
-            Rivers rivers = new Rivers();
-            modified = rivers.sync(syncContext);
-        }
-        catch (SQLException sqle) {
-            log.error("SYNC: Syncing failed.", sqle);
-            exitCode = 1;
-        }
-        finally {
-            if (syncContext != null) {
-                syncContext.close();
-            }
-        }
-
-        if (modified) {
-            log.info("Modifications found.");
-            sendNotifications(config);
-        }
-        else {
-            log.info("No modifications found.");
-        }
-
-        if (exitCode != 0) {
-            System.exit(1);
-        }
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/SyncContext.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,220 +0,0 @@
-package de.intevation.aft;
-
-import de.intevation.db.ConnectedStatements;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.log4j.Logger;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-public class SyncContext
-{
-    private static Logger log = Logger.getLogger(SyncContext.class);
-
-    protected ConnectedStatements             aftStatements;
-    protected ConnectedStatements             flysStatements;
-    protected Document                        dips;
-
-    protected Map<Long, DIPSGauge>            numberToGauge;
-    protected Map<TimeInterval, TimeInterval> flysTimeIntervals;
-
-    public SyncContext() {
-    }
-
-    public SyncContext(
-        ConnectedStatements aftStatements,
-        ConnectedStatements flysStatements,
-        Document            dips
-    ) {
-        this.aftStatements  = aftStatements;
-        this.flysStatements = flysStatements;
-        this.dips           = dips;
-    }
-
-    public void init() throws SQLException {
-        numberToGauge       = indexByNumber(dips);
-        flysTimeIntervals   = loadTimeIntervals();
-    }
-
-    public ConnectedStatements getAftStatements() {
-        return aftStatements;
-    }
-
-    public void setAftStatements(ConnectedStatements aftStatements) {
-        this.aftStatements = aftStatements;
-    }
-
-    public ConnectedStatements getFlysStatements() {
-        return flysStatements;
-    }
-
-    public void setFlysStatements(ConnectedStatements flysStatements) {
-        this.flysStatements = flysStatements;
-    }
-
-    public Document getDips() {
-        return dips;
-    }
-
-    public void setDips(Document dips) {
-        this.dips = dips;
-    }
-
-    void close() {
-        aftStatements.close();
-        flysStatements.close();
-    }
-
-    public static Long numberToLong(String s) {
-        try {
-            return Long.valueOf(s.trim());
-        }
-        catch (NumberFormatException nfe) {
-        }
-        return null;
-    }
-
-    public Map<Long, DIPSGauge> getDIPSGauges() {
-        return numberToGauge;
-    }
-
-    public Map<Long, DIPSGauge> getDIPSGauges(
-        String riverName,
-        double from,
-        double to
-    ) {
-        if (from > to) {
-            double t = from;
-            from = to;
-            to = t;
-        }
-
-        riverName = riverName.toLowerCase();
-
-        Map<Long, DIPSGauge> result = new HashMap<Long, DIPSGauge>();
-
-        for (Map.Entry<Long, DIPSGauge> entry: numberToGauge.entrySet()) {
-            DIPSGauge gauge = entry.getValue();
-            // XXX: Maybe a bit too sloppy.
-            if (!riverName.contains(gauge.getRiverName().toLowerCase())) {
-                continue;
-            }
-            double station = gauge.getStation();
-            if (station >= from && station <= to) {
-                result.put(entry.getKey(), gauge);
-            }
-        }
-
-        return result;
-    }
-
-    protected static Map<Long, DIPSGauge> indexByNumber(Document document) {
-        Map<Long, DIPSGauge> map = new HashMap<Long, DIPSGauge>();
-        NodeList nodes = document.getElementsByTagName("PEGELSTATION");
-        for (int i = nodes.getLength()-1; i >= 0; --i) {
-            Element element = (Element)nodes.item(i);
-            String numberString = element.getAttribute("NUMMER");
-            Long number = numberToLong(numberString);
-            if (number != null) {
-                DIPSGauge newG = new DIPSGauge(element);
-                DIPSGauge oldG = map.put(number, newG);
-                if (oldG != null) {
-                    log.warn("DIPS: '" + newG.getName() +
-                        "' collides with '" + oldG.getName() +
-                        "' on gauge number " + number + ".");
-                }
-            }
-            else {
-                log.warn("DIPS: Gauge '" + element.getAttribute("NAME") +
-                    "' has invalid gauge number '" + numberString + "'.");
-            }
-        }
-        return map;
-    }
-
-    protected Map<TimeInterval, TimeInterval> loadTimeIntervals()
-    throws SQLException {
-
-        boolean debug = log.isDebugEnabled();
-
-        Map<TimeInterval, TimeInterval> intervals =
-            new TreeMap<TimeInterval, TimeInterval>();
-
-        ResultSet rs = flysStatements
-            .getStatement("select.timeintervals")
-            .executeQuery();
-
-        try {
-            while (rs.next()) {
-                int  id    = rs.getInt("id");
-                Date start = rs.getDate("start_time");
-                Date stop  = rs.getDate("stop_time");
-
-                if (debug) {
-                    log.debug("id:    " + id);
-                    log.debug("start: " + start);
-                    log.debug("stop:  " + stop);
-                }
-
-                TimeInterval ti = new TimeInterval(id, start, stop);
-                intervals.put(ti, ti);
-            }
-        }
-        finally {
-            rs.close();
-        }
-
-        if (debug) {
-            log.debug("loaded time intervals: " + intervals.size());
-        }
-
-        return intervals;
-    }
-
-    public TimeInterval fetchOrCreateFLYSTimeInterval(TimeInterval key)
-    throws SQLException
-    {
-        TimeInterval old = flysTimeIntervals.get(key);
-        if (old != null) {
-            return old;
-        }
-
-        ResultSet rs = flysStatements
-            .getStatement("next.timeinterval.id")
-            .executeQuery();
-
-        try {
-            rs.next();
-            key.setId(rs.getInt("time_interval_id"));
-        }
-        finally {
-            rs.close();
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("FLYS: Created time interval id: " + key.getId());
-            log.debug("FLYS: " + key);
-        }
-
-        flysStatements.getStatement("insert.timeinterval")
-            .clearParameters()
-            .setInt("id", key.getId())
-            .setObject("start_time", key.getStart())
-            .setObject("stop_time", key.getStop())
-            .execute();
-
-        flysTimeIntervals.put(key, key);
-
-        return key;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/TimeInterval.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,70 +0,0 @@
-package de.intevation.aft;
-
-import java.util.Date;
-
-public class TimeInterval
-implements   Comparable<TimeInterval>
-{
-    protected int  id;
-    protected Date start;
-    protected Date stop;
-
-    public TimeInterval() {
-    }
-
-    public TimeInterval(Date start, Date stop) {
-        this.start = start;
-        this.stop  = stop;
-    }
-
-    public TimeInterval(int id, Date start, Date stop) {
-        this(start, stop);
-        this.id    = id;
-    }
-
-    protected static int compare(Date d1, Date d2) {
-        long s1 = d1 != null ? d1.getTime()/1000L : 0L;
-        long s2 = d2 != null ? d2.getTime()/1000L : 0L;
-        long diff = s1 - s2;
-        return diff < 0L
-            ? -1
-            : diff > 0L ? 1 : 0;
-    }
-
-    @Override
-    public int compareTo(TimeInterval other) {
-        int cmp = compare(start, other.start);
-        return cmp != 0
-            ? cmp
-            : compare(stop, other.stop);
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    public void setId(int id) {
-        this.id = id;
-    }
-
-    public Date getStart() {
-        return start;
-    }
-
-    public void setStart(Date start) {
-        this.start = start;
-    }
-
-    public Date getStop() {
-        return stop;
-    }
-
-    public void setStop(Date stop) {
-        this.stop = stop;
-    }
-
-    public String toString() {
-        return "[TimeInterval: start=" + start + ", stop=" + stop + "]";
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/WQ.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,67 +0,0 @@
-package de.intevation.aft;
-
-import java.util.Comparator;
-
-public class WQ
-{
-    public static final double EPSILON = 1e-4;
-
-    public static final Comparator<WQ> EPS_CMP = new Comparator<WQ>() {
-        @Override
-        public int compare(WQ a, WQ b) {
-            int cmp = compareEpsilon(a.q, b.q);
-            if (cmp != 0) return cmp;
-            return compareEpsilon(a.w, b.w);
-        }
-    };
-
-    protected int id;
-
-    protected double w;
-    protected double q;
-
-    public WQ() {
-    }
-
-    public WQ(double w, double q) {
-        this.w = w;
-        this.q = q;
-    }
-
-    public WQ(int id, double w, double q) {
-        this.id = id;
-        this.w  = w;
-        this.q  = q;
-    }
-
-    public static final int compareEpsilon(double a, double b) {
-        double diff = a - b;
-        if (diff < -EPSILON) return -1;
-        return diff > EPSILON ? +1 : 0;
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    public void setId(int id) {
-        this.id = id;
-    }
-
-    public double getW() {
-        return w;
-    }
-
-    public void setW(double w) {
-        this.w = w;
-    }
-
-    public double getQ() {
-        return q;
-    }
-
-    public void setQ(double q) {
-        this.q = q;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/WQDiff.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,118 +0,0 @@
-package de.intevation.aft;
-
-import de.intevation.db.ConnectedStatements;
-import de.intevation.db.SymbolicStatement;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.TreeSet;
-
-public class WQDiff
-{
-    protected Set<WQ> toAdd;
-    protected Set<WQ> toDelete;
-
-    public WQDiff() {
-    }
-
-    public WQDiff(Collection<WQ> a, Collection<WQ> b) {
-        toAdd    = new TreeSet<WQ>(WQ.EPS_CMP);
-        toDelete = new TreeSet<WQ>(WQ.EPS_CMP);
-        build(a, b);
-    }
-
-    public void build(Collection<WQ> a, Collection<WQ> b) {
-        toAdd.addAll(b);
-        toAdd.removeAll(a);
-
-        toDelete.addAll(a);
-        toDelete.removeAll(b);
-    }
-
-    public void clear() {
-        toAdd.clear();
-        toDelete.clear();
-    }
-
-    public Set<WQ> getToAdd() {
-        return toAdd;
-    }
-
-    public void setToAdd(Set<WQ> toAdd) {
-        this.toAdd = toAdd;
-    }
-
-    public Set<WQ> getToDelete() {
-        return toDelete;
-    }
-
-    public void setToDelete(Set<WQ> toDelete) {
-        this.toDelete = toDelete;
-    }
-
-    public boolean hasChanges() {
-        return !(toAdd.isEmpty() && toDelete.isEmpty());
-    }
-
-    public void writeChanges(
-        SyncContext context,
-        int         tableId
-    )
-    throws SQLException
-    {
-        ConnectedStatements flysStatements = context.getFlysStatements();
-
-        // Delete the old entries
-        if (!toDelete.isEmpty()) {
-            SymbolicStatement.Instance deleteDTV =
-                flysStatements.getStatement("delete.discharge.table.value");
-            for (WQ wq: toDelete) {
-                deleteDTV
-                    .clearParameters()
-                    .setInt("id", wq.getId())
-                    .execute();
-            }
-        }
-
-        // Add the new entries.
-        if (!toAdd.isEmpty()) {
-            SymbolicStatement.Instance nextId =
-                flysStatements.getStatement("next.discharge.table.values.id");
-
-            SymbolicStatement.Instance insertDTV =
-                flysStatements.getStatement("insert.discharge.table.value");
-
-            // Recycle old ids as much as possible.
-            Iterator<WQ> oldIds = toDelete.iterator();
-
-            // Create ids for new entries.
-            for (WQ wq: toAdd) {
-                if (oldIds.hasNext()) {
-                    wq.setId(oldIds.next().getId());
-                }
-                else {
-                    ResultSet rs = nextId.executeQuery();
-                    rs.next();
-                    wq.setId(rs.getInt("discharge_table_values_id"));
-                    rs.close();
-                }
-            }
-
-            // Write the new entries.
-            for (WQ wq: toAdd) {
-                insertDTV
-                    .clearParameters()
-                    .setInt("id", wq.getId())
-                    .setInt("table_id", tableId)
-                    .setDouble("w", wq.getW())
-                    .setDouble("q", wq.getQ())
-                    .execute();
-            }
-        }
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/ConnectedStatements.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,110 +0,0 @@
-package de.intevation.db;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.SQLException;
-import java.sql.Savepoint;
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class ConnectedStatements
-{
-    private static Logger log = Logger.getLogger(ConnectedStatements.class);
-
-    protected Connection connection;
-
-    protected Map<String, SymbolicStatement> statements;
-
-    protected Map<String, SymbolicStatement.Instance> boundStatements;
-
-    protected Deque<Savepoint> savepoints;
-
-    public ConnectedStatements(
-        Connection connection,
-        Map<String, SymbolicStatement> statements
-    )
-    throws SQLException
-    {
-        this.connection = connection;
-        this.statements = statements;
-        checkSavePoints();
-
-        boundStatements = new HashMap<String, SymbolicStatement.Instance>();
-    }
-
-    protected void checkSavePoints() throws SQLException {
-        DatabaseMetaData metaData = connection.getMetaData();
-        if (metaData.supportsSavepoints()) {
-            log.info("Driver '" + metaData.getDriverName() +
-                "' does support savepoints.");
-            savepoints = new ArrayDeque<Savepoint>();
-        }
-        else {
-            log.info("Driver '" + metaData.getDriverName() +
-                "' does not support savepoints.");
-        }
-    }
-
-    public SymbolicStatement.Instance getStatement(String key)
-    throws SQLException
-    {
-        SymbolicStatement.Instance stmnt = boundStatements.get(key);
-        if (stmnt != null) {
-            return stmnt;
-        }
-
-        SymbolicStatement ss = statements.get(key);
-        if (ss == null) {
-            return null;
-        }
-
-        stmnt = ss.new Instance(connection);
-        boundStatements.put(key, stmnt);
-        return stmnt;
-    }
-
-    public void beginTransaction() throws SQLException {
-        if (savepoints != null) {
-            savepoints.push(connection.setSavepoint());
-        }
-    }
-
-    public void commitTransaction() throws SQLException {
-        if (savepoints != null) {
-            savepoints.pop();
-        }
-        connection.commit();
-    }
-
-    public void rollbackTransaction() throws SQLException {
-        if (savepoints != null) {
-            Savepoint savepoint = savepoints.pop();
-            connection.rollback(savepoint);
-        }
-        else {
-            connection.rollback();
-        }
-    }
-
-    public void close() {
-        for (SymbolicStatement.Instance s: boundStatements.values()) {
-            s.close();
-        }
-
-        try {
-            if (savepoints != null && !savepoints.isEmpty()) {
-                Savepoint savepoint = savepoints.peekFirst();
-                connection.rollback(savepoint);
-            }
-            connection.close();
-        }
-        catch (SQLException sqle) {
-        }
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/ConnectionBuilder.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,121 +0,0 @@
-package de.intevation.db;
-
-import de.intevation.utils.XML;
-
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.xml.xpath.XPathConstants;
-
-import org.apache.log4j.Logger;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
-
-public class ConnectionBuilder
-{
-    private static Logger log = Logger.getLogger(ConnectionBuilder.class);
-
-    public static final String XPATH_DRIVER     = "/sync/side[@name=$type]/db/driver/text()";
-    public static final String XPATH_USER       = "/sync/side[@name=$type]/db/user/text()";
-    public static final String XPATH_PASSWORD   = "/sync/side[@name=$type]/db/password/text()";
-    public static final String XPATH_URL        = "/sync/side[@name=$type]/db/url/text()";
-    public static final String XPATH_EXEC_LOGIN = "/sync/side[@name=$type]/db/execute-login/statement";
-
-    protected String       type;
-    protected String       driver;
-    protected String       user;
-    protected String       password;
-    protected String       url;
-    protected List<String> loginStatements;
-
-    public ConnectionBuilder(String type, Document document) {
-        this.type = type;
-        extractCredentials(document);
-    }
-
-    protected static List<String> extractStrings(NodeList nodes) {
-        int N = nodes.getLength();
-        List<String> result = new ArrayList<String>(N);
-        for (int i = 0; i < N; ++i) {
-            result.add(nodes.item(i).getTextContent());
-        }
-        return result;
-    }
-
-    protected void extractCredentials(Document document) {
-        HashMap<String, String> map = new HashMap<String, String>();
-        map.put("type", type);
-
-        driver = (String)XML.xpath(
-            document, XPATH_DRIVER, XPathConstants.STRING, null, map);
-        user = (String)XML.xpath(
-            document, XPATH_USER, XPathConstants.STRING, null, map);
-        password = (String)XML.xpath(
-            document, XPATH_PASSWORD, XPathConstants.STRING, null, map);
-        url = (String)XML.xpath(
-            document, XPATH_URL, XPathConstants.STRING, null, map);
-        loginStatements = extractStrings((NodeList)XML.xpath(
-            document, XPATH_EXEC_LOGIN, XPathConstants.NODESET, null, map));
-
-        if (log.isDebugEnabled()) {
-            log.debug("driver: " + driver);
-            log.debug("user: " + user);
-            log.debug("password: *******");
-            log.debug("url: " + url);
-            log.debug("number of login statements: " + loginStatements.size());
-        }
-    }
-
-    public Connection getConnection() throws SQLException {
-
-        if (driver != null && driver.length() > 0) {
-            try {
-                Class.forName(driver);
-            }
-            catch (ClassNotFoundException cnfe) {
-                throw new SQLException(cnfe);
-            }
-        }
-
-        Connection connection =
-            DriverManager.getConnection(url, user, password);
-
-        connection.setAutoCommit(false);
-
-        DatabaseMetaData metaData = connection.getMetaData();
-
-        if (metaData.supportsTransactionIsolationLevel(
-            Connection.TRANSACTION_READ_UNCOMMITTED)) {
-            connection.setTransactionIsolation(
-                Connection.TRANSACTION_READ_UNCOMMITTED);
-        }
-
-        for (String sql: loginStatements) {
-            Statement stmnt = connection.createStatement();
-            try {
-                stmnt.execute(sql);
-            }
-            finally {
-                stmnt.close();
-            }
-        }
-
-        return connection;
-    }
-
-    public ConnectedStatements getConnectedStatements() throws SQLException {
-        return new ConnectedStatements(
-            getConnection(),
-            new Statements(type, driver != null ? driver : "")
-                .getStatements());
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/Statements.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,124 +0,0 @@
-package de.intevation.db;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.log4j.Logger;
-
-public class Statements
-{
-    private static Logger log = Logger.getLogger(Statements.class);
-
-    public static final String RESOURCE_PATH = "/sql/";
-    public static final String COMMON_PROPERTIES = "-common.properties";
-
-    protected String type;
-    protected String driver;
-
-    protected Map<String, SymbolicStatement> statements;
-
-    public Statements(String type, String driver) {
-        this.type   = type;
-        this.driver = driver;
-    }
-
-    public SymbolicStatement getStatement(String key) {
-        return getStatements().get(key);
-    }
-
-    public Map<String, SymbolicStatement> getStatements() {
-        if (statements == null) {
-            statements = loadStatements();
-        }
-        return statements;
-    }
-
-    protected Map<String, SymbolicStatement> loadStatements() {
-        Map<String, SymbolicStatement> statements =
-            new HashMap<String, SymbolicStatement>();
-
-        Properties properties = loadProperties();
-
-        for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
-            String key = (String)e.nextElement();
-            String value = properties.getProperty(key);
-            SymbolicStatement symbolic = new SymbolicStatement(value);
-            statements.put(key, symbolic);
-        }
-
-        return statements;
-    }
-
-    protected String driverToProperties() {
-        return
-            type + "-" +
-            driver.replace('.', '-').toLowerCase() + ".properties";
-    }
-
-    protected Properties loadCommon() {
-        Properties common = new Properties();
-
-        String path = RESOURCE_PATH + type + COMMON_PROPERTIES;
-
-        InputStream in = Statements.class.getResourceAsStream(path);
-
-        if (in != null) {
-            try {
-                common.load(in);
-            }
-            catch (IOException ioe) {
-                log.error("cannot load defaults: " + path, ioe);
-            }
-            finally {
-                try {
-                    in.close();
-                }
-                catch (IOException ioe) {
-                }
-            }
-        }
-        else {
-            log.warn("cannot find: " + path);
-        }
-
-        return common;
-    }
-
-    protected Properties loadProperties() {
-
-        Properties common = loadCommon();
-
-        Properties properties = new Properties(common);
-
-        String path = RESOURCE_PATH + driverToProperties();
-
-        InputStream in = Statements.class.getResourceAsStream(path);
-
-        if (in != null) {
-            try {
-                properties.load(in);
-            }
-            catch (IOException ioe) {
-                log.error("cannot load statements: " + path, ioe);
-            }
-            finally {
-                try {
-                    in.close();
-                }
-                catch (IOException ioe) {
-                }
-            }
-        }
-        else {
-            log.warn("cannot find: " + path);
-        }
-
-        return properties;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/SymbolicStatement.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,196 +0,0 @@
-package de.intevation.db;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.log4j.Logger;
-
-public class SymbolicStatement {
-
-    private static Logger log = Logger.getLogger(SymbolicStatement.class);
-
-    public static final Pattern VAR = Pattern.compile(":([a-zA-Z0-9_]+)");
-
-    protected String statement;
-    protected String compiled;
-    protected Map<String, List<Integer>> positions;
-
-    public class Instance {
-
-        /** TODO: Support more types. */
-
-        protected PreparedStatement stmnt;
-
-        public Instance(Connection connection) throws SQLException {
-            stmnt = connection.prepareStatement(compiled);
-        }
-
-        public void close() {
-            try {
-                stmnt.close();
-            }
-            catch (SQLException sqle) {
-                log.error("cannot close statement", sqle);
-            }
-        }
-
-        public Instance setInt(String key, int value)
-        throws SQLException
-        {
-            List<Integer> pos = positions.get(key.toLowerCase());
-            if (pos != null) {
-                for (Integer p: pos) {
-                    stmnt.setInt(p, value);
-                }
-            }
-
-            return this;
-        }
-
-        public Instance setString(String key, String value)
-        throws SQLException
-        {
-            List<Integer> pos = positions.get(key.toLowerCase());
-            if (pos != null) {
-                for (Integer p: pos) {
-                    stmnt.setString(p, value);
-                }
-            }
-            return this;
-        }
-
-        public Instance setObject(String key, Object value)
-        throws SQLException
-        {
-            List<Integer> pos = positions.get(key.toLowerCase());
-            if (pos != null) {
-                for (Integer p: pos) {
-                    stmnt.setObject(p, value);
-                }
-            }
-            return this;
-        }
-
-        public Instance setTimestamp(String key, Timestamp value)
-        throws SQLException
-        {
-            List<Integer> pos = positions.get(key.toLowerCase());
-            if (pos != null) {
-                for (Integer p: pos) {
-                    stmnt.setTimestamp(p, value);
-                }
-            }
-            return this;
-        }
-
-        public Instance setDouble(String key, double value)
-        throws SQLException
-        {
-            List<Integer> pos = positions.get(key.toLowerCase());
-            if (pos != null) {
-                for (Integer p: pos) {
-                    stmnt.setDouble(p, value);
-                }
-            }
-            return this;
-        }
-
-        public Instance setLong(String key, long value)
-        throws SQLException
-        {
-            List<Integer> pos = positions.get(key.toLowerCase());
-            if (pos != null) {
-                for (Integer p: pos) {
-                    stmnt.setLong(p, value);
-                }
-            }
-            return this;
-        }
-
-        public Instance setNull(String key, int sqlType)
-        throws SQLException
-        {
-            List<Integer> pos = positions.get(key.toLowerCase());
-            if (pos != null) {
-                for (Integer p: pos) {
-                    stmnt.setNull(p, sqlType);
-                }
-            }
-            return this;
-        }
-
-        public Instance set(Map<String, Object> map) throws SQLException {
-            for (Map.Entry<String, Object> entry: map.entrySet()) {
-                setObject(entry.getKey(), entry.getValue());
-            }
-            return this;
-        }
-
-        public Instance clearParameters() throws SQLException {
-            stmnt.clearParameters();
-            return this;
-        }
-
-        public boolean execute() throws SQLException {
-            if (log.isDebugEnabled()) {
-                log.debug("execute: " + compiled);
-            }
-            return stmnt.execute();
-        }
-
-        public ResultSet executeQuery() throws SQLException {
-            if (log.isDebugEnabled()) {
-                log.debug("query: " + compiled);
-            }
-            return stmnt.executeQuery();
-        }
-
-        public int executeUpdate() throws SQLException {
-            if (log.isDebugEnabled()) {
-                log.debug("update: " + compiled);
-            }
-            return stmnt.executeUpdate();
-        }
-
-    } // class Instance
-
-    public SymbolicStatement(String statement) {
-        this.statement = statement;
-        compile();
-    }
-
-    public String getStatement() {
-        return statement;
-    }
-
-    protected void compile() {
-        positions = new HashMap<String, List<Integer>>();
-
-        StringBuffer sb = new StringBuffer();
-        Matcher m = VAR.matcher(statement);
-        int index = 1;
-        while (m.find()) {
-            String key = m.group(1).toLowerCase();
-            List<Integer> list = positions.get(key);
-            if (list == null) {
-                list = new ArrayList<Integer>();
-                positions.put(key, list);
-            }
-            list.add(index++);
-            m.appendReplacement(sb, "?");
-        }
-        m.appendTail(sb);
-        compiled = sb.toString();
-    }
-} // class SymbolicStatement
--- a/flys-aft/src/main/java/org/dive4elements/etl/utils/XML.java	Thu Apr 25 11:35:06 2013 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,332 +0,0 @@
-package de.intevation.utils;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.StringWriter;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.xml.namespace.NamespaceContext;
-import javax.xml.namespace.QName;
-
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerConfigurationException;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-
-import javax.xml.transform.dom.DOMResult;
-import javax.xml.transform.dom.DOMSource;
-
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
-
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-import javax.xml.xpath.XPathVariableResolver;
-
-import org.apache.log4j.Logger;
-
-import org.w3c.dom.Document;
-
-import org.xml.sax.SAXException;
-
-public final class XML
-{
-    /** Logger for this class. */
-    private static Logger log = Logger.getLogger(XML.class);
-
-    public static class MapXPathVariableResolver
-    implements          XPathVariableResolver
-    {
-        protected Map<String, String> variables;
-
-
-        public MapXPathVariableResolver() {
-            this.variables = new HashMap<String, String>();
-        }
-
-
-        public MapXPathVariableResolver(Map<String, String> variables) {
-            this.variables = variables;
-        }
-
-
-        public void addVariable(String name, String value) {
-            variables.put(name, value);
-        }
-
-
-        @Override
-        public Object resolveVariable(QName variableName) {
-            String key = variableName.getLocalPart();
-            return variables.get(key);
-        }
-    } // class MapXPathVariableResolver
-
-    private XML() {
-    }
-
-        /**
-     * Creates a new XML document
-     * @return the new XML document ot null if something went wrong during
-     * creation.
-     */
-    public static final Document newDocument() {
-        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-        factory.setNamespaceAware(true);
-
-        try {
-            return factory.newDocumentBuilder().newDocument();
-        }
-        catch (ParserConfigurationException pce) {
-            log.error(pce.getLocalizedMessage(), pce);
-        }
-        return null;
-    }
-
-    /**
-     * Loads a XML document namespace aware from a file
-     * @param file The file to load.
-     * @return the XML document or null if something went wrong
-     * during loading.
-     */
-    public static final Document parseDocument(File file) {
-        return parseDocument(file, Boolean.TRUE);
-    }
-
-    public static final Document parseDocument(File file, Boolean namespaceAware) {
-        InputStream inputStream = null;
-        try {
-            inputStream = new BufferedInputStream(new FileInputStream(file));
-            return parseDocument(inputStream, namespaceAware);
-        }
-        catch (IOException ioe) {
-            log.error(ioe.getLocalizedMessage(), ioe);
-        }
-        finally {
-            if (inputStream != null) {
-                try { inputStream.close(); }
-                catch (IOException ioe) {}
-            }
-        }
-        return null;
-    }
-
-
-    public static final Document parseDocument(InputStream inputStream) {
-        return parseDocument(inputStream, Boolean.TRUE);
-    }
-
-    public static final Document parseDocument(
-        InputStream inputStream,
-        Boolean     namespaceAware
-    ) {
-        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-
-        if (namespaceAware != null) {
-            factory.setNamespaceAware(namespaceAware.booleanValue());
-        }
-
-        try {
-            return factory.newDocumentBuilder().parse(inputStream);
-        }
-        catch (ParserConfigurationException pce) {
-            log.error(pce.getLocalizedMessage(), pce);
-        }
-        catch (SAXException se) {
-            log.error(se.getLocalizedMessage(), se);
-        }
-        catch (IOException ioe) {
-            log.error(ioe.getLocalizedMessage(), ioe);
-        }
-        return null;
-    }
-
-
-    /**
-     * Creates a new XPath without a namespace context.
-     * @return the new XPath.
-     */
-    public static final XPath newXPath() {
-        return newXPath(null, null);
-    }
-
-    /**
-     * Creates a new XPath with a given namespace context.
-     * @param namespaceContext The namespace context to be used or null
-     * if none should be used.
-     * @return The new XPath
-     */
-    public static final XPath newXPath(
-        NamespaceContext      namespaceContext,
-        XPathVariableResolver resolver)
-    {
-        XPathFactory factory = XPathFactory.newInstance();
-        XPath        xpath   = factory.newXPath();
-        if (namespaceContext != null) {
-            xpath.setNamespaceContext(namespaceContext);
-        }
-
-        if (resolver != null) {
-            xpath.setXPathVariableResolver(resolver);
-        }
-        return xpath;
-    }
-
-    /**
-     * Evaluates an XPath query on a given object and returns the result
-     * as a given type. No namespace context is used.
-     * @param root  The object which is used as the root of the tree to
-     * be searched in.
-     * @param query The XPath query
-     * @param returnTyp The type of the result.
-     * @return The result of type 'returnTyp' or null if something
-     * went wrong during XPath evaluation.
-     */
-    public static final Object xpath(
-        Object root,
-        String query,
-        QName  returnTyp
-    ) {
-        return xpath(root, query, returnTyp, null);
-    }
-
-    /**
-     * Evaluates an XPath query on a given object and returns the result
-     * as a given type. Optionally a namespace context is used.
-     * @param root The object which is used as the root of the tree to
-     * be searched in.
-     * @param query The XPath query
-     * @param returnType The type of the result.
-     * @param namespaceContext The namespace context to be used or null
-     * if none should be used.
-     * @return The result of type 'returnTyp' or null if something
-     * went wrong during XPath evaluation.
-     */
-    public static final Object xpath(
-        Object           root,
-        String           query,
-        QName            returnType,
-        NamespaceContext namespaceContext
-    ) {
-        return xpath(root, query, returnType, namespaceContext, null);
-    }
-
-    public static final Object xpath(
-        Object           root,
-        String           query,
-        QName            returnType,
-        NamespaceContext namespaceContext,
-        Map<String, String> variables)
-    {
-        if (root == null) {
-            return null;
-        }
-
-        XPathVariableResolver resolver = variables != null
-            ? new MapXPathVariableResolver(variables)
-            : null;
-
-        try {
-            XPath xpath = newXPath(namespaceContext, resolver);
-            if (xpath != null) {
-                return xpath.evaluate(query, root, returnType);
-            }
-        }
-        catch (XPathExpressionException xpee) {
-            log.error(xpee.getLocalizedMessage(), xpee);
-        }
-
-        return null;
-    }
-
-    public static Document transform(
-        Document           document,
-        File               xformFile
-    ) {
-        try {
-            Transformer transformer =
-                TransformerFactory
-                    .newInstance()
-                    .newTransformer(
-                        new StreamSource(xformFile));
-
-            DOMResult result = new DOMResult();
-
-            transformer.transform(new DOMSource(document), result);
-
-            return (Document)result.getNode();
-        }
-        catch (TransformerConfigurationException tce) {
-            log.error(tce, tce);
-        }
-        catch (TransformerException te) {
-            log.error(te, te);
-        }
-
-        return null;
-    }
-
-   /**
-     * Streams out an XML document to a given output stream.
-     * @param document The document to be streamed out.
-     * @param out      The output stream to be used.
-     * @return true if operation succeeded else false.
-     */
-    public static boolean toStream(Document document, OutputStream out) {
-        try {
-            Transformer transformer =
-                TransformerFactory.newInstance().newTransformer();
-            DOMSource    source = new DOMSource(document);
-            StreamResult result = new StreamResult(out);
-            transformer.transform(source, result);
-            return true;
-        }
-        catch (TransformerConfigurationException tce) {
-            log.error(tce.getLocalizedMessage(), tce);
-        }
-        catch (TransformerFactoryConfigurationError tfce) {
-            log.error(tfce.getLocalizedMessage(), tfce);
-        }
-        catch (TransformerException te) {
-            log.error(te.getLocalizedMessage(), te);
-        }
-
-        return false;
-    }
-
-    public static String toString(Document document) {
-        try {
-            Transformer transformer =
-                TransformerFactory.newInstance().newTransformer();
-            DOMSource    source = new DOMSource(document);
-            StringWriter out    = new StringWriter();
-            StreamResult result = new StreamResult(out);
-            transformer.transform(source, result);
-            out.flush();
-            return out.toString();
-        }
-        catch (TransformerConfigurationException tce) {
-            log.error(tce.getLocalizedMessage(), tce);
-        }
-        catch (TransformerFactoryConfigurationError tfce) {
-            log.error(tfce.getLocalizedMessage(), tfce);
-        }
-        catch (TransformerException te) {
-            log.error(te.getLocalizedMessage(), te);
-        }
-
-        return null;
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/DIPSGauge.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,193 @@
+package de.intevation.aft;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+public class DIPSGauge
+{
+    private static Logger log = Logger.getLogger(DIPSGauge.class);
+
+    public static final Pattern DATE_PATTERN = Pattern.compile(
+        "(\\d{4})-(\\d{2})-(\\d{2})\\s+(\\d{2}):(\\d{2}):(\\d{2})");
+
+    public static final Comparator<Datum> DATE_CMP = new Comparator<Datum>() {
+        public int compare(Datum a, Datum b) {
+            return a.date.compareTo(b.date);
+        }
+    };
+
+    public static class Datum {
+
+        protected double value;
+        protected Date   date;
+
+        public Datum() {
+        }
+
+        public Datum(Element element) {
+            value = Double.parseDouble(element.getAttribute("WERT"));
+            String dateString = element.getAttribute("GUELTIGAB");
+            if (dateString.length() == 0) {
+                throw
+                    new IllegalArgumentException("missing GUELTIGAB attribute");
+            }
+            Matcher m = DATE_PATTERN.matcher(dateString);
+            if (!m.matches()) {
+                throw
+                    new IllegalArgumentException("GUELTIGAB does not match");
+            }
+
+            int year  = Integer.parseInt(m.group(1));
+            int month = Integer.parseInt(m.group(2));
+            int day   = Integer.parseInt(m.group(3));
+            int hours = Integer.parseInt(m.group(4));
+            int mins  = Integer.parseInt(m.group(5));
+            int secs  = Integer.parseInt(m.group(6));
+
+            Calendar cal = Calendar.getInstance();
+            cal.set(year, month, day, hours, mins, secs);
+
+            date = cal.getTime();
+        }
+
+        public double getValue() {
+            return value;
+        }
+
+        public void setValue(double value) {
+            this.value = value;
+        }
+
+        public Date getDate() {
+            return date;
+        }
+
+        public void setDate(Date date) {
+            this.date = date;
+        }
+    } // class datum
+
+    protected double aeo;
+
+    protected double station;
+
+    protected String name;
+
+    protected String riverName;
+
+    protected List<Datum> datums;
+
+    protected int flysId;
+
+    protected String aftName;
+
+    protected Long   officialNumber;
+
+    public DIPSGauge() {
+    }
+
+    public DIPSGauge(Element element) {
+
+        name          = element.getAttribute("NAME");
+        riverName     = element.getAttribute("GEWAESSER");
+
+        String aeoString = element.getAttribute("EINZUGSGEBIET_AEO");
+        if (aeoString.length() == 0) {
+            log.warn("DIPS: Setting AEO of gauge '" + name + "' to zero.");
+            aeoString = "0";
+        }
+        aeo = Double.parseDouble(aeoString);
+
+        String stationString = element.getAttribute("STATIONIERUNG");
+        if (stationString.length() == 0) {
+            log.warn("DIPS: Setting station of gauge '" + name + "' to zero.");
+            stationString = "-99999";
+        }
+        station = Double.parseDouble(stationString);
+        if (station == 0d) {
+            log.warn("DIPS: Station of gauge '" + name + "' is zero.");
+        }
+
+        datums = new ArrayList<Datum>();
+        NodeList nodes = element.getElementsByTagName("PNP");
+        for (int i = 0, N = nodes.getLength(); i < N; ++i) {
+            Element e = (Element)nodes.item(i);
+            Datum datum = new Datum(e);
+            datums.add(datum);
+        }
+        Collections.sort(datums, DATE_CMP);
+    }
+
+    public List<Datum> getDatums() {
+        return datums;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getRiverName() {
+        return riverName;
+    }
+
+    public int getFlysId() {
+        return flysId;
+    }
+
+    public void setFlysId(int flysId) {
+        this.flysId = flysId;
+    }
+
+    public String getAftName() {
+        return aftName != null ? aftName : name;
+    }
+
+    public void setAftName(String aftName) {
+        this.aftName = aftName;
+    }
+
+    public double getStation() {
+        return station;
+    }
+
+    public double getAeo() {
+        return aeo;
+    }
+
+    public void setAeo(double aeo) {
+        this.aeo = aeo;
+    }
+
+    public void setStation(double station) {
+        this.station = station;
+    }
+
+    public boolean hasDatums() {
+        return !datums.isEmpty();
+    }
+
+    public Datum getLatestDatum() {
+        return datums.get(datums.size()-1);
+    }
+
+    public Long getOfficialNumber() {
+        return officialNumber;
+    }
+
+    public void setOfficialNumber(Long officialNumber) {
+        this.officialNumber = officialNumber;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/DischargeTable.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,372 @@
+package de.intevation.aft;
+
+import de.intevation.db.ConnectedStatements;
+import de.intevation.db.SymbolicStatement;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.log4j.Logger;
+
+/** A Discharge Table. */
+public class DischargeTable
+{
+    private static Logger log = Logger.getLogger(DischargeTable.class);
+
+    protected int          id;
+    protected int          gaugeId;
+    protected TimeInterval timeInterval;
+    protected String       description;
+    protected String       bfgId;
+    protected Set<WQ>      values;
+
+    public DischargeTable() {
+    }
+
+    public DischargeTable(
+        int          gaugeId,
+        TimeInterval timeInterval,
+        String       description,
+        String       bfgId
+    ) {
+        this.gaugeId      = gaugeId;
+        this.timeInterval = timeInterval;
+        this.description  = description;
+        this.bfgId        = bfgId;
+        values = new TreeSet<WQ>(WQ.EPS_CMP);
+    }
+
+    public DischargeTable(
+        int          id,
+        int          gaugeId,
+        TimeInterval timeInterval,
+        String       description,
+        String       bfgId
+    ) {
+        this(gaugeId, timeInterval, description, bfgId);
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public int getGaugeId() {
+        return gaugeId;
+    }
+
+    public void setGaugeId(int gaugeId) {
+        this.gaugeId = gaugeId;
+    }
+
+    public TimeInterval getTimeInterval() {
+        return timeInterval;
+    }
+
+    public void setTimeInterval(TimeInterval timeInterval) {
+        this.timeInterval = timeInterval;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getBfgId() {
+        return bfgId;
+    }
+
+    public void setBfgId(String bfgId) {
+        this.bfgId = bfgId;
+    }
+
+
+    public void clearValues() {
+        values.clear();
+    }
+
+    public Set<WQ> getValues() {
+        return values;
+    }
+
+    public void setValues(Set<WQ> values) {
+        this.values = values;
+    }
+
+
+    protected void loadValues(SymbolicStatement.Instance query)
+    throws SQLException
+    {
+        ResultSet rs = query.executeQuery();
+        while (rs.next()) {
+            int    id = rs.getInt("id");
+            double w  = rs.getDouble("w");
+            double q  = rs.getDouble("q");
+            if (!values.add(new WQ(id, w, q))) {
+                log.warn("FLYS/AFT: Value duplication w="+w+" q="+q+". -> ignore.");
+            }
+        }
+        rs.close();
+    }
+
+    public void loadAftValues(SyncContext context) throws SQLException {
+        loadValues(context.getAftStatements()
+            .getStatement("select.tafelwert")
+            .clearParameters()
+            .setInt("number", getId()));
+    }
+
+    public void loadFlysValues(SyncContext context) throws SQLException {
+        loadValues(context.getFlysStatements()
+            .getStatement("select.discharge.table.values")
+            .clearParameters()
+            .setInt("table_id", getId()));
+    }
+
+    public void storeFlysValues(
+        SyncContext context,
+        int         dischargeTableId
+    )
+    throws SQLException
+    {
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        // Create the ids.
+        SymbolicStatement.Instance nextId = flysStatements
+            .getStatement("next.discharge.table.values.id");
+
+        // Insert the values.
+        SymbolicStatement.Instance insertDTV = flysStatements
+            .getStatement("insert.discharge.table.value");
+
+        for (WQ wq: values) {
+            int wqId;
+            ResultSet rs = nextId.executeQuery();
+            try {
+                rs.next();
+                wqId = rs.getInt("discharge_table_values_id");
+            }
+            finally {
+                rs.close();
+            }
+
+            insertDTV
+                .clearParameters()
+                .setInt("id", wqId)
+                .setInt("table_id", dischargeTableId)
+                .setDouble("w", wq.getW())
+                .setDouble("q", wq.getQ())
+                .execute();
+        }
+    }
+
+    public static List<DischargeTable> loadFlysDischargeTables(
+        SyncContext context,
+        int         gaugeId
+    )
+    throws SQLException
+    {
+        List<DischargeTable> dts = new ArrayList<DischargeTable>();
+
+        ResultSet rs = context
+            .getFlysStatements()
+            .getStatement("select.gauge.discharge.tables")
+            .clearParameters()
+            .setInt("gauge_id", gaugeId)
+            .executeQuery();
+        try {
+            OUTER: while (rs.next()) {
+                int    id          = rs.getInt("id");
+                String description = rs.getString("description");
+                String bfgId       = rs.getString("bfg_id");
+                if (description == null) {
+                    description = "";
+                }
+                if (bfgId == null) {
+                    bfgId = "";
+                }
+                for (DischargeTable dt: dts) {
+                    if (dt.getBfgId().equals(bfgId)) {
+                        log.warn("FLYS: Found discharge table '" +
+                            bfgId + "' with same bfg_id. -> ignore");
+                        continue OUTER;
+                    }
+                }
+                Date startTime = rs.getDate("start_time");
+                Date stopTime  = rs.getDate("stop_time");
+                TimeInterval ti = startTime == null
+                    ? null
+                    : new TimeInterval(startTime, stopTime);
+
+                DischargeTable dt = new DischargeTable(
+                    id, gaugeId, ti, description, bfgId);
+                dts.add(dt);
+            }
+        }
+        finally {
+            rs.close();
+        }
+
+        return dts;
+    }
+
+    public static List<DischargeTable> loadAftDischargeTables(
+        SyncContext context,
+        Long        officialNumber
+    )
+    throws SQLException
+    {
+        return loadAftDischargeTables(context, officialNumber, 0);
+    }
+
+    public static List<DischargeTable> loadAftDischargeTables(
+        SyncContext context,
+        Long        officialNumber,
+        int         flysGaugeId
+    )
+    throws SQLException
+    {
+        List<DischargeTable> dts = new ArrayList<DischargeTable>();
+
+        ResultSet rs = context
+            .getAftStatements()
+            .getStatement("select.abflusstafel")
+            .clearParameters()
+            .setString("number", "%" + officialNumber)
+            .executeQuery();
+        try {
+            OUTER: while (rs.next()) {
+                int  dtId = rs.getInt("ABFLUSSTAFEL_NR");
+                Date from = rs.getDate("GUELTIG_VON");
+                Date to   = rs.getDate("GUELTIG_BIS");
+
+                if (from == null) {
+                    log.warn("AFT: ABFLUSSTAFEL_NR = "
+                        + dtId + ": GUELTIG_VON = NULL -> ignored.");
+                }
+
+                if (to == null) {
+                    log.warn("AFT: ABFLUSSTAFEL_NR = "
+                        + dtId + ": GUELTIG_BIS = NULL -> ignored.");
+                }
+
+                if (from == null || to == null) {
+                    continue;
+                }
+
+                if (from.compareTo(to) > 0) {
+                        log.warn("AFT: ABFLUSSTAFEL_NR = "
+                        + dtId + ": " + from + " > " + to + ". -> swap");
+                    Date temp = from;
+                    from = to;
+                    to = temp;
+                }
+
+                String description = rs.getString("ABFLUSSTAFEL_BEZ");
+                if (description == null) {
+                    description = String.valueOf(officialNumber);
+                }
+
+                String bfgId = rs.getString("BFG_ID");
+                if (bfgId == null) {
+                    bfgId = "";
+                }
+
+                for (DischargeTable dt: dts) {
+                    if (dt.getBfgId().equals(bfgId)) {
+                        log.warn("AFT: Found discharge table '" +
+                            bfgId + "' with same bfg_id. -> ignore.");
+                        continue OUTER;
+                    }
+                }
+
+                TimeInterval timeInterval = new TimeInterval(from, to);
+
+                DischargeTable dt = new DischargeTable(
+                    dtId,
+                    flysGaugeId,
+                    timeInterval,
+                    description,
+                    bfgId);
+                dts.add(dt);
+            }
+        }
+        finally {
+            rs.close();
+        }
+
+        return dts;
+    }
+
+    public void persistFlysTimeInterval(
+        SyncContext context
+    )
+    throws SQLException
+    {
+        if (timeInterval != null) {
+            timeInterval = context.fetchOrCreateFLYSTimeInterval(
+                timeInterval);
+        }
+    }
+
+    public int persistFlysDischargeTable(
+        SyncContext context,
+        int         gaugeId
+    )
+    throws SQLException
+    {
+        ConnectedStatements flysStatements =
+            context.getFlysStatements();
+
+        int flysId;
+
+        ResultSet rs = flysStatements
+            .getStatement("next.discharge.id")
+            .executeQuery();
+        try {
+            rs.next();
+            flysId = rs.getInt("discharge_table_id");
+        }
+        finally {
+            rs.close();
+        }
+
+        SymbolicStatement.Instance insertDT = flysStatements
+            .getStatement("insert.dischargetable")
+            .clearParameters()
+            .setInt("id", flysId)
+            .setInt("gauge_id", gaugeId)
+            .setString("description", description)
+            .setString("bfg_id", bfgId);
+
+        if (timeInterval != null) {
+            insertDT.setInt("time_interval_id", timeInterval.getId());
+        }
+        else {
+            insertDT.setNull("time_interval_id", Types.INTEGER);
+        }
+
+        insertDT.execute();
+
+        if (log.isDebugEnabled()) {
+            log.debug("FLYS: Created discharge table id: " + id);
+        }
+
+        return flysId;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/IdPair.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,40 @@
+package de.intevation.aft;
+
+public class IdPair
+{
+    protected int id1;
+    protected int id2;
+
+    public IdPair() {
+    }
+
+    public IdPair(int id1) {
+        this.id1 = id1;
+    }
+
+    public IdPair(int id1, int id2) {
+        this(id1);
+        this.id2 = id2;
+    }
+
+    public int getId1() {
+        return id1;
+    }
+
+    public void setId1(int id1) {
+        this.id1 = id1;
+    }
+
+    public int getId2() {
+        return id2;
+    }
+
+    public void setId2(int id2) {
+        this.id2 = id2;
+    }
+
+    public String toString() {
+        return "[IdPair: id1=" + id1 + ", id2=" + id2 + "]";
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/Notification.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,101 @@
+package de.intevation.aft;
+
+import de.intevation.utils.XML;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+
+import org.apache.log4j.Logger;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class Notification
+{
+    private static Logger log = Logger.getLogger(Notification.class);
+
+    protected Document message;
+
+    public Notification() {
+    }
+
+    public Notification(Document message) {
+        this.message = message;
+    }
+
+    public Notification(Node message) {
+        this(wrap(message));
+    }
+
+    public static Document wrap(Node node) {
+        Document document = XML.newDocument();
+
+        // Send first element as message.
+        // Fall back to root node.
+        Node toImport = node;
+
+        NodeList children = node.getChildNodes();
+        for (int i = 0, N = children.getLength(); i < N; ++i) {
+            Node child = children.item(i);
+            if (child.getNodeType() == Node.ELEMENT_NODE) {
+                toImport = child;
+                break;
+            }
+        }
+
+        toImport = document.importNode(toImport, true);
+        document.appendChild(toImport);
+        document.normalizeDocument();
+        return document;
+    }
+
+    public Document sendPOST(URL url) {
+
+        OutputStream out    = null;
+        InputStream  in     = null;
+        Document     result = null;
+
+        try {
+            URLConnection ucon = url.openConnection();
+
+            if (!(ucon instanceof HttpURLConnection)) {
+                log.warn("NOTIFY: '" + url + "' is not an HTTP(S) connection.");
+                return null;
+            }
+
+            HttpURLConnection con = (HttpURLConnection)ucon;
+
+            con.setRequestMethod("POST");
+            con.setDoInput(true);
+            con.setDoOutput(true);
+            con.setUseCaches(false);
+            con.setRequestProperty("Content-Type", "text/xml");
+
+            out = con.getOutputStream();
+            XML.toStream(message, out);
+            out.flush();
+            in = con.getInputStream();
+            result = XML.parseDocument(in);
+        }
+        catch (IOException ioe) {
+            log.error("NOTIFY: Sending message to '" + url + "' failed.", ioe);
+        }
+        finally {
+            if (out != null) {
+                try { out.close(); } catch (IOException ioe) {}
+            }
+            if (in != null) {
+                try { in.close(); } catch (IOException ioe) {}
+            }
+        }
+
+        return result;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/River.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,525 @@
+package de.intevation.aft;
+
+import de.intevation.db.ConnectedStatements;
+import de.intevation.db.SymbolicStatement;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class River
+extends      IdPair
+{
+    private static Logger log = Logger.getLogger(River.class);
+
+    protected String name;
+
+    protected double from;
+    protected double to;
+
+    public River() {
+    }
+
+    public River(int id1, String name, double from, double to) {
+        super(id1);
+        this.name = name;
+        this.from = from;
+        this.to   = to;
+    }
+
+    public River(int id1, int id2, String name) {
+        super(id1, id2);
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public double getFrom() {
+        return from;
+    }
+
+    public void setFrom(double from) {
+        this.from = from;
+    }
+
+    public double getTo() {
+        return to;
+    }
+
+    public void setTo(double to) {
+        this.to = to;
+    }
+
+    public boolean inside(double x) {
+        return x >= from && x <= to;
+    }
+
+    public boolean sync(SyncContext context) throws SQLException {
+        log.info("sync river: " + this);
+
+        // Only take relevant gauges into account.
+        Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to);
+
+        ConnectedStatements flysStatements = context.getFlysStatements();
+        ConnectedStatements aftStatements  = context.getAftStatements();
+
+        String riverName = getName();
+
+        Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();
+
+        ResultSet messstellenRs = aftStatements
+            .getStatement("select.messstelle")
+            .clearParameters()
+            .setInt("GEWAESSER_NR", id2)
+            .executeQuery();
+
+        try {
+            while (messstellenRs.next()) {
+                String name = messstellenRs.getString("NAME");
+                String num  = messstellenRs.getString("MESSSTELLE_NR");
+                double station = messstellenRs.getDouble("STATIONIERUNG");
+
+                if (!messstellenRs.wasNull() && !inside(station)) {
+                    log.warn("Station found in AFT but in not range: " + station);
+                    continue;
+                }
+
+                Long number = SyncContext.numberToLong(num);
+                if (number == null) {
+                    log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
+                    continue;
+                }
+                DIPSGauge dipsGauge = dipsGauges.get(number);
+                if (dipsGauge == null) {
+                    log.warn(
+                        "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " +
+                        "Gauge number used for lookup: " + number);
+                    continue;
+                }
+                String gaugeRiver = dipsGauge.getRiverName();
+                if (!gaugeRiver.equalsIgnoreCase(riverName)) {
+                    log.warn(
+                        "DIPS: MESSSTELLE '" + name +
+                        "' is assigned to river '" + gaugeRiver +
+                        "'. Needs to be on '" + riverName + "'.");
+                    continue;
+                }
+                dipsGauge.setAftName(name);
+                dipsGauge.setOfficialNumber(number);
+                aftDIPSGauges.put(number, dipsGauge);
+            }
+        }
+        finally {
+            messstellenRs.close();
+        }
+
+        List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();
+
+        ResultSet gaugesRs = flysStatements
+            .getStatement("select.gauges")
+            .clearParameters()
+            .setInt("river_id", id1).executeQuery();
+
+        try {
+            while (gaugesRs.next()) {
+                int gaugeId = gaugesRs.getInt("id");
+                String name = gaugesRs.getString("name");
+                long   number = gaugesRs.getLong("official_number");
+                if (gaugesRs.wasNull()) {
+                    log.warn("FLYS: Gauge '" + name +
+                        "' has no official number. Ignored.");
+                    continue;
+                }
+                Long key = Long.valueOf(number);
+                DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
+                if (aftDIPSGauge == null) {
+                    log.warn("FLYS: Gauge '" + name + "' number " + number +
+                        " is not found in AFT/DIPS.");
+                    continue;
+                }
+                aftDIPSGauge.setFlysId(gaugeId);
+                log.info("Gauge '" + name +
+                    "' found in FLYS, AFT and DIPS. -> Update");
+                updateGauges.add(aftDIPSGauge);
+            }
+        }
+        finally {
+            gaugesRs.close();
+        }
+
+        boolean modified = createGauges(context, aftDIPSGauges);
+
+        modified |= updateGauges(context, updateGauges);
+
+        return modified;
+    }
+
+    protected boolean updateGauges(
+        SyncContext     context,
+        List<DIPSGauge> gauges
+    )
+    throws SQLException
+    {
+        boolean modified = false;
+
+        for (DIPSGauge gauge: gauges) {
+            // XXX: Do dont modify the master AT.
+            // modified |= updateBfGIdOnMasterDischargeTable(context, gauge);
+            modified |= updateGauge(context, gauge);
+        }
+
+        return modified;
+    }
+
+    protected boolean updateBfGIdOnMasterDischargeTable(
+        SyncContext context,
+        DIPSGauge   gauge
+    ) throws SQLException {
+        log.info(
+            "FLYS: Updating master discharge table bfg_id for '" +
+            gauge.getAftName() + "'");
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        ResultSet rs = flysStatements
+            .getStatement("select.gauge.master.discharge.table")
+            .clearParameters()
+            .setInt("gauge_id", gauge.getFlysId())
+            .executeQuery();
+
+        int flysId;
+
+        try {
+            if (!rs.next()) {
+                log.error(
+                    "FLYS: No master discharge table found for gauge '" +
+                    gauge.getAftName() + "'");
+                return false;
+            }
+            String bfgId = rs.getString("bfg_id");
+            if (!rs.wasNull()) { // already has BFG_ID
+                return false;
+            }
+            flysId = rs.getInt("id");
+        } finally {
+            rs.close();
+        }
+
+        // We need to find out the BFG_ID of the current discharge table
+        // for this gauge in AFT.
+
+        ConnectedStatements aftStatements = context.getAftStatements();
+
+        rs = aftStatements
+            .getStatement("select.bfg.id.current")
+            .clearParameters()
+            .setString("number", "%" + gauge.getOfficialNumber())
+            .executeQuery();
+
+        String bfgId = null;
+
+        try {
+            if (rs.next()) {
+                bfgId = rs.getString("BFG_ID");
+            }
+        } finally {
+            rs.close();
+        }
+
+        if (bfgId == null) {
+            log.warn(
+                "No BFG_ID found for current discharge table of gauge '" +
+                gauge + "'");
+            return false;
+        }
+
+        // Set the BFG_ID in FLYS.
+        flysStatements.beginTransaction();
+        try {
+            flysStatements
+                .getStatement("update.bfg.id.discharge.table")
+                .clearParameters()
+                .setInt("id", flysId)
+                .setString("bfg_id", bfgId)
+                .executeUpdate();
+            flysStatements.commitTransaction();
+        } catch (SQLException sqle) {
+            flysStatements.rollbackTransaction();
+            log.error(sqle, sqle);
+            return false;
+        }
+
+        return true;
+    }
+
+    protected boolean updateGauge(
+        SyncContext context,
+        DIPSGauge   gauge
+    )
+    throws SQLException
+    {
+        log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'.");
+        // We need to load all discharge tables from both databases
+        // of the gauge and do some pairing based on their bfg_id.
+
+        boolean modified = false;
+
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        flysStatements.beginTransaction();
+        try {
+            List<DischargeTable> flysDTs =
+                DischargeTable.loadFlysDischargeTables(
+                    context, gauge.getFlysId());
+
+            List<DischargeTable> aftDTs =
+                DischargeTable.loadAftDischargeTables(
+                    context, gauge.getOfficialNumber());
+
+            Map<String, DischargeTable> bfgId2FlysDT =
+                new HashMap<String, DischargeTable>();
+
+            for (DischargeTable dt: flysDTs) {
+                String bfgId = dt.getBfgId();
+                if (bfgId == null) {
+                    log.warn("FLYS: discharge table " + dt.getId()
+                        + " has no bfg_id. Ignored.");
+                    continue;
+                }
+                bfgId2FlysDT.put(bfgId, dt);
+            }
+
+            List<DischargeTable> createDTs = new ArrayList<DischargeTable>();
+
+            for (DischargeTable aftDT: aftDTs) {
+                String bfgId = aftDT.getBfgId();
+                DischargeTable flysDT = bfgId2FlysDT.remove(bfgId);
+                if (flysDT != null) {
+                    // Found in AFT and FLYS.
+                    log.info("FLYS: Discharge table '" + bfgId
+                        + "' found in AFT and FLYS. -> update");
+                    // Create the W/Q diff.
+                    modified |= writeWQChanges(context, flysDT, aftDT);
+                }
+                else {
+                    log.info("FLYS: Discharge table '" + bfgId
+                        + "' not found in FLYS. -> create");
+                    createDTs.add(aftDT);
+                }
+            }
+
+            for (String bfgId: bfgId2FlysDT.keySet()) {
+                log.info("FLYS: Discharge table '" + bfgId
+                    + "' found in FLYS but not in AFT. -> ignore");
+            }
+
+            log.info("FLYS: Copy " + createDTs.size() +
+                " discharge tables over from AFT.");
+
+            // Create the new discharge tables.
+            for (DischargeTable aftDT: createDTs) {
+                createDischargeTable(context, aftDT, gauge.getFlysId());
+                modified = true;
+            }
+
+            flysStatements.commitTransaction();
+        }
+        catch (SQLException sqle) {
+            flysStatements.rollbackTransaction();
+            log.error(sqle, sqle);
+            modified = false;
+        }
+
+        return modified;
+    }
+
+    protected boolean writeWQChanges(
+        SyncContext    context,
+        DischargeTable flysDT,
+        DischargeTable aftDT
+    )
+    throws SQLException
+    {
+        flysDT.loadFlysValues(context);
+        aftDT.loadAftValues(context);
+        WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
+        if (diff.hasChanges()) {
+            diff.writeChanges(context, flysDT.getId());
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean createGauges(
+        SyncContext          context,
+        Map<Long, DIPSGauge> gauges
+    )
+    throws SQLException
+    {
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        SymbolicStatement.Instance nextId =
+            flysStatements.getStatement("next.gauge.id");
+
+        SymbolicStatement.Instance insertStmnt =
+            flysStatements.getStatement("insert.gauge");
+
+        boolean modified = false;
+
+        for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
+            Long      officialNumber = entry.getKey();
+            DIPSGauge gauge          = entry.getValue();
+
+            log.info("Gauge '" + gauge.getAftName() +
+                "' not in FLYS but in AFT/DIPS. -> Create");
+
+            if (!gauge.hasDatums()) {
+                log.warn("DIPS: Gauge '" +
+                    gauge.getAftName() + "' has no datum. Ignored.");
+                continue;
+            }
+
+            ResultSet rs = null;
+            flysStatements.beginTransaction();
+            try {
+                (rs = nextId.executeQuery()).next();
+                int gaugeId = rs.getInt("gauge_id");
+                rs.close(); rs = null;
+
+                insertStmnt
+                    .clearParameters()
+                    .setInt("id", gaugeId)
+                    .setString("name", gauge.getAftName())
+                    .setInt("river_id", id1)
+                    .setDouble("station", gauge.getStation())
+                    .setDouble("aeo", gauge.getAeo())
+                    .setLong("official_number", officialNumber)
+                    .setDouble("datum", gauge.getLatestDatum().getValue());
+
+                insertStmnt.execute();
+
+                log.info("FLYS: Created gauge '" + gauge.getAftName() +
+                    "' with id " + gaugeId + ".");
+
+                gauge.setFlysId(gaugeId);
+                createDischargeTables(context, gauge);
+                flysStatements.commitTransaction();
+                modified = true;
+            }
+            catch (SQLException sqle) {
+                flysStatements.rollbackTransaction();
+                log.error(sqle, sqle);
+            }
+            finally {
+                if (rs != null) {
+                    rs.close();
+                }
+            }
+        }
+
+        return modified;
+    }
+
+    protected void createDischargeTable(
+        SyncContext    context,
+        DischargeTable aftDT,
+        int            flysGaugeId
+    )
+    throws SQLException
+    {
+        aftDT.persistFlysTimeInterval(context);
+        int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId);
+
+        aftDT.loadAftValues(context);
+        aftDT.storeFlysValues(context, flysId);
+    }
+
+    protected void createDischargeTables(
+        SyncContext context,
+        DIPSGauge   gauge
+    )
+    throws SQLException
+    {
+        log.info("FLYS: Create discharge tables for '" +
+            gauge.getAftName() + "'.");
+
+        // Load the discharge tables from AFT.
+        List<DischargeTable> dts = loadAftDischargeTables(
+            context, gauge);
+
+        // Persist the time intervals.
+        persistFlysTimeIntervals(context, dts);
+
+        // Persist the discharge tables
+        int [] flysDTIds = persistFlysDischargeTables(
+            context, dts, gauge.getFlysId());
+
+        // Copy over the W/Q values
+        copyWQsFromAftToFlys(context, dts, flysDTIds);
+    }
+
+    protected List<DischargeTable> loadAftDischargeTables(
+        SyncContext context,
+        DIPSGauge   gauge
+    )
+    throws SQLException
+    {
+        return DischargeTable.loadAftDischargeTables(
+            context, gauge.getOfficialNumber(), gauge.getFlysId());
+    }
+
+    protected void persistFlysTimeIntervals(
+        SyncContext          context,
+        List<DischargeTable> dts
+    )
+    throws SQLException
+    {
+        for (DischargeTable dt: dts) {
+            dt.persistFlysTimeInterval(context);
+        }
+    }
+
+    protected int [] persistFlysDischargeTables(
+        SyncContext          context,
+        List<DischargeTable> dts,
+        int                  flysGaugeId
+    )
+    throws SQLException
+    {
+        int [] flysDTIds = new int[dts.size()];
+
+        for (int i = 0; i < flysDTIds.length; ++i) {
+            flysDTIds[i] = dts.get(i)
+                .persistFlysDischargeTable(context, flysGaugeId);
+        }
+
+        return flysDTIds;
+    }
+
+    protected void copyWQsFromAftToFlys(
+        SyncContext          context,
+        List<DischargeTable> dts,
+        int []               flysDTIds
+    )
+    throws SQLException
+    {
+        for (int i = 0; i < flysDTIds.length; ++i) {
+            DischargeTable dt = dts.get(i);
+            dt.loadAftValues(context);
+            dt.storeFlysValues(context, flysDTIds[i]);
+            dt.clearValues(); // To save memory.
+        }
+    }
+
+    public String toString() {
+        return "[River: name=" + name + ", " + super.toString() + "]";
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/Rivers.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,77 @@
+package de.intevation.aft;
+
+import de.intevation.db.ConnectedStatements;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class Rivers
+{
+    private static Logger log = Logger.getLogger(Rivers.class);
+
+    public Rivers() {
+    }
+
+    public boolean sync(SyncContext context) throws SQLException {
+
+        log.info("sync: rivers");
+
+        ConnectedStatements flysStatements = context.getFlysStatements();
+        ConnectedStatements aftStatements  = context.getAftStatements();
+
+        Map<String, River> flysRivers = new HashMap<String, River>();
+
+        ResultSet flysRs = flysStatements
+            .getStatement("select.rivers").executeQuery();
+
+        try {
+            while (flysRs.next()) {
+                int    id   = flysRs.getInt("id");
+                String name = flysRs.getString("name");
+                double from = flysRs.getDouble("min_km");
+                double to   = flysRs.getDouble("max_km");
+                flysRivers.put(name.toLowerCase(), new River(id, name, from, to));
+            }
+        }
+        finally {
+            flysRs.close();
+        }
+
+        List<River> commonRivers = new ArrayList<River>();
+
+        ResultSet aftRs = aftStatements
+            .getStatement("select.gewaesser").executeQuery();
+
+        try {
+            while (aftRs.next()) {
+                String name = aftRs.getString("NAME");
+                River river = flysRivers.get(name.toLowerCase());
+                if (river != null) {
+                    int id2 = aftRs.getInt("GEWAESSER_NR");
+                    river.setId2(id2);
+                    commonRivers.add(river);
+                }
+            }
+        }
+        finally {
+            aftRs.close();
+        }
+
+
+        boolean modified = false;
+
+        for (River river: commonRivers) {
+            modified |= river.sync(context);
+        }
+
+        return modified;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/Sync.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,170 @@
+package de.intevation.aft;
+
+import de.intevation.db.ConnectionBuilder;
+
+import de.intevation.utils.XML;
+
+import java.io.File;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import java.sql.SQLException;
+
+import javax.xml.xpath.XPathConstants;
+
+import org.apache.log4j.Logger;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+public class Sync
+{
+    private static Logger log = Logger.getLogger(Sync.class);
+
+    public static final String FLYS = "flys";
+    public static final String AFT  = "aft";
+
+    public static final String XPATH_DIPS   = "/sync/dips/file/text()";
+    public static final String XPATH_REPAIR = "/sync/dips/repair/text()";
+
+    public static final String XPATH_NOTIFICATIONS =
+        "/sync/notifications/notification";
+
+    public static final String CONFIG_FILE =
+        System.getProperty("config.file", "config.xml");
+
+    public static void sendNotifications(Document config) {
+        NodeList notifications = (NodeList)XML.xpath(
+            config, XPATH_NOTIFICATIONS, XPathConstants.NODESET, null, null);
+
+        if (notifications == null) {
+            return;
+        }
+
+        for (int i = 0, N = notifications.getLength(); i < N; ++i) {
+            Element notification = (Element)notifications.item(i);
+            String urlString = notification.getAttribute("url");
+
+            URL url;
+            try {
+                url = new URL(urlString);
+            }
+            catch (MalformedURLException mfue) {
+                log.warn("NOTIFY: Invalid URL '" + urlString + "'. Ignored.", mfue);
+                continue;
+            }
+
+            Notification n = new Notification(notification);
+
+            Document result = n.sendPOST(url);
+
+            if (result != null) {
+                log.info("Send notifcation to '" + urlString + "'.");
+                log.info(XML.toString(result));
+            }
+        }
+    }
+
+    public static void main(String [] args) {
+
+        File configFile = new File(CONFIG_FILE);
+
+        if (!configFile.isFile() || !configFile.canRead()) {
+            log.error("cannot read config file");
+            System.exit(1);
+        }
+
+        Document config = XML.parseDocument(configFile, Boolean.FALSE);
+
+        if (config == null) {
+            log.error("Cannot load config file.");
+            System.exit(1);
+        }
+
+        String dipsF = (String)XML.xpath(
+            config, XPATH_DIPS, XPathConstants.STRING, null, null);
+
+        if (dipsF == null || dipsF.length() == 0) {
+            log.error("Cannot find path to DIPS XML in config.");
+            System.exit(1);
+        }
+
+        File dipsFile = new File(dipsF);
+
+        if (!dipsFile.isFile() || !dipsFile.canRead()) {
+            log.error("DIPS: Cannot find '" + dipsF + "'.");
+            System.exit(1);
+        }
+
+        Document dips = XML.parseDocument(dipsFile, Boolean.FALSE);
+
+        if (dips == null) {
+            log.error("DIPS: Cannot load DIPS document.");
+            System.exit(1);
+        }
+
+        String repairF = (String)XML.xpath(
+            config, XPATH_REPAIR, XPathConstants.STRING, null, null);
+
+        if (repairF != null && repairF.length() > 0) {
+            File repairFile = new File(repairF);
+            if (!repairFile.isFile() || !repairFile.canRead()) {
+                log.warn("REPAIR: Cannot open DIPS repair XSLT file.");
+            }
+            else {
+                Document fixed = XML.transform(dips, repairFile);
+                if (fixed == null) {
+                    log.warn("REPAIR: Fixing DIPS failed.");
+                }
+                else {
+                    dips = fixed;
+                }
+            }
+        }
+
+        int exitCode = 0;
+
+        ConnectionBuilder aftConnectionBuilder =
+            new ConnectionBuilder(AFT, config);
+
+        ConnectionBuilder flysConnectionBuilder =
+            new ConnectionBuilder(FLYS, config);
+
+        SyncContext syncContext = null;
+
+        boolean modified = false;
+        try {
+            syncContext = new SyncContext(
+                aftConnectionBuilder.getConnectedStatements(),
+                flysConnectionBuilder.getConnectedStatements(),
+                dips);
+            syncContext.init();
+            Rivers rivers = new Rivers();
+            modified = rivers.sync(syncContext);
+        }
+        catch (SQLException sqle) {
+            log.error("SYNC: Syncing failed.", sqle);
+            exitCode = 1;
+        }
+        finally {
+            if (syncContext != null) {
+                syncContext.close();
+            }
+        }
+
+        if (modified) {
+            log.info("Modifications found.");
+            sendNotifications(config);
+        }
+        else {
+            log.info("No modifications found.");
+        }
+
+        if (exitCode != 0) {
+            System.exit(1);
+        }
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/SyncContext.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,220 @@
+package de.intevation.aft;
+
+import de.intevation.db.ConnectedStatements;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+public class SyncContext
+{
+    private static Logger log = Logger.getLogger(SyncContext.class);
+
+    protected ConnectedStatements             aftStatements;
+    protected ConnectedStatements             flysStatements;
+    protected Document                        dips;
+
+    protected Map<Long, DIPSGauge>            numberToGauge;
+    protected Map<TimeInterval, TimeInterval> flysTimeIntervals;
+
+    public SyncContext() {
+    }
+
+    public SyncContext(
+        ConnectedStatements aftStatements,
+        ConnectedStatements flysStatements,
+        Document            dips
+    ) {
+        this.aftStatements  = aftStatements;
+        this.flysStatements = flysStatements;
+        this.dips           = dips;
+    }
+
+    public void init() throws SQLException {
+        numberToGauge       = indexByNumber(dips);
+        flysTimeIntervals   = loadTimeIntervals();
+    }
+
+    public ConnectedStatements getAftStatements() {
+        return aftStatements;
+    }
+
+    public void setAftStatements(ConnectedStatements aftStatements) {
+        this.aftStatements = aftStatements;
+    }
+
+    public ConnectedStatements getFlysStatements() {
+        return flysStatements;
+    }
+
+    public void setFlysStatements(ConnectedStatements flysStatements) {
+        this.flysStatements = flysStatements;
+    }
+
+    public Document getDips() {
+        return dips;
+    }
+
+    public void setDips(Document dips) {
+        this.dips = dips;
+    }
+
+    void close() {
+        aftStatements.close();
+        flysStatements.close();
+    }
+
+    public static Long numberToLong(String s) {
+        try {
+            return Long.valueOf(s.trim());
+        }
+        catch (NumberFormatException nfe) {
+        }
+        return null;
+    }
+
+    public Map<Long, DIPSGauge> getDIPSGauges() {
+        return numberToGauge;
+    }
+
+    public Map<Long, DIPSGauge> getDIPSGauges(
+        String riverName,
+        double from,
+        double to
+    ) {
+        if (from > to) {
+            double t = from;
+            from = to;
+            to = t;
+        }
+
+        riverName = riverName.toLowerCase();
+
+        Map<Long, DIPSGauge> result = new HashMap<Long, DIPSGauge>();
+
+        for (Map.Entry<Long, DIPSGauge> entry: numberToGauge.entrySet()) {
+            DIPSGauge gauge = entry.getValue();
+            // XXX: Maybe a bit too sloppy.
+            if (!riverName.contains(gauge.getRiverName().toLowerCase())) {
+                continue;
+            }
+            double station = gauge.getStation();
+            if (station >= from && station <= to) {
+                result.put(entry.getKey(), gauge);
+            }
+        }
+
+        return result;
+    }
+
+    protected static Map<Long, DIPSGauge> indexByNumber(Document document) {
+        Map<Long, DIPSGauge> map = new HashMap<Long, DIPSGauge>();
+        NodeList nodes = document.getElementsByTagName("PEGELSTATION");
+        for (int i = nodes.getLength()-1; i >= 0; --i) {
+            Element element = (Element)nodes.item(i);
+            String numberString = element.getAttribute("NUMMER");
+            Long number = numberToLong(numberString);
+            if (number != null) {
+                DIPSGauge newG = new DIPSGauge(element);
+                DIPSGauge oldG = map.put(number, newG);
+                if (oldG != null) {
+                    log.warn("DIPS: '" + newG.getName() +
+                        "' collides with '" + oldG.getName() +
+                        "' on gauge number " + number + ".");
+                }
+            }
+            else {
+                log.warn("DIPS: Gauge '" + element.getAttribute("NAME") +
+                    "' has invalid gauge number '" + numberString + "'.");
+            }
+        }
+        return map;
+    }
+
+    protected Map<TimeInterval, TimeInterval> loadTimeIntervals()
+    throws SQLException {
+
+        boolean debug = log.isDebugEnabled();
+
+        Map<TimeInterval, TimeInterval> intervals =
+            new TreeMap<TimeInterval, TimeInterval>();
+
+        ResultSet rs = flysStatements
+            .getStatement("select.timeintervals")
+            .executeQuery();
+
+        try {
+            while (rs.next()) {
+                int  id    = rs.getInt("id");
+                Date start = rs.getDate("start_time");
+                Date stop  = rs.getDate("stop_time");
+
+                if (debug) {
+                    log.debug("id:    " + id);
+                    log.debug("start: " + start);
+                    log.debug("stop:  " + stop);
+                }
+
+                TimeInterval ti = new TimeInterval(id, start, stop);
+                intervals.put(ti, ti);
+            }
+        }
+        finally {
+            rs.close();
+        }
+
+        if (debug) {
+            log.debug("loaded time intervals: " + intervals.size());
+        }
+
+        return intervals;
+    }
+
+    public TimeInterval fetchOrCreateFLYSTimeInterval(TimeInterval key)
+    throws SQLException
+    {
+        TimeInterval old = flysTimeIntervals.get(key);
+        if (old != null) {
+            return old;
+        }
+
+        ResultSet rs = flysStatements
+            .getStatement("next.timeinterval.id")
+            .executeQuery();
+
+        try {
+            rs.next();
+            key.setId(rs.getInt("time_interval_id"));
+        }
+        finally {
+            rs.close();
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("FLYS: Created time interval id: " + key.getId());
+            log.debug("FLYS: " + key);
+        }
+
+        flysStatements.getStatement("insert.timeinterval")
+            .clearParameters()
+            .setInt("id", key.getId())
+            .setObject("start_time", key.getStart())
+            .setObject("stop_time", key.getStop())
+            .execute();
+
+        flysTimeIntervals.put(key, key);
+
+        return key;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/TimeInterval.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,70 @@
+package de.intevation.aft;
+
+import java.util.Date;
+
+public class TimeInterval
+implements   Comparable<TimeInterval>
+{
+    protected int  id;
+    protected Date start;
+    protected Date stop;
+
+    public TimeInterval() {
+    }
+
+    public TimeInterval(Date start, Date stop) {
+        this.start = start;
+        this.stop  = stop;
+    }
+
+    public TimeInterval(int id, Date start, Date stop) {
+        this(start, stop);
+        this.id    = id;
+    }
+
+    protected static int compare(Date d1, Date d2) {
+        long s1 = d1 != null ? d1.getTime()/1000L : 0L;
+        long s2 = d2 != null ? d2.getTime()/1000L : 0L;
+        long diff = s1 - s2;
+        return diff < 0L
+            ? -1
+            : diff > 0L ? 1 : 0;
+    }
+
+    @Override
+    public int compareTo(TimeInterval other) {
+        int cmp = compare(start, other.start);
+        return cmp != 0
+            ? cmp
+            : compare(stop, other.stop);
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public Date getStart() {
+        return start;
+    }
+
+    public void setStart(Date start) {
+        this.start = start;
+    }
+
+    public Date getStop() {
+        return stop;
+    }
+
+    public void setStop(Date stop) {
+        this.stop = stop;
+    }
+
+    public String toString() {
+        return "[TimeInterval: start=" + start + ", stop=" + stop + "]";
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQ.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,67 @@
+package de.intevation.aft;
+
+import java.util.Comparator;
+
+public class WQ
+{
+    public static final double EPSILON = 1e-4;
+
+    public static final Comparator<WQ> EPS_CMP = new Comparator<WQ>() {
+        @Override
+        public int compare(WQ a, WQ b) {
+            int cmp = compareEpsilon(a.q, b.q);
+            if (cmp != 0) return cmp;
+            return compareEpsilon(a.w, b.w);
+        }
+    };
+
+    protected int id;
+
+    protected double w;
+    protected double q;
+
+    public WQ() {
+    }
+
+    public WQ(double w, double q) {
+        this.w = w;
+        this.q = q;
+    }
+
+    public WQ(int id, double w, double q) {
+        this.id = id;
+        this.w  = w;
+        this.q  = q;
+    }
+
+    public static final int compareEpsilon(double a, double b) {
+        double diff = a - b;
+        if (diff < -EPSILON) return -1;
+        return diff > EPSILON ? +1 : 0;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public double getW() {
+        return w;
+    }
+
+    public void setW(double w) {
+        this.w = w;
+    }
+
+    public double getQ() {
+        return q;
+    }
+
+    public void setQ(double q) {
+        this.q = q;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQDiff.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,118 @@
+package de.intevation.aft;
+
+import de.intevation.db.ConnectedStatements;
+import de.intevation.db.SymbolicStatement;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class WQDiff
+{
+    protected Set<WQ> toAdd;
+    protected Set<WQ> toDelete;
+
+    public WQDiff() {
+    }
+
+    public WQDiff(Collection<WQ> a, Collection<WQ> b) {
+        toAdd    = new TreeSet<WQ>(WQ.EPS_CMP);
+        toDelete = new TreeSet<WQ>(WQ.EPS_CMP);
+        build(a, b);
+    }
+
+    public void build(Collection<WQ> a, Collection<WQ> b) {
+        toAdd.addAll(b);
+        toAdd.removeAll(a);
+
+        toDelete.addAll(a);
+        toDelete.removeAll(b);
+    }
+
+    public void clear() {
+        toAdd.clear();
+        toDelete.clear();
+    }
+
+    public Set<WQ> getToAdd() {
+        return toAdd;
+    }
+
+    public void setToAdd(Set<WQ> toAdd) {
+        this.toAdd = toAdd;
+    }
+
+    public Set<WQ> getToDelete() {
+        return toDelete;
+    }
+
+    public void setToDelete(Set<WQ> toDelete) {
+        this.toDelete = toDelete;
+    }
+
+    public boolean hasChanges() {
+        return !(toAdd.isEmpty() && toDelete.isEmpty());
+    }
+
+    public void writeChanges(
+        SyncContext context,
+        int         tableId
+    )
+    throws SQLException
+    {
+        ConnectedStatements flysStatements = context.getFlysStatements();
+
+        // Delete the old entries
+        if (!toDelete.isEmpty()) {
+            SymbolicStatement.Instance deleteDTV =
+                flysStatements.getStatement("delete.discharge.table.value");
+            for (WQ wq: toDelete) {
+                deleteDTV
+                    .clearParameters()
+                    .setInt("id", wq.getId())
+                    .execute();
+            }
+        }
+
+        // Add the new entries.
+        if (!toAdd.isEmpty()) {
+            SymbolicStatement.Instance nextId =
+                flysStatements.getStatement("next.discharge.table.values.id");
+
+            SymbolicStatement.Instance insertDTV =
+                flysStatements.getStatement("insert.discharge.table.value");
+
+            // Recycle old ids as much as possible.
+            Iterator<WQ> oldIds = toDelete.iterator();
+
+            // Create ids for new entries.
+            for (WQ wq: toAdd) {
+                if (oldIds.hasNext()) {
+                    wq.setId(oldIds.next().getId());
+                }
+                else {
+                    ResultSet rs = nextId.executeQuery();
+                    rs.next();
+                    wq.setId(rs.getInt("discharge_table_values_id"));
+                    rs.close();
+                }
+            }
+
+            // Write the new entries.
+            for (WQ wq: toAdd) {
+                insertDTV
+                    .clearParameters()
+                    .setInt("id", wq.getId())
+                    .setInt("table_id", tableId)
+                    .setDouble("w", wq.getW())
+                    .setDouble("q", wq.getQ())
+                    .execute();
+            }
+        }
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectedStatements.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,110 @@
+package de.intevation.db;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.sql.Savepoint;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+public class ConnectedStatements
+{
+    private static Logger log = Logger.getLogger(ConnectedStatements.class);
+
+    protected Connection connection;
+
+    protected Map<String, SymbolicStatement> statements;
+
+    protected Map<String, SymbolicStatement.Instance> boundStatements;
+
+    protected Deque<Savepoint> savepoints;
+
+    public ConnectedStatements(
+        Connection connection,
+        Map<String, SymbolicStatement> statements
+    )
+    throws SQLException
+    {
+        this.connection = connection;
+        this.statements = statements;
+        checkSavePoints();
+
+        boundStatements = new HashMap<String, SymbolicStatement.Instance>();
+    }
+
+    protected void checkSavePoints() throws SQLException {
+        DatabaseMetaData metaData = connection.getMetaData();
+        if (metaData.supportsSavepoints()) {
+            log.info("Driver '" + metaData.getDriverName() +
+                "' does support savepoints.");
+            savepoints = new ArrayDeque<Savepoint>();
+        }
+        else {
+            log.info("Driver '" + metaData.getDriverName() +
+                "' does not support savepoints.");
+        }
+    }
+
+    public SymbolicStatement.Instance getStatement(String key)
+    throws SQLException
+    {
+        SymbolicStatement.Instance stmnt = boundStatements.get(key);
+        if (stmnt != null) {
+            return stmnt;
+        }
+
+        SymbolicStatement ss = statements.get(key);
+        if (ss == null) {
+            return null;
+        }
+
+        stmnt = ss.new Instance(connection);
+        boundStatements.put(key, stmnt);
+        return stmnt;
+    }
+
+    public void beginTransaction() throws SQLException {
+        if (savepoints != null) {
+            savepoints.push(connection.setSavepoint());
+        }
+    }
+
+    public void commitTransaction() throws SQLException {
+        if (savepoints != null) {
+            savepoints.pop();
+        }
+        connection.commit();
+    }
+
+    public void rollbackTransaction() throws SQLException {
+        if (savepoints != null) {
+            Savepoint savepoint = savepoints.pop();
+            connection.rollback(savepoint);
+        }
+        else {
+            connection.rollback();
+        }
+    }
+
+    public void close() {
+        for (SymbolicStatement.Instance s: boundStatements.values()) {
+            s.close();
+        }
+
+        try {
+            if (savepoints != null && !savepoints.isEmpty()) {
+                Savepoint savepoint = savepoints.peekFirst();
+                connection.rollback(savepoint);
+            }
+            connection.close();
+        }
+        catch (SQLException sqle) {
+        }
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectionBuilder.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,121 @@
+package de.intevation.db;
+
+import de.intevation.utils.XML;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.xml.xpath.XPathConstants;
+
+import org.apache.log4j.Logger;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+
+public class ConnectionBuilder
+{
+    private static Logger log = Logger.getLogger(ConnectionBuilder.class);
+
+    public static final String XPATH_DRIVER     = "/sync/side[@name=$type]/db/driver/text()";
+    public static final String XPATH_USER       = "/sync/side[@name=$type]/db/user/text()";
+    public static final String XPATH_PASSWORD   = "/sync/side[@name=$type]/db/password/text()";
+    public static final String XPATH_URL        = "/sync/side[@name=$type]/db/url/text()";
+    public static final String XPATH_EXEC_LOGIN = "/sync/side[@name=$type]/db/execute-login/statement";
+
+    protected String       type;
+    protected String       driver;
+    protected String       user;
+    protected String       password;
+    protected String       url;
+    protected List<String> loginStatements;
+
+    public ConnectionBuilder(String type, Document document) {
+        this.type = type;
+        extractCredentials(document);
+    }
+
+    protected static List<String> extractStrings(NodeList nodes) {
+        int N = nodes.getLength();
+        List<String> result = new ArrayList<String>(N);
+        for (int i = 0; i < N; ++i) {
+            result.add(nodes.item(i).getTextContent());
+        }
+        return result;
+    }
+
+    protected void extractCredentials(Document document) {
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("type", type);
+
+        driver = (String)XML.xpath(
+            document, XPATH_DRIVER, XPathConstants.STRING, null, map);
+        user = (String)XML.xpath(
+            document, XPATH_USER, XPathConstants.STRING, null, map);
+        password = (String)XML.xpath(
+            document, XPATH_PASSWORD, XPathConstants.STRING, null, map);
+        url = (String)XML.xpath(
+            document, XPATH_URL, XPathConstants.STRING, null, map);
+        loginStatements = extractStrings((NodeList)XML.xpath(
+            document, XPATH_EXEC_LOGIN, XPathConstants.NODESET, null, map));
+
+        if (log.isDebugEnabled()) {
+            log.debug("driver: " + driver);
+            log.debug("user: " + user);
+            log.debug("password: *******");
+            log.debug("url: " + url);
+            log.debug("number of login statements: " + loginStatements.size());
+        }
+    }
+
+    public Connection getConnection() throws SQLException {
+
+        if (driver != null && driver.length() > 0) {
+            try {
+                Class.forName(driver);
+            }
+            catch (ClassNotFoundException cnfe) {
+                throw new SQLException(cnfe);
+            }
+        }
+
+        Connection connection =
+            DriverManager.getConnection(url, user, password);
+
+        connection.setAutoCommit(false);
+
+        DatabaseMetaData metaData = connection.getMetaData();
+
+        if (metaData.supportsTransactionIsolationLevel(
+            Connection.TRANSACTION_READ_UNCOMMITTED)) {
+            connection.setTransactionIsolation(
+                Connection.TRANSACTION_READ_UNCOMMITTED);
+        }
+
+        for (String sql: loginStatements) {
+            Statement stmnt = connection.createStatement();
+            try {
+                stmnt.execute(sql);
+            }
+            finally {
+                stmnt.close();
+            }
+        }
+
+        return connection;
+    }
+
+    public ConnectedStatements getConnectedStatements() throws SQLException {
+        return new ConnectedStatements(
+            getConnection(),
+            new Statements(type, driver != null ? driver : "")
+                .getStatements());
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/Statements.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,124 @@
+package de.intevation.db;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+public class Statements
+{
+    private static Logger log = Logger.getLogger(Statements.class);
+
+    public static final String RESOURCE_PATH = "/sql/";
+    public static final String COMMON_PROPERTIES = "-common.properties";
+
+    protected String type;
+    protected String driver;
+
+    protected Map<String, SymbolicStatement> statements;
+
+    public Statements(String type, String driver) {
+        this.type   = type;
+        this.driver = driver;
+    }
+
+    public SymbolicStatement getStatement(String key) {
+        return getStatements().get(key);
+    }
+
+    public Map<String, SymbolicStatement> getStatements() {
+        if (statements == null) {
+            statements = loadStatements();
+        }
+        return statements;
+    }
+
+    protected Map<String, SymbolicStatement> loadStatements() {
+        Map<String, SymbolicStatement> statements =
+            new HashMap<String, SymbolicStatement>();
+
+        Properties properties = loadProperties();
+
+        for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
+            String key = (String)e.nextElement();
+            String value = properties.getProperty(key);
+            SymbolicStatement symbolic = new SymbolicStatement(value);
+            statements.put(key, symbolic);
+        }
+
+        return statements;
+    }
+
+    protected String driverToProperties() {
+        return
+            type + "-" +
+            driver.replace('.', '-').toLowerCase() + ".properties";
+    }
+
+    protected Properties loadCommon() {
+        Properties common = new Properties();
+
+        String path = RESOURCE_PATH + type + COMMON_PROPERTIES;
+
+        InputStream in = Statements.class.getResourceAsStream(path);
+
+        if (in != null) {
+            try {
+                common.load(in);
+            }
+            catch (IOException ioe) {
+                log.error("cannot load defaults: " + path, ioe);
+            }
+            finally {
+                try {
+                    in.close();
+                }
+                catch (IOException ioe) {
+                }
+            }
+        }
+        else {
+            log.warn("cannot find: " + path);
+        }
+
+        return common;
+    }
+
+    protected Properties loadProperties() {
+
+        Properties common = loadCommon();
+
+        Properties properties = new Properties(common);
+
+        String path = RESOURCE_PATH + driverToProperties();
+
+        InputStream in = Statements.class.getResourceAsStream(path);
+
+        if (in != null) {
+            try {
+                properties.load(in);
+            }
+            catch (IOException ioe) {
+                log.error("cannot load statements: " + path, ioe);
+            }
+            finally {
+                try {
+                    in.close();
+                }
+                catch (IOException ioe) {
+                }
+            }
+        }
+        else {
+            log.warn("cannot find: " + path);
+        }
+
+        return properties;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/SymbolicStatement.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,196 @@
+package de.intevation.db;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+public class SymbolicStatement {
+
+    private static Logger log = Logger.getLogger(SymbolicStatement.class);
+
+    public static final Pattern VAR = Pattern.compile(":([a-zA-Z0-9_]+)");
+
+    protected String statement;
+    protected String compiled;
+    protected Map<String, List<Integer>> positions;
+
+    public class Instance {
+
+        /** TODO: Support more types. */
+
+        protected PreparedStatement stmnt;
+
+        public Instance(Connection connection) throws SQLException {
+            stmnt = connection.prepareStatement(compiled);
+        }
+
+        public void close() {
+            try {
+                stmnt.close();
+            }
+            catch (SQLException sqle) {
+                log.error("cannot close statement", sqle);
+            }
+        }
+
+        public Instance setInt(String key, int value)
+        throws SQLException
+        {
+            List<Integer> pos = positions.get(key.toLowerCase());
+            if (pos != null) {
+                for (Integer p: pos) {
+                    stmnt.setInt(p, value);
+                }
+            }
+
+            return this;
+        }
+
+        public Instance setString(String key, String value)
+        throws SQLException
+        {
+            List<Integer> pos = positions.get(key.toLowerCase());
+            if (pos != null) {
+                for (Integer p: pos) {
+                    stmnt.setString(p, value);
+                }
+            }
+            return this;
+        }
+
+        public Instance setObject(String key, Object value)
+        throws SQLException
+        {
+            List<Integer> pos = positions.get(key.toLowerCase());
+            if (pos != null) {
+                for (Integer p: pos) {
+                    stmnt.setObject(p, value);
+                }
+            }
+            return this;
+        }
+
+        public Instance setTimestamp(String key, Timestamp value)
+        throws SQLException
+        {
+            List<Integer> pos = positions.get(key.toLowerCase());
+            if (pos != null) {
+                for (Integer p: pos) {
+                    stmnt.setTimestamp(p, value);
+                }
+            }
+            return this;
+        }
+
+        public Instance setDouble(String key, double value)
+        throws SQLException
+        {
+            List<Integer> pos = positions.get(key.toLowerCase());
+            if (pos != null) {
+                for (Integer p: pos) {
+                    stmnt.setDouble(p, value);
+                }
+            }
+            return this;
+        }
+
+        public Instance setLong(String key, long value)
+        throws SQLException
+        {
+            List<Integer> pos = positions.get(key.toLowerCase());
+            if (pos != null) {
+                for (Integer p: pos) {
+                    stmnt.setLong(p, value);
+                }
+            }
+            return this;
+        }
+
+        public Instance setNull(String key, int sqlType)
+        throws SQLException
+        {
+            List<Integer> pos = positions.get(key.toLowerCase());
+            if (pos != null) {
+                for (Integer p: pos) {
+                    stmnt.setNull(p, sqlType);
+                }
+            }
+            return this;
+        }
+
+        public Instance set(Map<String, Object> map) throws SQLException {
+            for (Map.Entry<String, Object> entry: map.entrySet()) {
+                setObject(entry.getKey(), entry.getValue());
+            }
+            return this;
+        }
+
+        public Instance clearParameters() throws SQLException {
+            stmnt.clearParameters();
+            return this;
+        }
+
+        public boolean execute() throws SQLException {
+            if (log.isDebugEnabled()) {
+                log.debug("execute: " + compiled);
+            }
+            return stmnt.execute();
+        }
+
+        public ResultSet executeQuery() throws SQLException {
+            if (log.isDebugEnabled()) {
+                log.debug("query: " + compiled);
+            }
+            return stmnt.executeQuery();
+        }
+
+        public int executeUpdate() throws SQLException {
+            if (log.isDebugEnabled()) {
+                log.debug("update: " + compiled);
+            }
+            return stmnt.executeUpdate();
+        }
+
+    } // class Instance
+
+    public SymbolicStatement(String statement) {
+        this.statement = statement;
+        compile();
+    }
+
+    public String getStatement() {
+        return statement;
+    }
+
+    protected void compile() {
+        positions = new HashMap<String, List<Integer>>();
+
+        StringBuffer sb = new StringBuffer();
+        Matcher m = VAR.matcher(statement);
+        int index = 1;
+        while (m.find()) {
+            String key = m.group(1).toLowerCase();
+            List<Integer> list = positions.get(key);
+            if (list == null) {
+                list = new ArrayList<Integer>();
+                positions.put(key, list);
+            }
+            list.add(index++);
+            m.appendReplacement(sb, "?");
+        }
+        m.appendTail(sb);
+        compiled = sb.toString();
+    }
+} // class SymbolicStatement
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-aft/src/main/java/org/dive4elements/river/etl/utils/XML.java	Thu Apr 25 11:42:54 2013 +0200
@@ -0,0 +1,332 @@
+package de.intevation.utils;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.namespace.NamespaceContext;
+import javax.xml.namespace.QName;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.TransformerFactoryConfigurationError;
+
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import javax.xml.xpath.XPathVariableResolver;
+
+import org.apache.log4j.Logger;
+
+import org.w3c.dom.Document;
+
+import org.xml.sax.SAXException;
+
+public final class XML
+{
+    /** Logger for this class. */
+    private static Logger log = Logger.getLogger(XML.class);
+
+    public static class MapXPathVariableResolver
+    implements          XPathVariableResolver
+    {
+        protected Map<String, String> variables;
+
+
+        public MapXPathVariableResolver() {
+            this.variables = new HashMap<String, String>();
+        }
+
+
+        public MapXPathVariableResolver(Map<String, String> variables) {
+            this.variables = variables;
+        }
+
+
+        public void addVariable(String name, String value) {
+            variables.put(name, value);
+        }
+
+
+        @Override
+        public Object resolveVariable(QName variableName) {
+            String key = variableName.getLocalPart();
+            return variables.get(key);
+        }
+    } // class MapXPathVariableResolver
+
+    private XML() {
+    }
+
+        /**
+     * Creates a new XML document
+     * @return the new XML document ot null if something went wrong during
+     * creation.
+     */
+    public static final Document newDocument() {
+        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        factory.setNamespaceAware(true);
+
+        try {
+            return factory.newDocumentBuilder().newDocument();
+        }
+        catch (ParserConfigurationException pce) {
+            log.error(pce.getLocalizedMessage(), pce);
+        }
+        return null;
+    }
+
+    /**
+     * Loads a XML document namespace aware from a file
+     * @param file The file to load.
+     * @return the XML document or null if something went wrong
+     * during loading.
+     */
+    public static final Document parseDocument(File file) {
+        return parseDocument(file, Boolean.TRUE);
+    }
+
+    public static final Document parseDocument(File file, Boolean namespaceAware) {
+        InputStream inputStream = null;
+        try {
+            inputStream = new BufferedInputStream(new FileInputStream(file));
+            return parseDocument(inputStream, namespaceAware);
+        }
+        catch (IOException ioe) {
+            log.error(ioe.getLocalizedMessage(), ioe);
+        }
+        finally {
+            if (inputStream != null) {
+                try { inputStream.close(); }
+                catch (IOException ioe) {}
+            }
+        }
+        return null;
+    }
+
+
+    public static final Document parseDocument(InputStream inputStream) {
+        return parseDocument(inputStream, Boolean.TRUE);
+    }
+
+    public static final Document parseDocument(
+        InputStream inputStream,
+        Boolean     namespaceAware
+    ) {
+        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+
+        if (namespaceAware != null) {
+            factory.setNamespaceAware(namespaceAware.booleanValue());
+        }
+
+        try {
+            return factory.newDocumentBuilder().parse(inputStream);
+        }
+        catch (ParserConfigurationException pce) {
+            log.error(pce.getLocalizedMessage(), pce);
+        }
+        catch (SAXException se) {
+            log.error(se.getLocalizedMessage(), se);
+        }
+        catch (IOException ioe) {
+            log.error(ioe.getLocalizedMessage(), ioe);
+        }
+        return null;
+    }
+
+
+    /**
+     * Creates a new XPath without a namespace context.
+     * @return the new XPath.
+     */
+    public static final XPath newXPath() {
+        return newXPath(null, null);
+    }
+
+    /**
+     * Creates a new XPath with a given namespace context.
+     * @param namespaceContext The namespace context to be used or null
+     * if none should be used.
+     * @return The new XPath
+     */
+    public static final XPath newXPath(
+        NamespaceContext      namespaceContext,
+        XPathVariableResolver resolver)
+    {
+        XPathFactory factory = XPathFactory.newInstance();
+        XPath        xpath   = factory.newXPath();
+        if (namespaceContext != null) {
+            xpath.setNamespaceContext(namespaceContext);
+        }
+
+        if (resolver != null) {
+            xpath.setXPathVariableResolver(resolver);
+        }
+        return xpath;
+    }
+
+    /**
+     * Evaluates an XPath query on a given object and returns the result
+     * as a given type. No namespace context is used.
+     * @param root  The object which is used as the root of the tree to
+     * be searched in.
+     * @param query The XPath query
+     * @param returnTyp The type of the result.
+     * @return The result of type 'returnTyp' or null if something
+     * went wrong during XPath evaluation.
+     */
+    public static final Object xpath(
+        Object root,
+        String query,
+        QName  returnTyp
+    ) {
+        return xpath(root, query, returnTyp, null);
+    }
+
+    /**
+     * Evaluates an XPath query on a given object and returns the result
+     * as a given type. Optionally a namespace context is used.
+     * @param root The object which is used as the root of the tree to
+     * be searched in.
+     * @param query The XPath query
+     * @param returnType The type of the result.
+     * @param namespaceContext The namespace context to be used or null
+     * if none should be used.
+     * @return The result of type 'returnTyp' or null if something
+     * went wrong during XPath evaluation.
+     */
+    public static final Object xpath(
+        Object           root,
+        String           query,
+        QName            returnType,
+        NamespaceContext namespaceContext
+    ) {
+        return xpath(root, query, returnType, namespaceContext, null);
+    }
+
+    public static final Object xpath(
+        Object           root,
+        String           query,
+        QName            returnType,
+        NamespaceContext namespaceContext,
+        Map<String, String> variables)
+    {
+        if (root == null) {
+            return null;
+        }
+
+        XPathVariableResolver resolver = variables != null
+            ? new MapXPathVariableResolver(variables)
+            : null;
+
+        try {
+            XPath xpath = newXPath(namespaceContext, resolver);
+            if (xpath != null) {
+                return xpath.evaluate(query, root, returnType);
+            }
+        }
+        catch (XPathExpressionException xpee) {
+            log.error(xpee.getLocalizedMessage(), xpee);
+        }
+
+        return null;
+    }
+
+    public static Document transform(
+        Document           document,
+        File               xformFile
+    ) {
+        try {
+            Transformer transformer =
+                TransformerFactory
+                    .newInstance()
+                    .newTransformer(
+                        new StreamSource(xformFile));
+
+            DOMResult result = new DOMResult();
+
+            transformer.transform(new DOMSource(document), result);
+
+            return (Document)result.getNode();
+        }
+        catch (TransformerConfigurationException tce) {
+            log.error(tce, tce);
+        }
+        catch (TransformerException te) {
+            log.error(te, te);
+        }
+
+        return null;
+    }
+
+   /**
+     * Streams out an XML document to a given output stream.
+     * @param document The document to be streamed out.
+     * @param out      The output stream to be used.
+     * @return true if operation succeeded else false.
+     */
+    public static boolean toStream(Document document, OutputStream out) {
+        try {
+            Transformer transformer =
+                TransformerFactory.newInstance().newTransformer();
+            DOMSource    source = new DOMSource(document);
+            StreamResult result = new StreamResult(out);
+            transformer.transform(source, result);
+            return true;
+        }
+        catch (TransformerConfigurationException tce) {
+            log.error(tce.getLocalizedMessage(), tce);
+        }
+        catch (TransformerFactoryConfigurationError tfce) {
+            log.error(tfce.getLocalizedMessage(), tfce);
+        }
+        catch (TransformerException te) {
+            log.error(te.getLocalizedMessage(), te);
+        }
+
+        return false;
+    }
+
+    public static String toString(Document document) {
+        try {
+            Transformer transformer =
+                TransformerFactory.newInstance().newTransformer();
+            DOMSource    source = new DOMSource(document);
+            StringWriter out    = new StringWriter();
+            StreamResult result = new StreamResult(out);
+            transformer.transform(source, result);
+            out.flush();
+            return out.toString();
+        }
+        catch (TransformerConfigurationException tce) {
+            log.error(tce.getLocalizedMessage(), tce);
+        }
+        catch (TransformerFactoryConfigurationError tfce) {
+            log.error(tfce.getLocalizedMessage(), tfce);
+        }
+        catch (TransformerException te) {
+            log.error(te.getLocalizedMessage(), te);
+        }
+
+        return null;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :

http://dive4elements.wald.intevation.org