Mercurial > dive4elements > river
changeset 5825:f529495f901d
moved directories to org.dive4elements.river.etl
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Thu, 25 Apr 2013 11:42:54 +0200 |
parents | 06643e440d1e |
children | 9438e9259213 |
files | flys-aft/src/main/java/org/dive4elements/etl/aft/DIPSGauge.java flys-aft/src/main/java/org/dive4elements/etl/aft/DischargeTable.java flys-aft/src/main/java/org/dive4elements/etl/aft/IdPair.java flys-aft/src/main/java/org/dive4elements/etl/aft/Notification.java flys-aft/src/main/java/org/dive4elements/etl/aft/River.java flys-aft/src/main/java/org/dive4elements/etl/aft/Rivers.java flys-aft/src/main/java/org/dive4elements/etl/aft/Sync.java flys-aft/src/main/java/org/dive4elements/etl/aft/SyncContext.java flys-aft/src/main/java/org/dive4elements/etl/aft/TimeInterval.java flys-aft/src/main/java/org/dive4elements/etl/aft/WQ.java flys-aft/src/main/java/org/dive4elements/etl/aft/WQDiff.java flys-aft/src/main/java/org/dive4elements/etl/db/ConnectedStatements.java flys-aft/src/main/java/org/dive4elements/etl/db/ConnectionBuilder.java flys-aft/src/main/java/org/dive4elements/etl/db/Statements.java flys-aft/src/main/java/org/dive4elements/etl/db/SymbolicStatement.java flys-aft/src/main/java/org/dive4elements/etl/utils/XML.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/DIPSGauge.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/DischargeTable.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/IdPair.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/Notification.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/River.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/Rivers.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/Sync.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/SyncContext.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/TimeInterval.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQ.java flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQDiff.java flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectedStatements.java flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectionBuilder.java flys-aft/src/main/java/org/dive4elements/river/etl/db/Statements.java flys-aft/src/main/java/org/dive4elements/river/etl/db/SymbolicStatement.java flys-aft/src/main/java/org/dive4elements/river/etl/utils/XML.java |
diffstat | 32 files changed, 2836 insertions(+), 2836 deletions(-) [+] |
line wrap: on
line diff
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/DIPSGauge.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,193 +0,0 @@ -package de.intevation.aft; - -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.List; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.log4j.Logger; - -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -public class DIPSGauge -{ - private static Logger log = Logger.getLogger(DIPSGauge.class); - - public static final Pattern DATE_PATTERN = Pattern.compile( - "(\\d{4})-(\\d{2})-(\\d{2})\\s+(\\d{2}):(\\d{2}):(\\d{2})"); - - public static final Comparator<Datum> DATE_CMP = new Comparator<Datum>() { - public int compare(Datum a, Datum b) { - return a.date.compareTo(b.date); - } - }; - - public static class Datum { - - protected double value; - protected Date date; - - public Datum() { - } - - public Datum(Element element) { - value = Double.parseDouble(element.getAttribute("WERT")); - String dateString = element.getAttribute("GUELTIGAB"); - if (dateString.length() == 0) { - throw - new IllegalArgumentException("missing GUELTIGAB attribute"); - } - Matcher m = DATE_PATTERN.matcher(dateString); - if (!m.matches()) { - throw - new IllegalArgumentException("GUELTIGAB does not match"); - } - - int year = Integer.parseInt(m.group(1)); - int month = Integer.parseInt(m.group(2)); - int day = Integer.parseInt(m.group(3)); - int hours = Integer.parseInt(m.group(4)); - int mins = Integer.parseInt(m.group(5)); - int secs = Integer.parseInt(m.group(6)); - - Calendar cal = Calendar.getInstance(); - cal.set(year, month, day, hours, mins, secs); - - date = cal.getTime(); - } - - public double getValue() { - return value; - } - - public void setValue(double value) { - this.value = value; - } - - public Date getDate() { - return date; - } - - public void setDate(Date date) { - this.date = date; - } - } // class datum - - protected double aeo; - - protected double station; - - protected String name; - - protected String riverName; - - protected List<Datum> datums; - - protected int flysId; - - protected String aftName; - - protected Long officialNumber; - - public DIPSGauge() { - } - - public DIPSGauge(Element element) { - - name = element.getAttribute("NAME"); - riverName = element.getAttribute("GEWAESSER"); - - String aeoString = element.getAttribute("EINZUGSGEBIET_AEO"); - if (aeoString.length() == 0) { - log.warn("DIPS: Setting AEO of gauge '" + name + "' to zero."); - aeoString = "0"; - } - aeo = Double.parseDouble(aeoString); - - String stationString = element.getAttribute("STATIONIERUNG"); - if (stationString.length() == 0) { - log.warn("DIPS: Setting station of gauge '" + name + "' to zero."); - stationString = "-99999"; - } - station = Double.parseDouble(stationString); - if (station == 0d) { - log.warn("DIPS: Station of gauge '" + name + "' is zero."); - } - - datums = new ArrayList<Datum>(); - NodeList nodes = element.getElementsByTagName("PNP"); - for (int i = 0, N = nodes.getLength(); i < N; ++i) { - Element e = (Element)nodes.item(i); - Datum datum = new Datum(e); - datums.add(datum); - } - Collections.sort(datums, DATE_CMP); - } - - public List<Datum> getDatums() { - return datums; - } - - public String getName() { - return name; - } - - public String getRiverName() { - return riverName; - } - - public int getFlysId() { - return flysId; - } - - public void setFlysId(int flysId) { - this.flysId = flysId; - } - - public String getAftName() { - return aftName != null ? aftName : name; - } - - public void setAftName(String aftName) { - this.aftName = aftName; - } - - public double getStation() { - return station; - } - - public double getAeo() { - return aeo; - } - - public void setAeo(double aeo) { - this.aeo = aeo; - } - - public void setStation(double station) { - this.station = station; - } - - public boolean hasDatums() { - return !datums.isEmpty(); - } - - public Datum getLatestDatum() { - return datums.get(datums.size()-1); - } - - public Long getOfficialNumber() { - return officialNumber; - } - - public void setOfficialNumber(Long officialNumber) { - this.officialNumber = officialNumber; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/DischargeTable.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,372 +0,0 @@ -package de.intevation.aft; - -import de.intevation.db.ConnectedStatements; -import de.intevation.db.SymbolicStatement; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Types; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.log4j.Logger; - -/** A Discharge Table. */ -public class DischargeTable -{ - private static Logger log = Logger.getLogger(DischargeTable.class); - - protected int id; - protected int gaugeId; - protected TimeInterval timeInterval; - protected String description; - protected String bfgId; - protected Set<WQ> values; - - public DischargeTable() { - } - - public DischargeTable( - int gaugeId, - TimeInterval timeInterval, - String description, - String bfgId - ) { - this.gaugeId = gaugeId; - this.timeInterval = timeInterval; - this.description = description; - this.bfgId = bfgId; - values = new TreeSet<WQ>(WQ.EPS_CMP); - } - - public DischargeTable( - int id, - int gaugeId, - TimeInterval timeInterval, - String description, - String bfgId - ) { - this(gaugeId, timeInterval, description, bfgId); - this.id = id; - } - - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public int getGaugeId() { - return gaugeId; - } - - public void setGaugeId(int gaugeId) { - this.gaugeId = gaugeId; - } - - public TimeInterval getTimeInterval() { - return timeInterval; - } - - public void setTimeInterval(TimeInterval timeInterval) { - this.timeInterval = timeInterval; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public String getBfgId() { - return bfgId; - } - - public void setBfgId(String bfgId) { - this.bfgId = bfgId; - } - - - public void clearValues() { - values.clear(); - } - - public Set<WQ> getValues() { - return values; - } - - public void setValues(Set<WQ> values) { - this.values = values; - } - - - protected void loadValues(SymbolicStatement.Instance query) - throws SQLException - { - ResultSet rs = query.executeQuery(); - while (rs.next()) { - int id = rs.getInt("id"); - double w = rs.getDouble("w"); - double q = rs.getDouble("q"); - if (!values.add(new WQ(id, w, q))) { - log.warn("FLYS/AFT: Value duplication w="+w+" q="+q+". -> ignore."); - } - } - rs.close(); - } - - public void loadAftValues(SyncContext context) throws SQLException { - loadValues(context.getAftStatements() - .getStatement("select.tafelwert") - .clearParameters() - .setInt("number", getId())); - } - - public void loadFlysValues(SyncContext context) throws SQLException { - loadValues(context.getFlysStatements() - .getStatement("select.discharge.table.values") - .clearParameters() - .setInt("table_id", getId())); - } - - public void storeFlysValues( - SyncContext context, - int dischargeTableId - ) - throws SQLException - { - ConnectedStatements flysStatements = context.getFlysStatements(); - - // Create the ids. - SymbolicStatement.Instance nextId = flysStatements - .getStatement("next.discharge.table.values.id"); - - // Insert the values. - SymbolicStatement.Instance insertDTV = flysStatements - .getStatement("insert.discharge.table.value"); - - for (WQ wq: values) { - int wqId; - ResultSet rs = nextId.executeQuery(); - try { - rs.next(); - wqId = rs.getInt("discharge_table_values_id"); - } - finally { - rs.close(); - } - - insertDTV - .clearParameters() - .setInt("id", wqId) - .setInt("table_id", dischargeTableId) - .setDouble("w", wq.getW()) - .setDouble("q", wq.getQ()) - .execute(); - } - } - - public static List<DischargeTable> loadFlysDischargeTables( - SyncContext context, - int gaugeId - ) - throws SQLException - { - List<DischargeTable> dts = new ArrayList<DischargeTable>(); - - ResultSet rs = context - .getFlysStatements() - .getStatement("select.gauge.discharge.tables") - .clearParameters() - .setInt("gauge_id", gaugeId) - .executeQuery(); - try { - OUTER: while (rs.next()) { - int id = rs.getInt("id"); - String description = rs.getString("description"); - String bfgId = rs.getString("bfg_id"); - if (description == null) { - description = ""; - } - if (bfgId == null) { - bfgId = ""; - } - for (DischargeTable dt: dts) { - if (dt.getBfgId().equals(bfgId)) { - log.warn("FLYS: Found discharge table '" + - bfgId + "' with same bfg_id. -> ignore"); - continue OUTER; - } - } - Date startTime = rs.getDate("start_time"); - Date stopTime = rs.getDate("stop_time"); - TimeInterval ti = startTime == null - ? null - : new TimeInterval(startTime, stopTime); - - DischargeTable dt = new DischargeTable( - id, gaugeId, ti, description, bfgId); - dts.add(dt); - } - } - finally { - rs.close(); - } - - return dts; - } - - public static List<DischargeTable> loadAftDischargeTables( - SyncContext context, - Long officialNumber - ) - throws SQLException - { - return loadAftDischargeTables(context, officialNumber, 0); - } - - public static List<DischargeTable> loadAftDischargeTables( - SyncContext context, - Long officialNumber, - int flysGaugeId - ) - throws SQLException - { - List<DischargeTable> dts = new ArrayList<DischargeTable>(); - - ResultSet rs = context - .getAftStatements() - .getStatement("select.abflusstafel") - .clearParameters() - .setString("number", "%" + officialNumber) - .executeQuery(); - try { - OUTER: while (rs.next()) { - int dtId = rs.getInt("ABFLUSSTAFEL_NR"); - Date from = rs.getDate("GUELTIG_VON"); - Date to = rs.getDate("GUELTIG_BIS"); - - if (from == null) { - log.warn("AFT: ABFLUSSTAFEL_NR = " - + dtId + ": GUELTIG_VON = NULL -> ignored."); - } - - if (to == null) { - log.warn("AFT: ABFLUSSTAFEL_NR = " - + dtId + ": GUELTIG_BIS = NULL -> ignored."); - } - - if (from == null || to == null) { - continue; - } - - if (from.compareTo(to) > 0) { - log.warn("AFT: ABFLUSSTAFEL_NR = " - + dtId + ": " + from + " > " + to + ". -> swap"); - Date temp = from; - from = to; - to = temp; - } - - String description = rs.getString("ABFLUSSTAFEL_BEZ"); - if (description == null) { - description = String.valueOf(officialNumber); - } - - String bfgId = rs.getString("BFG_ID"); - if (bfgId == null) { - bfgId = ""; - } - - for (DischargeTable dt: dts) { - if (dt.getBfgId().equals(bfgId)) { - log.warn("AFT: Found discharge table '" + - bfgId + "' with same bfg_id. -> ignore."); - continue OUTER; - } - } - - TimeInterval timeInterval = new TimeInterval(from, to); - - DischargeTable dt = new DischargeTable( - dtId, - flysGaugeId, - timeInterval, - description, - bfgId); - dts.add(dt); - } - } - finally { - rs.close(); - } - - return dts; - } - - public void persistFlysTimeInterval( - SyncContext context - ) - throws SQLException - { - if (timeInterval != null) { - timeInterval = context.fetchOrCreateFLYSTimeInterval( - timeInterval); - } - } - - public int persistFlysDischargeTable( - SyncContext context, - int gaugeId - ) - throws SQLException - { - ConnectedStatements flysStatements = - context.getFlysStatements(); - - int flysId; - - ResultSet rs = flysStatements - .getStatement("next.discharge.id") - .executeQuery(); - try { - rs.next(); - flysId = rs.getInt("discharge_table_id"); - } - finally { - rs.close(); - } - - SymbolicStatement.Instance insertDT = flysStatements - .getStatement("insert.dischargetable") - .clearParameters() - .setInt("id", flysId) - .setInt("gauge_id", gaugeId) - .setString("description", description) - .setString("bfg_id", bfgId); - - if (timeInterval != null) { - insertDT.setInt("time_interval_id", timeInterval.getId()); - } - else { - insertDT.setNull("time_interval_id", Types.INTEGER); - } - - insertDT.execute(); - - if (log.isDebugEnabled()) { - log.debug("FLYS: Created discharge table id: " + id); - } - - return flysId; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/IdPair.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,40 +0,0 @@ -package de.intevation.aft; - -public class IdPair -{ - protected int id1; - protected int id2; - - public IdPair() { - } - - public IdPair(int id1) { - this.id1 = id1; - } - - public IdPair(int id1, int id2) { - this(id1); - this.id2 = id2; - } - - public int getId1() { - return id1; - } - - public void setId1(int id1) { - this.id1 = id1; - } - - public int getId2() { - return id2; - } - - public void setId2(int id2) { - this.id2 = id2; - } - - public String toString() { - return "[IdPair: id1=" + id1 + ", id2=" + id2 + "]"; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/Notification.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,101 +0,0 @@ -package de.intevation.aft; - -import de.intevation.utils.XML; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import java.net.HttpURLConnection; -import java.net.URL; -import java.net.URLConnection; - -import org.apache.log4j.Logger; - -import org.w3c.dom.Document; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; - -public class Notification -{ - private static Logger log = Logger.getLogger(Notification.class); - - protected Document message; - - public Notification() { - } - - public Notification(Document message) { - this.message = message; - } - - public Notification(Node message) { - this(wrap(message)); - } - - public static Document wrap(Node node) { - Document document = XML.newDocument(); - - // Send first element as message. - // Fall back to root node. - Node toImport = node; - - NodeList children = node.getChildNodes(); - for (int i = 0, N = children.getLength(); i < N; ++i) { - Node child = children.item(i); - if (child.getNodeType() == Node.ELEMENT_NODE) { - toImport = child; - break; - } - } - - toImport = document.importNode(toImport, true); - document.appendChild(toImport); - document.normalizeDocument(); - return document; - } - - public Document sendPOST(URL url) { - - OutputStream out = null; - InputStream in = null; - Document result = null; - - try { - URLConnection ucon = url.openConnection(); - - if (!(ucon instanceof HttpURLConnection)) { - log.warn("NOTIFY: '" + url + "' is not an HTTP(S) connection."); - return null; - } - - HttpURLConnection con = (HttpURLConnection)ucon; - - con.setRequestMethod("POST"); - con.setDoInput(true); - con.setDoOutput(true); - con.setUseCaches(false); - con.setRequestProperty("Content-Type", "text/xml"); - - out = con.getOutputStream(); - XML.toStream(message, out); - out.flush(); - in = con.getInputStream(); - result = XML.parseDocument(in); - } - catch (IOException ioe) { - log.error("NOTIFY: Sending message to '" + url + "' failed.", ioe); - } - finally { - if (out != null) { - try { out.close(); } catch (IOException ioe) {} - } - if (in != null) { - try { in.close(); } catch (IOException ioe) {} - } - } - - return result; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/River.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,525 +0,0 @@ -package de.intevation.aft; - -import de.intevation.db.ConnectedStatements; -import de.intevation.db.SymbolicStatement; - -import java.sql.ResultSet; -import java.sql.SQLException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; - -public class River -extends IdPair -{ - private static Logger log = Logger.getLogger(River.class); - - protected String name; - - protected double from; - protected double to; - - public River() { - } - - public River(int id1, String name, double from, double to) { - super(id1); - this.name = name; - this.from = from; - this.to = to; - } - - public River(int id1, int id2, String name) { - super(id1, id2); - this.name = name; - } - - public String getName() { - return name; - } - - public double getFrom() { - return from; - } - - public void setFrom(double from) { - this.from = from; - } - - public double getTo() { - return to; - } - - public void setTo(double to) { - this.to = to; - } - - public boolean inside(double x) { - return x >= from && x <= to; - } - - public boolean sync(SyncContext context) throws SQLException { - log.info("sync river: " + this); - - // Only take relevant gauges into account. - Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to); - - ConnectedStatements flysStatements = context.getFlysStatements(); - ConnectedStatements aftStatements = context.getAftStatements(); - - String riverName = getName(); - - Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>(); - - ResultSet messstellenRs = aftStatements - .getStatement("select.messstelle") - .clearParameters() - .setInt("GEWAESSER_NR", id2) - .executeQuery(); - - try { - while (messstellenRs.next()) { - String name = messstellenRs.getString("NAME"); - String num = messstellenRs.getString("MESSSTELLE_NR"); - double station = messstellenRs.getDouble("STATIONIERUNG"); - - if (!messstellenRs.wasNull() && !inside(station)) { - log.warn("Station found in AFT but in not range: " + station); - continue; - } - - Long number = SyncContext.numberToLong(num); - if (number == null) { - log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); - continue; - } - DIPSGauge dipsGauge = dipsGauges.get(number); - if (dipsGauge == null) { - log.warn( - "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " + - "Gauge number used for lookup: " + number); - continue; - } - String gaugeRiver = dipsGauge.getRiverName(); - if (!gaugeRiver.equalsIgnoreCase(riverName)) { - log.warn( - "DIPS: MESSSTELLE '" + name + - "' is assigned to river '" + gaugeRiver + - "'. Needs to be on '" + riverName + "'."); - continue; - } - dipsGauge.setAftName(name); - dipsGauge.setOfficialNumber(number); - aftDIPSGauges.put(number, dipsGauge); - } - } - finally { - messstellenRs.close(); - } - - List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>(); - - ResultSet gaugesRs = flysStatements - .getStatement("select.gauges") - .clearParameters() - .setInt("river_id", id1).executeQuery(); - - try { - while (gaugesRs.next()) { - int gaugeId = gaugesRs.getInt("id"); - String name = gaugesRs.getString("name"); - long number = gaugesRs.getLong("official_number"); - if (gaugesRs.wasNull()) { - log.warn("FLYS: Gauge '" + name + - "' has no official number. Ignored."); - continue; - } - Long key = Long.valueOf(number); - DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); - if (aftDIPSGauge == null) { - log.warn("FLYS: Gauge '" + name + "' number " + number + - " is not found in AFT/DIPS."); - continue; - } - aftDIPSGauge.setFlysId(gaugeId); - log.info("Gauge '" + name + - "' found in FLYS, AFT and DIPS. -> Update"); - updateGauges.add(aftDIPSGauge); - } - } - finally { - gaugesRs.close(); - } - - boolean modified = createGauges(context, aftDIPSGauges); - - modified |= updateGauges(context, updateGauges); - - return modified; - } - - protected boolean updateGauges( - SyncContext context, - List<DIPSGauge> gauges - ) - throws SQLException - { - boolean modified = false; - - for (DIPSGauge gauge: gauges) { - // XXX: Do dont modify the master AT. - // modified |= updateBfGIdOnMasterDischargeTable(context, gauge); - modified |= updateGauge(context, gauge); - } - - return modified; - } - - protected boolean updateBfGIdOnMasterDischargeTable( - SyncContext context, - DIPSGauge gauge - ) throws SQLException { - log.info( - "FLYS: Updating master discharge table bfg_id for '" + - gauge.getAftName() + "'"); - ConnectedStatements flysStatements = context.getFlysStatements(); - - ResultSet rs = flysStatements - .getStatement("select.gauge.master.discharge.table") - .clearParameters() - .setInt("gauge_id", gauge.getFlysId()) - .executeQuery(); - - int flysId; - - try { - if (!rs.next()) { - log.error( - "FLYS: No master discharge table found for gauge '" + - gauge.getAftName() + "'"); - return false; - } - String bfgId = rs.getString("bfg_id"); - if (!rs.wasNull()) { // already has BFG_ID - return false; - } - flysId = rs.getInt("id"); - } finally { - rs.close(); - } - - // We need to find out the BFG_ID of the current discharge table - // for this gauge in AFT. - - ConnectedStatements aftStatements = context.getAftStatements(); - - rs = aftStatements - .getStatement("select.bfg.id.current") - .clearParameters() - .setString("number", "%" + gauge.getOfficialNumber()) - .executeQuery(); - - String bfgId = null; - - try { - if (rs.next()) { - bfgId = rs.getString("BFG_ID"); - } - } finally { - rs.close(); - } - - if (bfgId == null) { - log.warn( - "No BFG_ID found for current discharge table of gauge '" + - gauge + "'"); - return false; - } - - // Set the BFG_ID in FLYS. - flysStatements.beginTransaction(); - try { - flysStatements - .getStatement("update.bfg.id.discharge.table") - .clearParameters() - .setInt("id", flysId) - .setString("bfg_id", bfgId) - .executeUpdate(); - flysStatements.commitTransaction(); - } catch (SQLException sqle) { - flysStatements.rollbackTransaction(); - log.error(sqle, sqle); - return false; - } - - return true; - } - - protected boolean updateGauge( - SyncContext context, - DIPSGauge gauge - ) - throws SQLException - { - log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'."); - // We need to load all discharge tables from both databases - // of the gauge and do some pairing based on their bfg_id. - - boolean modified = false; - - ConnectedStatements flysStatements = context.getFlysStatements(); - - flysStatements.beginTransaction(); - try { - List<DischargeTable> flysDTs = - DischargeTable.loadFlysDischargeTables( - context, gauge.getFlysId()); - - List<DischargeTable> aftDTs = - DischargeTable.loadAftDischargeTables( - context, gauge.getOfficialNumber()); - - Map<String, DischargeTable> bfgId2FlysDT = - new HashMap<String, DischargeTable>(); - - for (DischargeTable dt: flysDTs) { - String bfgId = dt.getBfgId(); - if (bfgId == null) { - log.warn("FLYS: discharge table " + dt.getId() - + " has no bfg_id. Ignored."); - continue; - } - bfgId2FlysDT.put(bfgId, dt); - } - - List<DischargeTable> createDTs = new ArrayList<DischargeTable>(); - - for (DischargeTable aftDT: aftDTs) { - String bfgId = aftDT.getBfgId(); - DischargeTable flysDT = bfgId2FlysDT.remove(bfgId); - if (flysDT != null) { - // Found in AFT and FLYS. - log.info("FLYS: Discharge table '" + bfgId - + "' found in AFT and FLYS. -> update"); - // Create the W/Q diff. - modified |= writeWQChanges(context, flysDT, aftDT); - } - else { - log.info("FLYS: Discharge table '" + bfgId - + "' not found in FLYS. -> create"); - createDTs.add(aftDT); - } - } - - for (String bfgId: bfgId2FlysDT.keySet()) { - log.info("FLYS: Discharge table '" + bfgId - + "' found in FLYS but not in AFT. -> ignore"); - } - - log.info("FLYS: Copy " + createDTs.size() + - " discharge tables over from AFT."); - - // Create the new discharge tables. - for (DischargeTable aftDT: createDTs) { - createDischargeTable(context, aftDT, gauge.getFlysId()); - modified = true; - } - - flysStatements.commitTransaction(); - } - catch (SQLException sqle) { - flysStatements.rollbackTransaction(); - log.error(sqle, sqle); - modified = false; - } - - return modified; - } - - protected boolean writeWQChanges( - SyncContext context, - DischargeTable flysDT, - DischargeTable aftDT - ) - throws SQLException - { - flysDT.loadFlysValues(context); - aftDT.loadAftValues(context); - WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues()); - if (diff.hasChanges()) { - diff.writeChanges(context, flysDT.getId()); - return true; - } - return false; - } - - protected boolean createGauges( - SyncContext context, - Map<Long, DIPSGauge> gauges - ) - throws SQLException - { - ConnectedStatements flysStatements = context.getFlysStatements(); - - SymbolicStatement.Instance nextId = - flysStatements.getStatement("next.gauge.id"); - - SymbolicStatement.Instance insertStmnt = - flysStatements.getStatement("insert.gauge"); - - boolean modified = false; - - for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) { - Long officialNumber = entry.getKey(); - DIPSGauge gauge = entry.getValue(); - - log.info("Gauge '" + gauge.getAftName() + - "' not in FLYS but in AFT/DIPS. -> Create"); - - if (!gauge.hasDatums()) { - log.warn("DIPS: Gauge '" + - gauge.getAftName() + "' has no datum. Ignored."); - continue; - } - - ResultSet rs = null; - flysStatements.beginTransaction(); - try { - (rs = nextId.executeQuery()).next(); - int gaugeId = rs.getInt("gauge_id"); - rs.close(); rs = null; - - insertStmnt - .clearParameters() - .setInt("id", gaugeId) - .setString("name", gauge.getAftName()) - .setInt("river_id", id1) - .setDouble("station", gauge.getStation()) - .setDouble("aeo", gauge.getAeo()) - .setLong("official_number", officialNumber) - .setDouble("datum", gauge.getLatestDatum().getValue()); - - insertStmnt.execute(); - - log.info("FLYS: Created gauge '" + gauge.getAftName() + - "' with id " + gaugeId + "."); - - gauge.setFlysId(gaugeId); - createDischargeTables(context, gauge); - flysStatements.commitTransaction(); - modified = true; - } - catch (SQLException sqle) { - flysStatements.rollbackTransaction(); - log.error(sqle, sqle); - } - finally { - if (rs != null) { - rs.close(); - } - } - } - - return modified; - } - - protected void createDischargeTable( - SyncContext context, - DischargeTable aftDT, - int flysGaugeId - ) - throws SQLException - { - aftDT.persistFlysTimeInterval(context); - int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId); - - aftDT.loadAftValues(context); - aftDT.storeFlysValues(context, flysId); - } - - protected void createDischargeTables( - SyncContext context, - DIPSGauge gauge - ) - throws SQLException - { - log.info("FLYS: Create discharge tables for '" + - gauge.getAftName() + "'."); - - // Load the discharge tables from AFT. - List<DischargeTable> dts = loadAftDischargeTables( - context, gauge); - - // Persist the time intervals. - persistFlysTimeIntervals(context, dts); - - // Persist the discharge tables - int [] flysDTIds = persistFlysDischargeTables( - context, dts, gauge.getFlysId()); - - // Copy over the W/Q values - copyWQsFromAftToFlys(context, dts, flysDTIds); - } - - protected List<DischargeTable> loadAftDischargeTables( - SyncContext context, - DIPSGauge gauge - ) - throws SQLException - { - return DischargeTable.loadAftDischargeTables( - context, gauge.getOfficialNumber(), gauge.getFlysId()); - } - - protected void persistFlysTimeIntervals( - SyncContext context, - List<DischargeTable> dts - ) - throws SQLException - { - for (DischargeTable dt: dts) { - dt.persistFlysTimeInterval(context); - } - } - - protected int [] persistFlysDischargeTables( - SyncContext context, - List<DischargeTable> dts, - int flysGaugeId - ) - throws SQLException - { - int [] flysDTIds = new int[dts.size()]; - - for (int i = 0; i < flysDTIds.length; ++i) { - flysDTIds[i] = dts.get(i) - .persistFlysDischargeTable(context, flysGaugeId); - } - - return flysDTIds; - } - - protected void copyWQsFromAftToFlys( - SyncContext context, - List<DischargeTable> dts, - int [] flysDTIds - ) - throws SQLException - { - for (int i = 0; i < flysDTIds.length; ++i) { - DischargeTable dt = dts.get(i); - dt.loadAftValues(context); - dt.storeFlysValues(context, flysDTIds[i]); - dt.clearValues(); // To save memory. - } - } - - public String toString() { - return "[River: name=" + name + ", " + super.toString() + "]"; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/Rivers.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,77 +0,0 @@ -package de.intevation.aft; - -import de.intevation.db.ConnectedStatements; - -import java.sql.ResultSet; -import java.sql.SQLException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; - -public class Rivers -{ - private static Logger log = Logger.getLogger(Rivers.class); - - public Rivers() { - } - - public boolean sync(SyncContext context) throws SQLException { - - log.info("sync: rivers"); - - ConnectedStatements flysStatements = context.getFlysStatements(); - ConnectedStatements aftStatements = context.getAftStatements(); - - Map<String, River> flysRivers = new HashMap<String, River>(); - - ResultSet flysRs = flysStatements - .getStatement("select.rivers").executeQuery(); - - try { - while (flysRs.next()) { - int id = flysRs.getInt("id"); - String name = flysRs.getString("name"); - double from = flysRs.getDouble("min_km"); - double to = flysRs.getDouble("max_km"); - flysRivers.put(name.toLowerCase(), new River(id, name, from, to)); - } - } - finally { - flysRs.close(); - } - - List<River> commonRivers = new ArrayList<River>(); - - ResultSet aftRs = aftStatements - .getStatement("select.gewaesser").executeQuery(); - - try { - while (aftRs.next()) { - String name = aftRs.getString("NAME"); - River river = flysRivers.get(name.toLowerCase()); - if (river != null) { - int id2 = aftRs.getInt("GEWAESSER_NR"); - river.setId2(id2); - commonRivers.add(river); - } - } - } - finally { - aftRs.close(); - } - - - boolean modified = false; - - for (River river: commonRivers) { - modified |= river.sync(context); - } - - return modified; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/Sync.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,170 +0,0 @@ -package de.intevation.aft; - -import de.intevation.db.ConnectionBuilder; - -import de.intevation.utils.XML; - -import java.io.File; - -import java.net.MalformedURLException; -import java.net.URL; - -import java.sql.SQLException; - -import javax.xml.xpath.XPathConstants; - -import org.apache.log4j.Logger; - -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -public class Sync -{ - private static Logger log = Logger.getLogger(Sync.class); - - public static final String FLYS = "flys"; - public static final String AFT = "aft"; - - public static final String XPATH_DIPS = "/sync/dips/file/text()"; - public static final String XPATH_REPAIR = "/sync/dips/repair/text()"; - - public static final String XPATH_NOTIFICATIONS = - "/sync/notifications/notification"; - - public static final String CONFIG_FILE = - System.getProperty("config.file", "config.xml"); - - public static void sendNotifications(Document config) { - NodeList notifications = (NodeList)XML.xpath( - config, XPATH_NOTIFICATIONS, XPathConstants.NODESET, null, null); - - if (notifications == null) { - return; - } - - for (int i = 0, N = notifications.getLength(); i < N; ++i) { - Element notification = (Element)notifications.item(i); - String urlString = notification.getAttribute("url"); - - URL url; - try { - url = new URL(urlString); - } - catch (MalformedURLException mfue) { - log.warn("NOTIFY: Invalid URL '" + urlString + "'. Ignored.", mfue); - continue; - } - - Notification n = new Notification(notification); - - Document result = n.sendPOST(url); - - if (result != null) { - log.info("Send notifcation to '" + urlString + "'."); - log.info(XML.toString(result)); - } - } - } - - public static void main(String [] args) { - - File configFile = new File(CONFIG_FILE); - - if (!configFile.isFile() || !configFile.canRead()) { - log.error("cannot read config file"); - System.exit(1); - } - - Document config = XML.parseDocument(configFile, Boolean.FALSE); - - if (config == null) { - log.error("Cannot load config file."); - System.exit(1); - } - - String dipsF = (String)XML.xpath( - config, XPATH_DIPS, XPathConstants.STRING, null, null); - - if (dipsF == null || dipsF.length() == 0) { - log.error("Cannot find path to DIPS XML in config."); - System.exit(1); - } - - File dipsFile = new File(dipsF); - - if (!dipsFile.isFile() || !dipsFile.canRead()) { - log.error("DIPS: Cannot find '" + dipsF + "'."); - System.exit(1); - } - - Document dips = XML.parseDocument(dipsFile, Boolean.FALSE); - - if (dips == null) { - log.error("DIPS: Cannot load DIPS document."); - System.exit(1); - } - - String repairF = (String)XML.xpath( - config, XPATH_REPAIR, XPathConstants.STRING, null, null); - - if (repairF != null && repairF.length() > 0) { - File repairFile = new File(repairF); - if (!repairFile.isFile() || !repairFile.canRead()) { - log.warn("REPAIR: Cannot open DIPS repair XSLT file."); - } - else { - Document fixed = XML.transform(dips, repairFile); - if (fixed == null) { - log.warn("REPAIR: Fixing DIPS failed."); - } - else { - dips = fixed; - } - } - } - - int exitCode = 0; - - ConnectionBuilder aftConnectionBuilder = - new ConnectionBuilder(AFT, config); - - ConnectionBuilder flysConnectionBuilder = - new ConnectionBuilder(FLYS, config); - - SyncContext syncContext = null; - - boolean modified = false; - try { - syncContext = new SyncContext( - aftConnectionBuilder.getConnectedStatements(), - flysConnectionBuilder.getConnectedStatements(), - dips); - syncContext.init(); - Rivers rivers = new Rivers(); - modified = rivers.sync(syncContext); - } - catch (SQLException sqle) { - log.error("SYNC: Syncing failed.", sqle); - exitCode = 1; - } - finally { - if (syncContext != null) { - syncContext.close(); - } - } - - if (modified) { - log.info("Modifications found."); - sendNotifications(config); - } - else { - log.info("No modifications found."); - } - - if (exitCode != 0) { - System.exit(1); - } - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/SyncContext.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,220 +0,0 @@ -package de.intevation.aft; - -import de.intevation.db.ConnectedStatements; - -import java.sql.ResultSet; -import java.sql.SQLException; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.log4j.Logger; - -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.NodeList; - -public class SyncContext -{ - private static Logger log = Logger.getLogger(SyncContext.class); - - protected ConnectedStatements aftStatements; - protected ConnectedStatements flysStatements; - protected Document dips; - - protected Map<Long, DIPSGauge> numberToGauge; - protected Map<TimeInterval, TimeInterval> flysTimeIntervals; - - public SyncContext() { - } - - public SyncContext( - ConnectedStatements aftStatements, - ConnectedStatements flysStatements, - Document dips - ) { - this.aftStatements = aftStatements; - this.flysStatements = flysStatements; - this.dips = dips; - } - - public void init() throws SQLException { - numberToGauge = indexByNumber(dips); - flysTimeIntervals = loadTimeIntervals(); - } - - public ConnectedStatements getAftStatements() { - return aftStatements; - } - - public void setAftStatements(ConnectedStatements aftStatements) { - this.aftStatements = aftStatements; - } - - public ConnectedStatements getFlysStatements() { - return flysStatements; - } - - public void setFlysStatements(ConnectedStatements flysStatements) { - this.flysStatements = flysStatements; - } - - public Document getDips() { - return dips; - } - - public void setDips(Document dips) { - this.dips = dips; - } - - void close() { - aftStatements.close(); - flysStatements.close(); - } - - public static Long numberToLong(String s) { - try { - return Long.valueOf(s.trim()); - } - catch (NumberFormatException nfe) { - } - return null; - } - - public Map<Long, DIPSGauge> getDIPSGauges() { - return numberToGauge; - } - - public Map<Long, DIPSGauge> getDIPSGauges( - String riverName, - double from, - double to - ) { - if (from > to) { - double t = from; - from = to; - to = t; - } - - riverName = riverName.toLowerCase(); - - Map<Long, DIPSGauge> result = new HashMap<Long, DIPSGauge>(); - - for (Map.Entry<Long, DIPSGauge> entry: numberToGauge.entrySet()) { - DIPSGauge gauge = entry.getValue(); - // XXX: Maybe a bit too sloppy. - if (!riverName.contains(gauge.getRiverName().toLowerCase())) { - continue; - } - double station = gauge.getStation(); - if (station >= from && station <= to) { - result.put(entry.getKey(), gauge); - } - } - - return result; - } - - protected static Map<Long, DIPSGauge> indexByNumber(Document document) { - Map<Long, DIPSGauge> map = new HashMap<Long, DIPSGauge>(); - NodeList nodes = document.getElementsByTagName("PEGELSTATION"); - for (int i = nodes.getLength()-1; i >= 0; --i) { - Element element = (Element)nodes.item(i); - String numberString = element.getAttribute("NUMMER"); - Long number = numberToLong(numberString); - if (number != null) { - DIPSGauge newG = new DIPSGauge(element); - DIPSGauge oldG = map.put(number, newG); - if (oldG != null) { - log.warn("DIPS: '" + newG.getName() + - "' collides with '" + oldG.getName() + - "' on gauge number " + number + "."); - } - } - else { - log.warn("DIPS: Gauge '" + element.getAttribute("NAME") + - "' has invalid gauge number '" + numberString + "'."); - } - } - return map; - } - - protected Map<TimeInterval, TimeInterval> loadTimeIntervals() - throws SQLException { - - boolean debug = log.isDebugEnabled(); - - Map<TimeInterval, TimeInterval> intervals = - new TreeMap<TimeInterval, TimeInterval>(); - - ResultSet rs = flysStatements - .getStatement("select.timeintervals") - .executeQuery(); - - try { - while (rs.next()) { - int id = rs.getInt("id"); - Date start = rs.getDate("start_time"); - Date stop = rs.getDate("stop_time"); - - if (debug) { - log.debug("id: " + id); - log.debug("start: " + start); - log.debug("stop: " + stop); - } - - TimeInterval ti = new TimeInterval(id, start, stop); - intervals.put(ti, ti); - } - } - finally { - rs.close(); - } - - if (debug) { - log.debug("loaded time intervals: " + intervals.size()); - } - - return intervals; - } - - public TimeInterval fetchOrCreateFLYSTimeInterval(TimeInterval key) - throws SQLException - { - TimeInterval old = flysTimeIntervals.get(key); - if (old != null) { - return old; - } - - ResultSet rs = flysStatements - .getStatement("next.timeinterval.id") - .executeQuery(); - - try { - rs.next(); - key.setId(rs.getInt("time_interval_id")); - } - finally { - rs.close(); - } - - if (log.isDebugEnabled()) { - log.debug("FLYS: Created time interval id: " + key.getId()); - log.debug("FLYS: " + key); - } - - flysStatements.getStatement("insert.timeinterval") - .clearParameters() - .setInt("id", key.getId()) - .setObject("start_time", key.getStart()) - .setObject("stop_time", key.getStop()) - .execute(); - - flysTimeIntervals.put(key, key); - - return key; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/TimeInterval.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,70 +0,0 @@ -package de.intevation.aft; - -import java.util.Date; - -public class TimeInterval -implements Comparable<TimeInterval> -{ - protected int id; - protected Date start; - protected Date stop; - - public TimeInterval() { - } - - public TimeInterval(Date start, Date stop) { - this.start = start; - this.stop = stop; - } - - public TimeInterval(int id, Date start, Date stop) { - this(start, stop); - this.id = id; - } - - protected static int compare(Date d1, Date d2) { - long s1 = d1 != null ? d1.getTime()/1000L : 0L; - long s2 = d2 != null ? d2.getTime()/1000L : 0L; - long diff = s1 - s2; - return diff < 0L - ? -1 - : diff > 0L ? 1 : 0; - } - - @Override - public int compareTo(TimeInterval other) { - int cmp = compare(start, other.start); - return cmp != 0 - ? cmp - : compare(stop, other.stop); - } - - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public Date getStart() { - return start; - } - - public void setStart(Date start) { - this.start = start; - } - - public Date getStop() { - return stop; - } - - public void setStop(Date stop) { - this.stop = stop; - } - - public String toString() { - return "[TimeInterval: start=" + start + ", stop=" + stop + "]"; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/WQ.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,67 +0,0 @@ -package de.intevation.aft; - -import java.util.Comparator; - -public class WQ -{ - public static final double EPSILON = 1e-4; - - public static final Comparator<WQ> EPS_CMP = new Comparator<WQ>() { - @Override - public int compare(WQ a, WQ b) { - int cmp = compareEpsilon(a.q, b.q); - if (cmp != 0) return cmp; - return compareEpsilon(a.w, b.w); - } - }; - - protected int id; - - protected double w; - protected double q; - - public WQ() { - } - - public WQ(double w, double q) { - this.w = w; - this.q = q; - } - - public WQ(int id, double w, double q) { - this.id = id; - this.w = w; - this.q = q; - } - - public static final int compareEpsilon(double a, double b) { - double diff = a - b; - if (diff < -EPSILON) return -1; - return diff > EPSILON ? +1 : 0; - } - - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public double getW() { - return w; - } - - public void setW(double w) { - this.w = w; - } - - public double getQ() { - return q; - } - - public void setQ(double q) { - this.q = q; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/aft/WQDiff.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,118 +0,0 @@ -package de.intevation.aft; - -import de.intevation.db.ConnectedStatements; -import de.intevation.db.SymbolicStatement; - -import java.sql.ResultSet; -import java.sql.SQLException; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; -import java.util.TreeSet; - -public class WQDiff -{ - protected Set<WQ> toAdd; - protected Set<WQ> toDelete; - - public WQDiff() { - } - - public WQDiff(Collection<WQ> a, Collection<WQ> b) { - toAdd = new TreeSet<WQ>(WQ.EPS_CMP); - toDelete = new TreeSet<WQ>(WQ.EPS_CMP); - build(a, b); - } - - public void build(Collection<WQ> a, Collection<WQ> b) { - toAdd.addAll(b); - toAdd.removeAll(a); - - toDelete.addAll(a); - toDelete.removeAll(b); - } - - public void clear() { - toAdd.clear(); - toDelete.clear(); - } - - public Set<WQ> getToAdd() { - return toAdd; - } - - public void setToAdd(Set<WQ> toAdd) { - this.toAdd = toAdd; - } - - public Set<WQ> getToDelete() { - return toDelete; - } - - public void setToDelete(Set<WQ> toDelete) { - this.toDelete = toDelete; - } - - public boolean hasChanges() { - return !(toAdd.isEmpty() && toDelete.isEmpty()); - } - - public void writeChanges( - SyncContext context, - int tableId - ) - throws SQLException - { - ConnectedStatements flysStatements = context.getFlysStatements(); - - // Delete the old entries - if (!toDelete.isEmpty()) { - SymbolicStatement.Instance deleteDTV = - flysStatements.getStatement("delete.discharge.table.value"); - for (WQ wq: toDelete) { - deleteDTV - .clearParameters() - .setInt("id", wq.getId()) - .execute(); - } - } - - // Add the new entries. - if (!toAdd.isEmpty()) { - SymbolicStatement.Instance nextId = - flysStatements.getStatement("next.discharge.table.values.id"); - - SymbolicStatement.Instance insertDTV = - flysStatements.getStatement("insert.discharge.table.value"); - - // Recycle old ids as much as possible. - Iterator<WQ> oldIds = toDelete.iterator(); - - // Create ids for new entries. - for (WQ wq: toAdd) { - if (oldIds.hasNext()) { - wq.setId(oldIds.next().getId()); - } - else { - ResultSet rs = nextId.executeQuery(); - rs.next(); - wq.setId(rs.getInt("discharge_table_values_id")); - rs.close(); - } - } - - // Write the new entries. - for (WQ wq: toAdd) { - insertDTV - .clearParameters() - .setInt("id", wq.getId()) - .setInt("table_id", tableId) - .setDouble("w", wq.getW()) - .setDouble("q", wq.getQ()) - .execute(); - } - } - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/ConnectedStatements.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,110 +0,0 @@ -package de.intevation.db; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.SQLException; -import java.sql.Savepoint; - -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.HashMap; -import java.util.Map; - -import org.apache.log4j.Logger; - -public class ConnectedStatements -{ - private static Logger log = Logger.getLogger(ConnectedStatements.class); - - protected Connection connection; - - protected Map<String, SymbolicStatement> statements; - - protected Map<String, SymbolicStatement.Instance> boundStatements; - - protected Deque<Savepoint> savepoints; - - public ConnectedStatements( - Connection connection, - Map<String, SymbolicStatement> statements - ) - throws SQLException - { - this.connection = connection; - this.statements = statements; - checkSavePoints(); - - boundStatements = new HashMap<String, SymbolicStatement.Instance>(); - } - - protected void checkSavePoints() throws SQLException { - DatabaseMetaData metaData = connection.getMetaData(); - if (metaData.supportsSavepoints()) { - log.info("Driver '" + metaData.getDriverName() + - "' does support savepoints."); - savepoints = new ArrayDeque<Savepoint>(); - } - else { - log.info("Driver '" + metaData.getDriverName() + - "' does not support savepoints."); - } - } - - public SymbolicStatement.Instance getStatement(String key) - throws SQLException - { - SymbolicStatement.Instance stmnt = boundStatements.get(key); - if (stmnt != null) { - return stmnt; - } - - SymbolicStatement ss = statements.get(key); - if (ss == null) { - return null; - } - - stmnt = ss.new Instance(connection); - boundStatements.put(key, stmnt); - return stmnt; - } - - public void beginTransaction() throws SQLException { - if (savepoints != null) { - savepoints.push(connection.setSavepoint()); - } - } - - public void commitTransaction() throws SQLException { - if (savepoints != null) { - savepoints.pop(); - } - connection.commit(); - } - - public void rollbackTransaction() throws SQLException { - if (savepoints != null) { - Savepoint savepoint = savepoints.pop(); - connection.rollback(savepoint); - } - else { - connection.rollback(); - } - } - - public void close() { - for (SymbolicStatement.Instance s: boundStatements.values()) { - s.close(); - } - - try { - if (savepoints != null && !savepoints.isEmpty()) { - Savepoint savepoint = savepoints.peekFirst(); - connection.rollback(savepoint); - } - connection.close(); - } - catch (SQLException sqle) { - } - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/ConnectionBuilder.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,121 +0,0 @@ -package de.intevation.db; - -import de.intevation.utils.XML; - -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import javax.xml.xpath.XPathConstants; - -import org.apache.log4j.Logger; - -import org.w3c.dom.Document; -import org.w3c.dom.NodeList; - -public class ConnectionBuilder -{ - private static Logger log = Logger.getLogger(ConnectionBuilder.class); - - public static final String XPATH_DRIVER = "/sync/side[@name=$type]/db/driver/text()"; - public static final String XPATH_USER = "/sync/side[@name=$type]/db/user/text()"; - public static final String XPATH_PASSWORD = "/sync/side[@name=$type]/db/password/text()"; - public static final String XPATH_URL = "/sync/side[@name=$type]/db/url/text()"; - public static final String XPATH_EXEC_LOGIN = "/sync/side[@name=$type]/db/execute-login/statement"; - - protected String type; - protected String driver; - protected String user; - protected String password; - protected String url; - protected List<String> loginStatements; - - public ConnectionBuilder(String type, Document document) { - this.type = type; - extractCredentials(document); - } - - protected static List<String> extractStrings(NodeList nodes) { - int N = nodes.getLength(); - List<String> result = new ArrayList<String>(N); - for (int i = 0; i < N; ++i) { - result.add(nodes.item(i).getTextContent()); - } - return result; - } - - protected void extractCredentials(Document document) { - HashMap<String, String> map = new HashMap<String, String>(); - map.put("type", type); - - driver = (String)XML.xpath( - document, XPATH_DRIVER, XPathConstants.STRING, null, map); - user = (String)XML.xpath( - document, XPATH_USER, XPathConstants.STRING, null, map); - password = (String)XML.xpath( - document, XPATH_PASSWORD, XPathConstants.STRING, null, map); - url = (String)XML.xpath( - document, XPATH_URL, XPathConstants.STRING, null, map); - loginStatements = extractStrings((NodeList)XML.xpath( - document, XPATH_EXEC_LOGIN, XPathConstants.NODESET, null, map)); - - if (log.isDebugEnabled()) { - log.debug("driver: " + driver); - log.debug("user: " + user); - log.debug("password: *******"); - log.debug("url: " + url); - log.debug("number of login statements: " + loginStatements.size()); - } - } - - public Connection getConnection() throws SQLException { - - if (driver != null && driver.length() > 0) { - try { - Class.forName(driver); - } - catch (ClassNotFoundException cnfe) { - throw new SQLException(cnfe); - } - } - - Connection connection = - DriverManager.getConnection(url, user, password); - - connection.setAutoCommit(false); - - DatabaseMetaData metaData = connection.getMetaData(); - - if (metaData.supportsTransactionIsolationLevel( - Connection.TRANSACTION_READ_UNCOMMITTED)) { - connection.setTransactionIsolation( - Connection.TRANSACTION_READ_UNCOMMITTED); - } - - for (String sql: loginStatements) { - Statement stmnt = connection.createStatement(); - try { - stmnt.execute(sql); - } - finally { - stmnt.close(); - } - } - - return connection; - } - - public ConnectedStatements getConnectedStatements() throws SQLException { - return new ConnectedStatements( - getConnection(), - new Statements(type, driver != null ? driver : "") - .getStatements()); - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/Statements.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,124 +0,0 @@ -package de.intevation.db; - -import java.io.IOException; -import java.io.InputStream; - -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import org.apache.log4j.Logger; - -public class Statements -{ - private static Logger log = Logger.getLogger(Statements.class); - - public static final String RESOURCE_PATH = "/sql/"; - public static final String COMMON_PROPERTIES = "-common.properties"; - - protected String type; - protected String driver; - - protected Map<String, SymbolicStatement> statements; - - public Statements(String type, String driver) { - this.type = type; - this.driver = driver; - } - - public SymbolicStatement getStatement(String key) { - return getStatements().get(key); - } - - public Map<String, SymbolicStatement> getStatements() { - if (statements == null) { - statements = loadStatements(); - } - return statements; - } - - protected Map<String, SymbolicStatement> loadStatements() { - Map<String, SymbolicStatement> statements = - new HashMap<String, SymbolicStatement>(); - - Properties properties = loadProperties(); - - for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) { - String key = (String)e.nextElement(); - String value = properties.getProperty(key); - SymbolicStatement symbolic = new SymbolicStatement(value); - statements.put(key, symbolic); - } - - return statements; - } - - protected String driverToProperties() { - return - type + "-" + - driver.replace('.', '-').toLowerCase() + ".properties"; - } - - protected Properties loadCommon() { - Properties common = new Properties(); - - String path = RESOURCE_PATH + type + COMMON_PROPERTIES; - - InputStream in = Statements.class.getResourceAsStream(path); - - if (in != null) { - try { - common.load(in); - } - catch (IOException ioe) { - log.error("cannot load defaults: " + path, ioe); - } - finally { - try { - in.close(); - } - catch (IOException ioe) { - } - } - } - else { - log.warn("cannot find: " + path); - } - - return common; - } - - protected Properties loadProperties() { - - Properties common = loadCommon(); - - Properties properties = new Properties(common); - - String path = RESOURCE_PATH + driverToProperties(); - - InputStream in = Statements.class.getResourceAsStream(path); - - if (in != null) { - try { - properties.load(in); - } - catch (IOException ioe) { - log.error("cannot load statements: " + path, ioe); - } - finally { - try { - in.close(); - } - catch (IOException ioe) { - } - } - } - else { - log.warn("cannot find: " + path); - } - - return properties; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- a/flys-aft/src/main/java/org/dive4elements/etl/db/SymbolicStatement.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,196 +0,0 @@ -package de.intevation.db; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.log4j.Logger; - -public class SymbolicStatement { - - private static Logger log = Logger.getLogger(SymbolicStatement.class); - - public static final Pattern VAR = Pattern.compile(":([a-zA-Z0-9_]+)"); - - protected String statement; - protected String compiled; - protected Map<String, List<Integer>> positions; - - public class Instance { - - /** TODO: Support more types. */ - - protected PreparedStatement stmnt; - - public Instance(Connection connection) throws SQLException { - stmnt = connection.prepareStatement(compiled); - } - - public void close() { - try { - stmnt.close(); - } - catch (SQLException sqle) { - log.error("cannot close statement", sqle); - } - } - - public Instance setInt(String key, int value) - throws SQLException - { - List<Integer> pos = positions.get(key.toLowerCase()); - if (pos != null) { - for (Integer p: pos) { - stmnt.setInt(p, value); - } - } - - return this; - } - - public Instance setString(String key, String value) - throws SQLException - { - List<Integer> pos = positions.get(key.toLowerCase()); - if (pos != null) { - for (Integer p: pos) { - stmnt.setString(p, value); - } - } - return this; - } - - public Instance setObject(String key, Object value) - throws SQLException - { - List<Integer> pos = positions.get(key.toLowerCase()); - if (pos != null) { - for (Integer p: pos) { - stmnt.setObject(p, value); - } - } - return this; - } - - public Instance setTimestamp(String key, Timestamp value) - throws SQLException - { - List<Integer> pos = positions.get(key.toLowerCase()); - if (pos != null) { - for (Integer p: pos) { - stmnt.setTimestamp(p, value); - } - } - return this; - } - - public Instance setDouble(String key, double value) - throws SQLException - { - List<Integer> pos = positions.get(key.toLowerCase()); - if (pos != null) { - for (Integer p: pos) { - stmnt.setDouble(p, value); - } - } - return this; - } - - public Instance setLong(String key, long value) - throws SQLException - { - List<Integer> pos = positions.get(key.toLowerCase()); - if (pos != null) { - for (Integer p: pos) { - stmnt.setLong(p, value); - } - } - return this; - } - - public Instance setNull(String key, int sqlType) - throws SQLException - { - List<Integer> pos = positions.get(key.toLowerCase()); - if (pos != null) { - for (Integer p: pos) { - stmnt.setNull(p, sqlType); - } - } - return this; - } - - public Instance set(Map<String, Object> map) throws SQLException { - for (Map.Entry<String, Object> entry: map.entrySet()) { - setObject(entry.getKey(), entry.getValue()); - } - return this; - } - - public Instance clearParameters() throws SQLException { - stmnt.clearParameters(); - return this; - } - - public boolean execute() throws SQLException { - if (log.isDebugEnabled()) { - log.debug("execute: " + compiled); - } - return stmnt.execute(); - } - - public ResultSet executeQuery() throws SQLException { - if (log.isDebugEnabled()) { - log.debug("query: " + compiled); - } - return stmnt.executeQuery(); - } - - public int executeUpdate() throws SQLException { - if (log.isDebugEnabled()) { - log.debug("update: " + compiled); - } - return stmnt.executeUpdate(); - } - - } // class Instance - - public SymbolicStatement(String statement) { - this.statement = statement; - compile(); - } - - public String getStatement() { - return statement; - } - - protected void compile() { - positions = new HashMap<String, List<Integer>>(); - - StringBuffer sb = new StringBuffer(); - Matcher m = VAR.matcher(statement); - int index = 1; - while (m.find()) { - String key = m.group(1).toLowerCase(); - List<Integer> list = positions.get(key); - if (list == null) { - list = new ArrayList<Integer>(); - positions.put(key, list); - } - list.add(index++); - m.appendReplacement(sb, "?"); - } - m.appendTail(sb); - compiled = sb.toString(); - } -} // class SymbolicStatement
--- a/flys-aft/src/main/java/org/dive4elements/etl/utils/XML.java Thu Apr 25 11:35:06 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,332 +0,0 @@ -package de.intevation.utils; - -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.StringWriter; - -import java.util.HashMap; -import java.util.Map; - -import javax.xml.namespace.NamespaceContext; -import javax.xml.namespace.QName; - -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - -import javax.xml.transform.Transformer; -import javax.xml.transform.TransformerConfigurationException; -import javax.xml.transform.TransformerException; -import javax.xml.transform.TransformerFactory; -import javax.xml.transform.TransformerFactoryConfigurationError; - -import javax.xml.transform.dom.DOMResult; -import javax.xml.transform.dom.DOMSource; - -import javax.xml.transform.stream.StreamResult; -import javax.xml.transform.stream.StreamSource; - -import javax.xml.xpath.XPath; -import javax.xml.xpath.XPathExpressionException; -import javax.xml.xpath.XPathFactory; -import javax.xml.xpath.XPathVariableResolver; - -import org.apache.log4j.Logger; - -import org.w3c.dom.Document; - -import org.xml.sax.SAXException; - -public final class XML -{ - /** Logger for this class. */ - private static Logger log = Logger.getLogger(XML.class); - - public static class MapXPathVariableResolver - implements XPathVariableResolver - { - protected Map<String, String> variables; - - - public MapXPathVariableResolver() { - this.variables = new HashMap<String, String>(); - } - - - public MapXPathVariableResolver(Map<String, String> variables) { - this.variables = variables; - } - - - public void addVariable(String name, String value) { - variables.put(name, value); - } - - - @Override - public Object resolveVariable(QName variableName) { - String key = variableName.getLocalPart(); - return variables.get(key); - } - } // class MapXPathVariableResolver - - private XML() { - } - - /** - * Creates a new XML document - * @return the new XML document ot null if something went wrong during - * creation. - */ - public static final Document newDocument() { - DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - factory.setNamespaceAware(true); - - try { - return factory.newDocumentBuilder().newDocument(); - } - catch (ParserConfigurationException pce) { - log.error(pce.getLocalizedMessage(), pce); - } - return null; - } - - /** - * Loads a XML document namespace aware from a file - * @param file The file to load. - * @return the XML document or null if something went wrong - * during loading. - */ - public static final Document parseDocument(File file) { - return parseDocument(file, Boolean.TRUE); - } - - public static final Document parseDocument(File file, Boolean namespaceAware) { - InputStream inputStream = null; - try { - inputStream = new BufferedInputStream(new FileInputStream(file)); - return parseDocument(inputStream, namespaceAware); - } - catch (IOException ioe) { - log.error(ioe.getLocalizedMessage(), ioe); - } - finally { - if (inputStream != null) { - try { inputStream.close(); } - catch (IOException ioe) {} - } - } - return null; - } - - - public static final Document parseDocument(InputStream inputStream) { - return parseDocument(inputStream, Boolean.TRUE); - } - - public static final Document parseDocument( - InputStream inputStream, - Boolean namespaceAware - ) { - DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - - if (namespaceAware != null) { - factory.setNamespaceAware(namespaceAware.booleanValue()); - } - - try { - return factory.newDocumentBuilder().parse(inputStream); - } - catch (ParserConfigurationException pce) { - log.error(pce.getLocalizedMessage(), pce); - } - catch (SAXException se) { - log.error(se.getLocalizedMessage(), se); - } - catch (IOException ioe) { - log.error(ioe.getLocalizedMessage(), ioe); - } - return null; - } - - - /** - * Creates a new XPath without a namespace context. - * @return the new XPath. - */ - public static final XPath newXPath() { - return newXPath(null, null); - } - - /** - * Creates a new XPath with a given namespace context. - * @param namespaceContext The namespace context to be used or null - * if none should be used. - * @return The new XPath - */ - public static final XPath newXPath( - NamespaceContext namespaceContext, - XPathVariableResolver resolver) - { - XPathFactory factory = XPathFactory.newInstance(); - XPath xpath = factory.newXPath(); - if (namespaceContext != null) { - xpath.setNamespaceContext(namespaceContext); - } - - if (resolver != null) { - xpath.setXPathVariableResolver(resolver); - } - return xpath; - } - - /** - * Evaluates an XPath query on a given object and returns the result - * as a given type. No namespace context is used. - * @param root The object which is used as the root of the tree to - * be searched in. - * @param query The XPath query - * @param returnTyp The type of the result. - * @return The result of type 'returnTyp' or null if something - * went wrong during XPath evaluation. - */ - public static final Object xpath( - Object root, - String query, - QName returnTyp - ) { - return xpath(root, query, returnTyp, null); - } - - /** - * Evaluates an XPath query on a given object and returns the result - * as a given type. Optionally a namespace context is used. - * @param root The object which is used as the root of the tree to - * be searched in. - * @param query The XPath query - * @param returnType The type of the result. - * @param namespaceContext The namespace context to be used or null - * if none should be used. - * @return The result of type 'returnTyp' or null if something - * went wrong during XPath evaluation. - */ - public static final Object xpath( - Object root, - String query, - QName returnType, - NamespaceContext namespaceContext - ) { - return xpath(root, query, returnType, namespaceContext, null); - } - - public static final Object xpath( - Object root, - String query, - QName returnType, - NamespaceContext namespaceContext, - Map<String, String> variables) - { - if (root == null) { - return null; - } - - XPathVariableResolver resolver = variables != null - ? new MapXPathVariableResolver(variables) - : null; - - try { - XPath xpath = newXPath(namespaceContext, resolver); - if (xpath != null) { - return xpath.evaluate(query, root, returnType); - } - } - catch (XPathExpressionException xpee) { - log.error(xpee.getLocalizedMessage(), xpee); - } - - return null; - } - - public static Document transform( - Document document, - File xformFile - ) { - try { - Transformer transformer = - TransformerFactory - .newInstance() - .newTransformer( - new StreamSource(xformFile)); - - DOMResult result = new DOMResult(); - - transformer.transform(new DOMSource(document), result); - - return (Document)result.getNode(); - } - catch (TransformerConfigurationException tce) { - log.error(tce, tce); - } - catch (TransformerException te) { - log.error(te, te); - } - - return null; - } - - /** - * Streams out an XML document to a given output stream. - * @param document The document to be streamed out. - * @param out The output stream to be used. - * @return true if operation succeeded else false. - */ - public static boolean toStream(Document document, OutputStream out) { - try { - Transformer transformer = - TransformerFactory.newInstance().newTransformer(); - DOMSource source = new DOMSource(document); - StreamResult result = new StreamResult(out); - transformer.transform(source, result); - return true; - } - catch (TransformerConfigurationException tce) { - log.error(tce.getLocalizedMessage(), tce); - } - catch (TransformerFactoryConfigurationError tfce) { - log.error(tfce.getLocalizedMessage(), tfce); - } - catch (TransformerException te) { - log.error(te.getLocalizedMessage(), te); - } - - return false; - } - - public static String toString(Document document) { - try { - Transformer transformer = - TransformerFactory.newInstance().newTransformer(); - DOMSource source = new DOMSource(document); - StringWriter out = new StringWriter(); - StreamResult result = new StreamResult(out); - transformer.transform(source, result); - out.flush(); - return out.toString(); - } - catch (TransformerConfigurationException tce) { - log.error(tce.getLocalizedMessage(), tce); - } - catch (TransformerFactoryConfigurationError tfce) { - log.error(tfce.getLocalizedMessage(), tfce); - } - catch (TransformerException te) { - log.error(te.getLocalizedMessage(), te); - } - - return null; - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/DIPSGauge.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,193 @@ +package de.intevation.aft; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.List; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; + +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +public class DIPSGauge +{ + private static Logger log = Logger.getLogger(DIPSGauge.class); + + public static final Pattern DATE_PATTERN = Pattern.compile( + "(\\d{4})-(\\d{2})-(\\d{2})\\s+(\\d{2}):(\\d{2}):(\\d{2})"); + + public static final Comparator<Datum> DATE_CMP = new Comparator<Datum>() { + public int compare(Datum a, Datum b) { + return a.date.compareTo(b.date); + } + }; + + public static class Datum { + + protected double value; + protected Date date; + + public Datum() { + } + + public Datum(Element element) { + value = Double.parseDouble(element.getAttribute("WERT")); + String dateString = element.getAttribute("GUELTIGAB"); + if (dateString.length() == 0) { + throw + new IllegalArgumentException("missing GUELTIGAB attribute"); + } + Matcher m = DATE_PATTERN.matcher(dateString); + if (!m.matches()) { + throw + new IllegalArgumentException("GUELTIGAB does not match"); + } + + int year = Integer.parseInt(m.group(1)); + int month = Integer.parseInt(m.group(2)); + int day = Integer.parseInt(m.group(3)); + int hours = Integer.parseInt(m.group(4)); + int mins = Integer.parseInt(m.group(5)); + int secs = Integer.parseInt(m.group(6)); + + Calendar cal = Calendar.getInstance(); + cal.set(year, month, day, hours, mins, secs); + + date = cal.getTime(); + } + + public double getValue() { + return value; + } + + public void setValue(double value) { + this.value = value; + } + + public Date getDate() { + return date; + } + + public void setDate(Date date) { + this.date = date; + } + } // class datum + + protected double aeo; + + protected double station; + + protected String name; + + protected String riverName; + + protected List<Datum> datums; + + protected int flysId; + + protected String aftName; + + protected Long officialNumber; + + public DIPSGauge() { + } + + public DIPSGauge(Element element) { + + name = element.getAttribute("NAME"); + riverName = element.getAttribute("GEWAESSER"); + + String aeoString = element.getAttribute("EINZUGSGEBIET_AEO"); + if (aeoString.length() == 0) { + log.warn("DIPS: Setting AEO of gauge '" + name + "' to zero."); + aeoString = "0"; + } + aeo = Double.parseDouble(aeoString); + + String stationString = element.getAttribute("STATIONIERUNG"); + if (stationString.length() == 0) { + log.warn("DIPS: Setting station of gauge '" + name + "' to zero."); + stationString = "-99999"; + } + station = Double.parseDouble(stationString); + if (station == 0d) { + log.warn("DIPS: Station of gauge '" + name + "' is zero."); + } + + datums = new ArrayList<Datum>(); + NodeList nodes = element.getElementsByTagName("PNP"); + for (int i = 0, N = nodes.getLength(); i < N; ++i) { + Element e = (Element)nodes.item(i); + Datum datum = new Datum(e); + datums.add(datum); + } + Collections.sort(datums, DATE_CMP); + } + + public List<Datum> getDatums() { + return datums; + } + + public String getName() { + return name; + } + + public String getRiverName() { + return riverName; + } + + public int getFlysId() { + return flysId; + } + + public void setFlysId(int flysId) { + this.flysId = flysId; + } + + public String getAftName() { + return aftName != null ? aftName : name; + } + + public void setAftName(String aftName) { + this.aftName = aftName; + } + + public double getStation() { + return station; + } + + public double getAeo() { + return aeo; + } + + public void setAeo(double aeo) { + this.aeo = aeo; + } + + public void setStation(double station) { + this.station = station; + } + + public boolean hasDatums() { + return !datums.isEmpty(); + } + + public Datum getLatestDatum() { + return datums.get(datums.size()-1); + } + + public Long getOfficialNumber() { + return officialNumber; + } + + public void setOfficialNumber(Long officialNumber) { + this.officialNumber = officialNumber; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/DischargeTable.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,372 @@ +package de.intevation.aft; + +import de.intevation.db.ConnectedStatements; +import de.intevation.db.SymbolicStatement; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.log4j.Logger; + +/** A Discharge Table. */ +public class DischargeTable +{ + private static Logger log = Logger.getLogger(DischargeTable.class); + + protected int id; + protected int gaugeId; + protected TimeInterval timeInterval; + protected String description; + protected String bfgId; + protected Set<WQ> values; + + public DischargeTable() { + } + + public DischargeTable( + int gaugeId, + TimeInterval timeInterval, + String description, + String bfgId + ) { + this.gaugeId = gaugeId; + this.timeInterval = timeInterval; + this.description = description; + this.bfgId = bfgId; + values = new TreeSet<WQ>(WQ.EPS_CMP); + } + + public DischargeTable( + int id, + int gaugeId, + TimeInterval timeInterval, + String description, + String bfgId + ) { + this(gaugeId, timeInterval, description, bfgId); + this.id = id; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getGaugeId() { + return gaugeId; + } + + public void setGaugeId(int gaugeId) { + this.gaugeId = gaugeId; + } + + public TimeInterval getTimeInterval() { + return timeInterval; + } + + public void setTimeInterval(TimeInterval timeInterval) { + this.timeInterval = timeInterval; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getBfgId() { + return bfgId; + } + + public void setBfgId(String bfgId) { + this.bfgId = bfgId; + } + + + public void clearValues() { + values.clear(); + } + + public Set<WQ> getValues() { + return values; + } + + public void setValues(Set<WQ> values) { + this.values = values; + } + + + protected void loadValues(SymbolicStatement.Instance query) + throws SQLException + { + ResultSet rs = query.executeQuery(); + while (rs.next()) { + int id = rs.getInt("id"); + double w = rs.getDouble("w"); + double q = rs.getDouble("q"); + if (!values.add(new WQ(id, w, q))) { + log.warn("FLYS/AFT: Value duplication w="+w+" q="+q+". -> ignore."); + } + } + rs.close(); + } + + public void loadAftValues(SyncContext context) throws SQLException { + loadValues(context.getAftStatements() + .getStatement("select.tafelwert") + .clearParameters() + .setInt("number", getId())); + } + + public void loadFlysValues(SyncContext context) throws SQLException { + loadValues(context.getFlysStatements() + .getStatement("select.discharge.table.values") + .clearParameters() + .setInt("table_id", getId())); + } + + public void storeFlysValues( + SyncContext context, + int dischargeTableId + ) + throws SQLException + { + ConnectedStatements flysStatements = context.getFlysStatements(); + + // Create the ids. + SymbolicStatement.Instance nextId = flysStatements + .getStatement("next.discharge.table.values.id"); + + // Insert the values. + SymbolicStatement.Instance insertDTV = flysStatements + .getStatement("insert.discharge.table.value"); + + for (WQ wq: values) { + int wqId; + ResultSet rs = nextId.executeQuery(); + try { + rs.next(); + wqId = rs.getInt("discharge_table_values_id"); + } + finally { + rs.close(); + } + + insertDTV + .clearParameters() + .setInt("id", wqId) + .setInt("table_id", dischargeTableId) + .setDouble("w", wq.getW()) + .setDouble("q", wq.getQ()) + .execute(); + } + } + + public static List<DischargeTable> loadFlysDischargeTables( + SyncContext context, + int gaugeId + ) + throws SQLException + { + List<DischargeTable> dts = new ArrayList<DischargeTable>(); + + ResultSet rs = context + .getFlysStatements() + .getStatement("select.gauge.discharge.tables") + .clearParameters() + .setInt("gauge_id", gaugeId) + .executeQuery(); + try { + OUTER: while (rs.next()) { + int id = rs.getInt("id"); + String description = rs.getString("description"); + String bfgId = rs.getString("bfg_id"); + if (description == null) { + description = ""; + } + if (bfgId == null) { + bfgId = ""; + } + for (DischargeTable dt: dts) { + if (dt.getBfgId().equals(bfgId)) { + log.warn("FLYS: Found discharge table '" + + bfgId + "' with same bfg_id. -> ignore"); + continue OUTER; + } + } + Date startTime = rs.getDate("start_time"); + Date stopTime = rs.getDate("stop_time"); + TimeInterval ti = startTime == null + ? null + : new TimeInterval(startTime, stopTime); + + DischargeTable dt = new DischargeTable( + id, gaugeId, ti, description, bfgId); + dts.add(dt); + } + } + finally { + rs.close(); + } + + return dts; + } + + public static List<DischargeTable> loadAftDischargeTables( + SyncContext context, + Long officialNumber + ) + throws SQLException + { + return loadAftDischargeTables(context, officialNumber, 0); + } + + public static List<DischargeTable> loadAftDischargeTables( + SyncContext context, + Long officialNumber, + int flysGaugeId + ) + throws SQLException + { + List<DischargeTable> dts = new ArrayList<DischargeTable>(); + + ResultSet rs = context + .getAftStatements() + .getStatement("select.abflusstafel") + .clearParameters() + .setString("number", "%" + officialNumber) + .executeQuery(); + try { + OUTER: while (rs.next()) { + int dtId = rs.getInt("ABFLUSSTAFEL_NR"); + Date from = rs.getDate("GUELTIG_VON"); + Date to = rs.getDate("GUELTIG_BIS"); + + if (from == null) { + log.warn("AFT: ABFLUSSTAFEL_NR = " + + dtId + ": GUELTIG_VON = NULL -> ignored."); + } + + if (to == null) { + log.warn("AFT: ABFLUSSTAFEL_NR = " + + dtId + ": GUELTIG_BIS = NULL -> ignored."); + } + + if (from == null || to == null) { + continue; + } + + if (from.compareTo(to) > 0) { + log.warn("AFT: ABFLUSSTAFEL_NR = " + + dtId + ": " + from + " > " + to + ". -> swap"); + Date temp = from; + from = to; + to = temp; + } + + String description = rs.getString("ABFLUSSTAFEL_BEZ"); + if (description == null) { + description = String.valueOf(officialNumber); + } + + String bfgId = rs.getString("BFG_ID"); + if (bfgId == null) { + bfgId = ""; + } + + for (DischargeTable dt: dts) { + if (dt.getBfgId().equals(bfgId)) { + log.warn("AFT: Found discharge table '" + + bfgId + "' with same bfg_id. -> ignore."); + continue OUTER; + } + } + + TimeInterval timeInterval = new TimeInterval(from, to); + + DischargeTable dt = new DischargeTable( + dtId, + flysGaugeId, + timeInterval, + description, + bfgId); + dts.add(dt); + } + } + finally { + rs.close(); + } + + return dts; + } + + public void persistFlysTimeInterval( + SyncContext context + ) + throws SQLException + { + if (timeInterval != null) { + timeInterval = context.fetchOrCreateFLYSTimeInterval( + timeInterval); + } + } + + public int persistFlysDischargeTable( + SyncContext context, + int gaugeId + ) + throws SQLException + { + ConnectedStatements flysStatements = + context.getFlysStatements(); + + int flysId; + + ResultSet rs = flysStatements + .getStatement("next.discharge.id") + .executeQuery(); + try { + rs.next(); + flysId = rs.getInt("discharge_table_id"); + } + finally { + rs.close(); + } + + SymbolicStatement.Instance insertDT = flysStatements + .getStatement("insert.dischargetable") + .clearParameters() + .setInt("id", flysId) + .setInt("gauge_id", gaugeId) + .setString("description", description) + .setString("bfg_id", bfgId); + + if (timeInterval != null) { + insertDT.setInt("time_interval_id", timeInterval.getId()); + } + else { + insertDT.setNull("time_interval_id", Types.INTEGER); + } + + insertDT.execute(); + + if (log.isDebugEnabled()) { + log.debug("FLYS: Created discharge table id: " + id); + } + + return flysId; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/IdPair.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,40 @@ +package de.intevation.aft; + +public class IdPair +{ + protected int id1; + protected int id2; + + public IdPair() { + } + + public IdPair(int id1) { + this.id1 = id1; + } + + public IdPair(int id1, int id2) { + this(id1); + this.id2 = id2; + } + + public int getId1() { + return id1; + } + + public void setId1(int id1) { + this.id1 = id1; + } + + public int getId2() { + return id2; + } + + public void setId2(int id2) { + this.id2 = id2; + } + + public String toString() { + return "[IdPair: id1=" + id1 + ", id2=" + id2 + "]"; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/Notification.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,101 @@ +package de.intevation.aft; + +import de.intevation.utils.XML; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLConnection; + +import org.apache.log4j.Logger; + +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +public class Notification +{ + private static Logger log = Logger.getLogger(Notification.class); + + protected Document message; + + public Notification() { + } + + public Notification(Document message) { + this.message = message; + } + + public Notification(Node message) { + this(wrap(message)); + } + + public static Document wrap(Node node) { + Document document = XML.newDocument(); + + // Send first element as message. + // Fall back to root node. + Node toImport = node; + + NodeList children = node.getChildNodes(); + for (int i = 0, N = children.getLength(); i < N; ++i) { + Node child = children.item(i); + if (child.getNodeType() == Node.ELEMENT_NODE) { + toImport = child; + break; + } + } + + toImport = document.importNode(toImport, true); + document.appendChild(toImport); + document.normalizeDocument(); + return document; + } + + public Document sendPOST(URL url) { + + OutputStream out = null; + InputStream in = null; + Document result = null; + + try { + URLConnection ucon = url.openConnection(); + + if (!(ucon instanceof HttpURLConnection)) { + log.warn("NOTIFY: '" + url + "' is not an HTTP(S) connection."); + return null; + } + + HttpURLConnection con = (HttpURLConnection)ucon; + + con.setRequestMethod("POST"); + con.setDoInput(true); + con.setDoOutput(true); + con.setUseCaches(false); + con.setRequestProperty("Content-Type", "text/xml"); + + out = con.getOutputStream(); + XML.toStream(message, out); + out.flush(); + in = con.getInputStream(); + result = XML.parseDocument(in); + } + catch (IOException ioe) { + log.error("NOTIFY: Sending message to '" + url + "' failed.", ioe); + } + finally { + if (out != null) { + try { out.close(); } catch (IOException ioe) {} + } + if (in != null) { + try { in.close(); } catch (IOException ioe) {} + } + } + + return result; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/River.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,525 @@ +package de.intevation.aft; + +import de.intevation.db.ConnectedStatements; +import de.intevation.db.SymbolicStatement; + +import java.sql.ResultSet; +import java.sql.SQLException; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; + +public class River +extends IdPair +{ + private static Logger log = Logger.getLogger(River.class); + + protected String name; + + protected double from; + protected double to; + + public River() { + } + + public River(int id1, String name, double from, double to) { + super(id1); + this.name = name; + this.from = from; + this.to = to; + } + + public River(int id1, int id2, String name) { + super(id1, id2); + this.name = name; + } + + public String getName() { + return name; + } + + public double getFrom() { + return from; + } + + public void setFrom(double from) { + this.from = from; + } + + public double getTo() { + return to; + } + + public void setTo(double to) { + this.to = to; + } + + public boolean inside(double x) { + return x >= from && x <= to; + } + + public boolean sync(SyncContext context) throws SQLException { + log.info("sync river: " + this); + + // Only take relevant gauges into account. + Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to); + + ConnectedStatements flysStatements = context.getFlysStatements(); + ConnectedStatements aftStatements = context.getAftStatements(); + + String riverName = getName(); + + Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>(); + + ResultSet messstellenRs = aftStatements + .getStatement("select.messstelle") + .clearParameters() + .setInt("GEWAESSER_NR", id2) + .executeQuery(); + + try { + while (messstellenRs.next()) { + String name = messstellenRs.getString("NAME"); + String num = messstellenRs.getString("MESSSTELLE_NR"); + double station = messstellenRs.getDouble("STATIONIERUNG"); + + if (!messstellenRs.wasNull() && !inside(station)) { + log.warn("Station found in AFT but in not range: " + station); + continue; + } + + Long number = SyncContext.numberToLong(num); + if (number == null) { + log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); + continue; + } + DIPSGauge dipsGauge = dipsGauges.get(number); + if (dipsGauge == null) { + log.warn( + "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " + + "Gauge number used for lookup: " + number); + continue; + } + String gaugeRiver = dipsGauge.getRiverName(); + if (!gaugeRiver.equalsIgnoreCase(riverName)) { + log.warn( + "DIPS: MESSSTELLE '" + name + + "' is assigned to river '" + gaugeRiver + + "'. Needs to be on '" + riverName + "'."); + continue; + } + dipsGauge.setAftName(name); + dipsGauge.setOfficialNumber(number); + aftDIPSGauges.put(number, dipsGauge); + } + } + finally { + messstellenRs.close(); + } + + List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>(); + + ResultSet gaugesRs = flysStatements + .getStatement("select.gauges") + .clearParameters() + .setInt("river_id", id1).executeQuery(); + + try { + while (gaugesRs.next()) { + int gaugeId = gaugesRs.getInt("id"); + String name = gaugesRs.getString("name"); + long number = gaugesRs.getLong("official_number"); + if (gaugesRs.wasNull()) { + log.warn("FLYS: Gauge '" + name + + "' has no official number. Ignored."); + continue; + } + Long key = Long.valueOf(number); + DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); + if (aftDIPSGauge == null) { + log.warn("FLYS: Gauge '" + name + "' number " + number + + " is not found in AFT/DIPS."); + continue; + } + aftDIPSGauge.setFlysId(gaugeId); + log.info("Gauge '" + name + + "' found in FLYS, AFT and DIPS. -> Update"); + updateGauges.add(aftDIPSGauge); + } + } + finally { + gaugesRs.close(); + } + + boolean modified = createGauges(context, aftDIPSGauges); + + modified |= updateGauges(context, updateGauges); + + return modified; + } + + protected boolean updateGauges( + SyncContext context, + List<DIPSGauge> gauges + ) + throws SQLException + { + boolean modified = false; + + for (DIPSGauge gauge: gauges) { + // XXX: Do dont modify the master AT. + // modified |= updateBfGIdOnMasterDischargeTable(context, gauge); + modified |= updateGauge(context, gauge); + } + + return modified; + } + + protected boolean updateBfGIdOnMasterDischargeTable( + SyncContext context, + DIPSGauge gauge + ) throws SQLException { + log.info( + "FLYS: Updating master discharge table bfg_id for '" + + gauge.getAftName() + "'"); + ConnectedStatements flysStatements = context.getFlysStatements(); + + ResultSet rs = flysStatements + .getStatement("select.gauge.master.discharge.table") + .clearParameters() + .setInt("gauge_id", gauge.getFlysId()) + .executeQuery(); + + int flysId; + + try { + if (!rs.next()) { + log.error( + "FLYS: No master discharge table found for gauge '" + + gauge.getAftName() + "'"); + return false; + } + String bfgId = rs.getString("bfg_id"); + if (!rs.wasNull()) { // already has BFG_ID + return false; + } + flysId = rs.getInt("id"); + } finally { + rs.close(); + } + + // We need to find out the BFG_ID of the current discharge table + // for this gauge in AFT. + + ConnectedStatements aftStatements = context.getAftStatements(); + + rs = aftStatements + .getStatement("select.bfg.id.current") + .clearParameters() + .setString("number", "%" + gauge.getOfficialNumber()) + .executeQuery(); + + String bfgId = null; + + try { + if (rs.next()) { + bfgId = rs.getString("BFG_ID"); + } + } finally { + rs.close(); + } + + if (bfgId == null) { + log.warn( + "No BFG_ID found for current discharge table of gauge '" + + gauge + "'"); + return false; + } + + // Set the BFG_ID in FLYS. + flysStatements.beginTransaction(); + try { + flysStatements + .getStatement("update.bfg.id.discharge.table") + .clearParameters() + .setInt("id", flysId) + .setString("bfg_id", bfgId) + .executeUpdate(); + flysStatements.commitTransaction(); + } catch (SQLException sqle) { + flysStatements.rollbackTransaction(); + log.error(sqle, sqle); + return false; + } + + return true; + } + + protected boolean updateGauge( + SyncContext context, + DIPSGauge gauge + ) + throws SQLException + { + log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'."); + // We need to load all discharge tables from both databases + // of the gauge and do some pairing based on their bfg_id. + + boolean modified = false; + + ConnectedStatements flysStatements = context.getFlysStatements(); + + flysStatements.beginTransaction(); + try { + List<DischargeTable> flysDTs = + DischargeTable.loadFlysDischargeTables( + context, gauge.getFlysId()); + + List<DischargeTable> aftDTs = + DischargeTable.loadAftDischargeTables( + context, gauge.getOfficialNumber()); + + Map<String, DischargeTable> bfgId2FlysDT = + new HashMap<String, DischargeTable>(); + + for (DischargeTable dt: flysDTs) { + String bfgId = dt.getBfgId(); + if (bfgId == null) { + log.warn("FLYS: discharge table " + dt.getId() + + " has no bfg_id. Ignored."); + continue; + } + bfgId2FlysDT.put(bfgId, dt); + } + + List<DischargeTable> createDTs = new ArrayList<DischargeTable>(); + + for (DischargeTable aftDT: aftDTs) { + String bfgId = aftDT.getBfgId(); + DischargeTable flysDT = bfgId2FlysDT.remove(bfgId); + if (flysDT != null) { + // Found in AFT and FLYS. + log.info("FLYS: Discharge table '" + bfgId + + "' found in AFT and FLYS. -> update"); + // Create the W/Q diff. + modified |= writeWQChanges(context, flysDT, aftDT); + } + else { + log.info("FLYS: Discharge table '" + bfgId + + "' not found in FLYS. -> create"); + createDTs.add(aftDT); + } + } + + for (String bfgId: bfgId2FlysDT.keySet()) { + log.info("FLYS: Discharge table '" + bfgId + + "' found in FLYS but not in AFT. -> ignore"); + } + + log.info("FLYS: Copy " + createDTs.size() + + " discharge tables over from AFT."); + + // Create the new discharge tables. + for (DischargeTable aftDT: createDTs) { + createDischargeTable(context, aftDT, gauge.getFlysId()); + modified = true; + } + + flysStatements.commitTransaction(); + } + catch (SQLException sqle) { + flysStatements.rollbackTransaction(); + log.error(sqle, sqle); + modified = false; + } + + return modified; + } + + protected boolean writeWQChanges( + SyncContext context, + DischargeTable flysDT, + DischargeTable aftDT + ) + throws SQLException + { + flysDT.loadFlysValues(context); + aftDT.loadAftValues(context); + WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues()); + if (diff.hasChanges()) { + diff.writeChanges(context, flysDT.getId()); + return true; + } + return false; + } + + protected boolean createGauges( + SyncContext context, + Map<Long, DIPSGauge> gauges + ) + throws SQLException + { + ConnectedStatements flysStatements = context.getFlysStatements(); + + SymbolicStatement.Instance nextId = + flysStatements.getStatement("next.gauge.id"); + + SymbolicStatement.Instance insertStmnt = + flysStatements.getStatement("insert.gauge"); + + boolean modified = false; + + for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) { + Long officialNumber = entry.getKey(); + DIPSGauge gauge = entry.getValue(); + + log.info("Gauge '" + gauge.getAftName() + + "' not in FLYS but in AFT/DIPS. -> Create"); + + if (!gauge.hasDatums()) { + log.warn("DIPS: Gauge '" + + gauge.getAftName() + "' has no datum. Ignored."); + continue; + } + + ResultSet rs = null; + flysStatements.beginTransaction(); + try { + (rs = nextId.executeQuery()).next(); + int gaugeId = rs.getInt("gauge_id"); + rs.close(); rs = null; + + insertStmnt + .clearParameters() + .setInt("id", gaugeId) + .setString("name", gauge.getAftName()) + .setInt("river_id", id1) + .setDouble("station", gauge.getStation()) + .setDouble("aeo", gauge.getAeo()) + .setLong("official_number", officialNumber) + .setDouble("datum", gauge.getLatestDatum().getValue()); + + insertStmnt.execute(); + + log.info("FLYS: Created gauge '" + gauge.getAftName() + + "' with id " + gaugeId + "."); + + gauge.setFlysId(gaugeId); + createDischargeTables(context, gauge); + flysStatements.commitTransaction(); + modified = true; + } + catch (SQLException sqle) { + flysStatements.rollbackTransaction(); + log.error(sqle, sqle); + } + finally { + if (rs != null) { + rs.close(); + } + } + } + + return modified; + } + + protected void createDischargeTable( + SyncContext context, + DischargeTable aftDT, + int flysGaugeId + ) + throws SQLException + { + aftDT.persistFlysTimeInterval(context); + int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId); + + aftDT.loadAftValues(context); + aftDT.storeFlysValues(context, flysId); + } + + protected void createDischargeTables( + SyncContext context, + DIPSGauge gauge + ) + throws SQLException + { + log.info("FLYS: Create discharge tables for '" + + gauge.getAftName() + "'."); + + // Load the discharge tables from AFT. + List<DischargeTable> dts = loadAftDischargeTables( + context, gauge); + + // Persist the time intervals. + persistFlysTimeIntervals(context, dts); + + // Persist the discharge tables + int [] flysDTIds = persistFlysDischargeTables( + context, dts, gauge.getFlysId()); + + // Copy over the W/Q values + copyWQsFromAftToFlys(context, dts, flysDTIds); + } + + protected List<DischargeTable> loadAftDischargeTables( + SyncContext context, + DIPSGauge gauge + ) + throws SQLException + { + return DischargeTable.loadAftDischargeTables( + context, gauge.getOfficialNumber(), gauge.getFlysId()); + } + + protected void persistFlysTimeIntervals( + SyncContext context, + List<DischargeTable> dts + ) + throws SQLException + { + for (DischargeTable dt: dts) { + dt.persistFlysTimeInterval(context); + } + } + + protected int [] persistFlysDischargeTables( + SyncContext context, + List<DischargeTable> dts, + int flysGaugeId + ) + throws SQLException + { + int [] flysDTIds = new int[dts.size()]; + + for (int i = 0; i < flysDTIds.length; ++i) { + flysDTIds[i] = dts.get(i) + .persistFlysDischargeTable(context, flysGaugeId); + } + + return flysDTIds; + } + + protected void copyWQsFromAftToFlys( + SyncContext context, + List<DischargeTable> dts, + int [] flysDTIds + ) + throws SQLException + { + for (int i = 0; i < flysDTIds.length; ++i) { + DischargeTable dt = dts.get(i); + dt.loadAftValues(context); + dt.storeFlysValues(context, flysDTIds[i]); + dt.clearValues(); // To save memory. + } + } + + public String toString() { + return "[River: name=" + name + ", " + super.toString() + "]"; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/Rivers.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,77 @@ +package de.intevation.aft; + +import de.intevation.db.ConnectedStatements; + +import java.sql.ResultSet; +import java.sql.SQLException; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; + +public class Rivers +{ + private static Logger log = Logger.getLogger(Rivers.class); + + public Rivers() { + } + + public boolean sync(SyncContext context) throws SQLException { + + log.info("sync: rivers"); + + ConnectedStatements flysStatements = context.getFlysStatements(); + ConnectedStatements aftStatements = context.getAftStatements(); + + Map<String, River> flysRivers = new HashMap<String, River>(); + + ResultSet flysRs = flysStatements + .getStatement("select.rivers").executeQuery(); + + try { + while (flysRs.next()) { + int id = flysRs.getInt("id"); + String name = flysRs.getString("name"); + double from = flysRs.getDouble("min_km"); + double to = flysRs.getDouble("max_km"); + flysRivers.put(name.toLowerCase(), new River(id, name, from, to)); + } + } + finally { + flysRs.close(); + } + + List<River> commonRivers = new ArrayList<River>(); + + ResultSet aftRs = aftStatements + .getStatement("select.gewaesser").executeQuery(); + + try { + while (aftRs.next()) { + String name = aftRs.getString("NAME"); + River river = flysRivers.get(name.toLowerCase()); + if (river != null) { + int id2 = aftRs.getInt("GEWAESSER_NR"); + river.setId2(id2); + commonRivers.add(river); + } + } + } + finally { + aftRs.close(); + } + + + boolean modified = false; + + for (River river: commonRivers) { + modified |= river.sync(context); + } + + return modified; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/Sync.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,170 @@ +package de.intevation.aft; + +import de.intevation.db.ConnectionBuilder; + +import de.intevation.utils.XML; + +import java.io.File; + +import java.net.MalformedURLException; +import java.net.URL; + +import java.sql.SQLException; + +import javax.xml.xpath.XPathConstants; + +import org.apache.log4j.Logger; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +public class Sync +{ + private static Logger log = Logger.getLogger(Sync.class); + + public static final String FLYS = "flys"; + public static final String AFT = "aft"; + + public static final String XPATH_DIPS = "/sync/dips/file/text()"; + public static final String XPATH_REPAIR = "/sync/dips/repair/text()"; + + public static final String XPATH_NOTIFICATIONS = + "/sync/notifications/notification"; + + public static final String CONFIG_FILE = + System.getProperty("config.file", "config.xml"); + + public static void sendNotifications(Document config) { + NodeList notifications = (NodeList)XML.xpath( + config, XPATH_NOTIFICATIONS, XPathConstants.NODESET, null, null); + + if (notifications == null) { + return; + } + + for (int i = 0, N = notifications.getLength(); i < N; ++i) { + Element notification = (Element)notifications.item(i); + String urlString = notification.getAttribute("url"); + + URL url; + try { + url = new URL(urlString); + } + catch (MalformedURLException mfue) { + log.warn("NOTIFY: Invalid URL '" + urlString + "'. Ignored.", mfue); + continue; + } + + Notification n = new Notification(notification); + + Document result = n.sendPOST(url); + + if (result != null) { + log.info("Send notifcation to '" + urlString + "'."); + log.info(XML.toString(result)); + } + } + } + + public static void main(String [] args) { + + File configFile = new File(CONFIG_FILE); + + if (!configFile.isFile() || !configFile.canRead()) { + log.error("cannot read config file"); + System.exit(1); + } + + Document config = XML.parseDocument(configFile, Boolean.FALSE); + + if (config == null) { + log.error("Cannot load config file."); + System.exit(1); + } + + String dipsF = (String)XML.xpath( + config, XPATH_DIPS, XPathConstants.STRING, null, null); + + if (dipsF == null || dipsF.length() == 0) { + log.error("Cannot find path to DIPS XML in config."); + System.exit(1); + } + + File dipsFile = new File(dipsF); + + if (!dipsFile.isFile() || !dipsFile.canRead()) { + log.error("DIPS: Cannot find '" + dipsF + "'."); + System.exit(1); + } + + Document dips = XML.parseDocument(dipsFile, Boolean.FALSE); + + if (dips == null) { + log.error("DIPS: Cannot load DIPS document."); + System.exit(1); + } + + String repairF = (String)XML.xpath( + config, XPATH_REPAIR, XPathConstants.STRING, null, null); + + if (repairF != null && repairF.length() > 0) { + File repairFile = new File(repairF); + if (!repairFile.isFile() || !repairFile.canRead()) { + log.warn("REPAIR: Cannot open DIPS repair XSLT file."); + } + else { + Document fixed = XML.transform(dips, repairFile); + if (fixed == null) { + log.warn("REPAIR: Fixing DIPS failed."); + } + else { + dips = fixed; + } + } + } + + int exitCode = 0; + + ConnectionBuilder aftConnectionBuilder = + new ConnectionBuilder(AFT, config); + + ConnectionBuilder flysConnectionBuilder = + new ConnectionBuilder(FLYS, config); + + SyncContext syncContext = null; + + boolean modified = false; + try { + syncContext = new SyncContext( + aftConnectionBuilder.getConnectedStatements(), + flysConnectionBuilder.getConnectedStatements(), + dips); + syncContext.init(); + Rivers rivers = new Rivers(); + modified = rivers.sync(syncContext); + } + catch (SQLException sqle) { + log.error("SYNC: Syncing failed.", sqle); + exitCode = 1; + } + finally { + if (syncContext != null) { + syncContext.close(); + } + } + + if (modified) { + log.info("Modifications found."); + sendNotifications(config); + } + else { + log.info("No modifications found."); + } + + if (exitCode != 0) { + System.exit(1); + } + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/SyncContext.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,220 @@ +package de.intevation.aft; + +import de.intevation.db.ConnectedStatements; + +import java.sql.ResultSet; +import java.sql.SQLException; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.log4j.Logger; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +public class SyncContext +{ + private static Logger log = Logger.getLogger(SyncContext.class); + + protected ConnectedStatements aftStatements; + protected ConnectedStatements flysStatements; + protected Document dips; + + protected Map<Long, DIPSGauge> numberToGauge; + protected Map<TimeInterval, TimeInterval> flysTimeIntervals; + + public SyncContext() { + } + + public SyncContext( + ConnectedStatements aftStatements, + ConnectedStatements flysStatements, + Document dips + ) { + this.aftStatements = aftStatements; + this.flysStatements = flysStatements; + this.dips = dips; + } + + public void init() throws SQLException { + numberToGauge = indexByNumber(dips); + flysTimeIntervals = loadTimeIntervals(); + } + + public ConnectedStatements getAftStatements() { + return aftStatements; + } + + public void setAftStatements(ConnectedStatements aftStatements) { + this.aftStatements = aftStatements; + } + + public ConnectedStatements getFlysStatements() { + return flysStatements; + } + + public void setFlysStatements(ConnectedStatements flysStatements) { + this.flysStatements = flysStatements; + } + + public Document getDips() { + return dips; + } + + public void setDips(Document dips) { + this.dips = dips; + } + + void close() { + aftStatements.close(); + flysStatements.close(); + } + + public static Long numberToLong(String s) { + try { + return Long.valueOf(s.trim()); + } + catch (NumberFormatException nfe) { + } + return null; + } + + public Map<Long, DIPSGauge> getDIPSGauges() { + return numberToGauge; + } + + public Map<Long, DIPSGauge> getDIPSGauges( + String riverName, + double from, + double to + ) { + if (from > to) { + double t = from; + from = to; + to = t; + } + + riverName = riverName.toLowerCase(); + + Map<Long, DIPSGauge> result = new HashMap<Long, DIPSGauge>(); + + for (Map.Entry<Long, DIPSGauge> entry: numberToGauge.entrySet()) { + DIPSGauge gauge = entry.getValue(); + // XXX: Maybe a bit too sloppy. + if (!riverName.contains(gauge.getRiverName().toLowerCase())) { + continue; + } + double station = gauge.getStation(); + if (station >= from && station <= to) { + result.put(entry.getKey(), gauge); + } + } + + return result; + } + + protected static Map<Long, DIPSGauge> indexByNumber(Document document) { + Map<Long, DIPSGauge> map = new HashMap<Long, DIPSGauge>(); + NodeList nodes = document.getElementsByTagName("PEGELSTATION"); + for (int i = nodes.getLength()-1; i >= 0; --i) { + Element element = (Element)nodes.item(i); + String numberString = element.getAttribute("NUMMER"); + Long number = numberToLong(numberString); + if (number != null) { + DIPSGauge newG = new DIPSGauge(element); + DIPSGauge oldG = map.put(number, newG); + if (oldG != null) { + log.warn("DIPS: '" + newG.getName() + + "' collides with '" + oldG.getName() + + "' on gauge number " + number + "."); + } + } + else { + log.warn("DIPS: Gauge '" + element.getAttribute("NAME") + + "' has invalid gauge number '" + numberString + "'."); + } + } + return map; + } + + protected Map<TimeInterval, TimeInterval> loadTimeIntervals() + throws SQLException { + + boolean debug = log.isDebugEnabled(); + + Map<TimeInterval, TimeInterval> intervals = + new TreeMap<TimeInterval, TimeInterval>(); + + ResultSet rs = flysStatements + .getStatement("select.timeintervals") + .executeQuery(); + + try { + while (rs.next()) { + int id = rs.getInt("id"); + Date start = rs.getDate("start_time"); + Date stop = rs.getDate("stop_time"); + + if (debug) { + log.debug("id: " + id); + log.debug("start: " + start); + log.debug("stop: " + stop); + } + + TimeInterval ti = new TimeInterval(id, start, stop); + intervals.put(ti, ti); + } + } + finally { + rs.close(); + } + + if (debug) { + log.debug("loaded time intervals: " + intervals.size()); + } + + return intervals; + } + + public TimeInterval fetchOrCreateFLYSTimeInterval(TimeInterval key) + throws SQLException + { + TimeInterval old = flysTimeIntervals.get(key); + if (old != null) { + return old; + } + + ResultSet rs = flysStatements + .getStatement("next.timeinterval.id") + .executeQuery(); + + try { + rs.next(); + key.setId(rs.getInt("time_interval_id")); + } + finally { + rs.close(); + } + + if (log.isDebugEnabled()) { + log.debug("FLYS: Created time interval id: " + key.getId()); + log.debug("FLYS: " + key); + } + + flysStatements.getStatement("insert.timeinterval") + .clearParameters() + .setInt("id", key.getId()) + .setObject("start_time", key.getStart()) + .setObject("stop_time", key.getStop()) + .execute(); + + flysTimeIntervals.put(key, key); + + return key; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/TimeInterval.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,70 @@ +package de.intevation.aft; + +import java.util.Date; + +public class TimeInterval +implements Comparable<TimeInterval> +{ + protected int id; + protected Date start; + protected Date stop; + + public TimeInterval() { + } + + public TimeInterval(Date start, Date stop) { + this.start = start; + this.stop = stop; + } + + public TimeInterval(int id, Date start, Date stop) { + this(start, stop); + this.id = id; + } + + protected static int compare(Date d1, Date d2) { + long s1 = d1 != null ? d1.getTime()/1000L : 0L; + long s2 = d2 != null ? d2.getTime()/1000L : 0L; + long diff = s1 - s2; + return diff < 0L + ? -1 + : diff > 0L ? 1 : 0; + } + + @Override + public int compareTo(TimeInterval other) { + int cmp = compare(start, other.start); + return cmp != 0 + ? cmp + : compare(stop, other.stop); + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public Date getStart() { + return start; + } + + public void setStart(Date start) { + this.start = start; + } + + public Date getStop() { + return stop; + } + + public void setStop(Date stop) { + this.stop = stop; + } + + public String toString() { + return "[TimeInterval: start=" + start + ", stop=" + stop + "]"; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQ.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,67 @@ +package de.intevation.aft; + +import java.util.Comparator; + +public class WQ +{ + public static final double EPSILON = 1e-4; + + public static final Comparator<WQ> EPS_CMP = new Comparator<WQ>() { + @Override + public int compare(WQ a, WQ b) { + int cmp = compareEpsilon(a.q, b.q); + if (cmp != 0) return cmp; + return compareEpsilon(a.w, b.w); + } + }; + + protected int id; + + protected double w; + protected double q; + + public WQ() { + } + + public WQ(double w, double q) { + this.w = w; + this.q = q; + } + + public WQ(int id, double w, double q) { + this.id = id; + this.w = w; + this.q = q; + } + + public static final int compareEpsilon(double a, double b) { + double diff = a - b; + if (diff < -EPSILON) return -1; + return diff > EPSILON ? +1 : 0; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public double getW() { + return w; + } + + public void setW(double w) { + this.w = w; + } + + public double getQ() { + return q; + } + + public void setQ(double q) { + this.q = q; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/aft/WQDiff.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,118 @@ +package de.intevation.aft; + +import de.intevation.db.ConnectedStatements; +import de.intevation.db.SymbolicStatement; + +import java.sql.ResultSet; +import java.sql.SQLException; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.TreeSet; + +public class WQDiff +{ + protected Set<WQ> toAdd; + protected Set<WQ> toDelete; + + public WQDiff() { + } + + public WQDiff(Collection<WQ> a, Collection<WQ> b) { + toAdd = new TreeSet<WQ>(WQ.EPS_CMP); + toDelete = new TreeSet<WQ>(WQ.EPS_CMP); + build(a, b); + } + + public void build(Collection<WQ> a, Collection<WQ> b) { + toAdd.addAll(b); + toAdd.removeAll(a); + + toDelete.addAll(a); + toDelete.removeAll(b); + } + + public void clear() { + toAdd.clear(); + toDelete.clear(); + } + + public Set<WQ> getToAdd() { + return toAdd; + } + + public void setToAdd(Set<WQ> toAdd) { + this.toAdd = toAdd; + } + + public Set<WQ> getToDelete() { + return toDelete; + } + + public void setToDelete(Set<WQ> toDelete) { + this.toDelete = toDelete; + } + + public boolean hasChanges() { + return !(toAdd.isEmpty() && toDelete.isEmpty()); + } + + public void writeChanges( + SyncContext context, + int tableId + ) + throws SQLException + { + ConnectedStatements flysStatements = context.getFlysStatements(); + + // Delete the old entries + if (!toDelete.isEmpty()) { + SymbolicStatement.Instance deleteDTV = + flysStatements.getStatement("delete.discharge.table.value"); + for (WQ wq: toDelete) { + deleteDTV + .clearParameters() + .setInt("id", wq.getId()) + .execute(); + } + } + + // Add the new entries. + if (!toAdd.isEmpty()) { + SymbolicStatement.Instance nextId = + flysStatements.getStatement("next.discharge.table.values.id"); + + SymbolicStatement.Instance insertDTV = + flysStatements.getStatement("insert.discharge.table.value"); + + // Recycle old ids as much as possible. + Iterator<WQ> oldIds = toDelete.iterator(); + + // Create ids for new entries. + for (WQ wq: toAdd) { + if (oldIds.hasNext()) { + wq.setId(oldIds.next().getId()); + } + else { + ResultSet rs = nextId.executeQuery(); + rs.next(); + wq.setId(rs.getInt("discharge_table_values_id")); + rs.close(); + } + } + + // Write the new entries. + for (WQ wq: toAdd) { + insertDTV + .clearParameters() + .setInt("id", wq.getId()) + .setInt("table_id", tableId) + .setDouble("w", wq.getW()) + .setDouble("q", wq.getQ()) + .execute(); + } + } + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectedStatements.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,110 @@ +package de.intevation.db; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.sql.Savepoint; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +import org.apache.log4j.Logger; + +public class ConnectedStatements +{ + private static Logger log = Logger.getLogger(ConnectedStatements.class); + + protected Connection connection; + + protected Map<String, SymbolicStatement> statements; + + protected Map<String, SymbolicStatement.Instance> boundStatements; + + protected Deque<Savepoint> savepoints; + + public ConnectedStatements( + Connection connection, + Map<String, SymbolicStatement> statements + ) + throws SQLException + { + this.connection = connection; + this.statements = statements; + checkSavePoints(); + + boundStatements = new HashMap<String, SymbolicStatement.Instance>(); + } + + protected void checkSavePoints() throws SQLException { + DatabaseMetaData metaData = connection.getMetaData(); + if (metaData.supportsSavepoints()) { + log.info("Driver '" + metaData.getDriverName() + + "' does support savepoints."); + savepoints = new ArrayDeque<Savepoint>(); + } + else { + log.info("Driver '" + metaData.getDriverName() + + "' does not support savepoints."); + } + } + + public SymbolicStatement.Instance getStatement(String key) + throws SQLException + { + SymbolicStatement.Instance stmnt = boundStatements.get(key); + if (stmnt != null) { + return stmnt; + } + + SymbolicStatement ss = statements.get(key); + if (ss == null) { + return null; + } + + stmnt = ss.new Instance(connection); + boundStatements.put(key, stmnt); + return stmnt; + } + + public void beginTransaction() throws SQLException { + if (savepoints != null) { + savepoints.push(connection.setSavepoint()); + } + } + + public void commitTransaction() throws SQLException { + if (savepoints != null) { + savepoints.pop(); + } + connection.commit(); + } + + public void rollbackTransaction() throws SQLException { + if (savepoints != null) { + Savepoint savepoint = savepoints.pop(); + connection.rollback(savepoint); + } + else { + connection.rollback(); + } + } + + public void close() { + for (SymbolicStatement.Instance s: boundStatements.values()) { + s.close(); + } + + try { + if (savepoints != null && !savepoints.isEmpty()) { + Savepoint savepoint = savepoints.peekFirst(); + connection.rollback(savepoint); + } + connection.close(); + } + catch (SQLException sqle) { + } + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/ConnectionBuilder.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,121 @@ +package de.intevation.db; + +import de.intevation.utils.XML; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import javax.xml.xpath.XPathConstants; + +import org.apache.log4j.Logger; + +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; + +public class ConnectionBuilder +{ + private static Logger log = Logger.getLogger(ConnectionBuilder.class); + + public static final String XPATH_DRIVER = "/sync/side[@name=$type]/db/driver/text()"; + public static final String XPATH_USER = "/sync/side[@name=$type]/db/user/text()"; + public static final String XPATH_PASSWORD = "/sync/side[@name=$type]/db/password/text()"; + public static final String XPATH_URL = "/sync/side[@name=$type]/db/url/text()"; + public static final String XPATH_EXEC_LOGIN = "/sync/side[@name=$type]/db/execute-login/statement"; + + protected String type; + protected String driver; + protected String user; + protected String password; + protected String url; + protected List<String> loginStatements; + + public ConnectionBuilder(String type, Document document) { + this.type = type; + extractCredentials(document); + } + + protected static List<String> extractStrings(NodeList nodes) { + int N = nodes.getLength(); + List<String> result = new ArrayList<String>(N); + for (int i = 0; i < N; ++i) { + result.add(nodes.item(i).getTextContent()); + } + return result; + } + + protected void extractCredentials(Document document) { + HashMap<String, String> map = new HashMap<String, String>(); + map.put("type", type); + + driver = (String)XML.xpath( + document, XPATH_DRIVER, XPathConstants.STRING, null, map); + user = (String)XML.xpath( + document, XPATH_USER, XPathConstants.STRING, null, map); + password = (String)XML.xpath( + document, XPATH_PASSWORD, XPathConstants.STRING, null, map); + url = (String)XML.xpath( + document, XPATH_URL, XPathConstants.STRING, null, map); + loginStatements = extractStrings((NodeList)XML.xpath( + document, XPATH_EXEC_LOGIN, XPathConstants.NODESET, null, map)); + + if (log.isDebugEnabled()) { + log.debug("driver: " + driver); + log.debug("user: " + user); + log.debug("password: *******"); + log.debug("url: " + url); + log.debug("number of login statements: " + loginStatements.size()); + } + } + + public Connection getConnection() throws SQLException { + + if (driver != null && driver.length() > 0) { + try { + Class.forName(driver); + } + catch (ClassNotFoundException cnfe) { + throw new SQLException(cnfe); + } + } + + Connection connection = + DriverManager.getConnection(url, user, password); + + connection.setAutoCommit(false); + + DatabaseMetaData metaData = connection.getMetaData(); + + if (metaData.supportsTransactionIsolationLevel( + Connection.TRANSACTION_READ_UNCOMMITTED)) { + connection.setTransactionIsolation( + Connection.TRANSACTION_READ_UNCOMMITTED); + } + + for (String sql: loginStatements) { + Statement stmnt = connection.createStatement(); + try { + stmnt.execute(sql); + } + finally { + stmnt.close(); + } + } + + return connection; + } + + public ConnectedStatements getConnectedStatements() throws SQLException { + return new ConnectedStatements( + getConnection(), + new Statements(type, driver != null ? driver : "") + .getStatements()); + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/Statements.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,124 @@ +package de.intevation.db; + +import java.io.IOException; +import java.io.InputStream; + +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.log4j.Logger; + +public class Statements +{ + private static Logger log = Logger.getLogger(Statements.class); + + public static final String RESOURCE_PATH = "/sql/"; + public static final String COMMON_PROPERTIES = "-common.properties"; + + protected String type; + protected String driver; + + protected Map<String, SymbolicStatement> statements; + + public Statements(String type, String driver) { + this.type = type; + this.driver = driver; + } + + public SymbolicStatement getStatement(String key) { + return getStatements().get(key); + } + + public Map<String, SymbolicStatement> getStatements() { + if (statements == null) { + statements = loadStatements(); + } + return statements; + } + + protected Map<String, SymbolicStatement> loadStatements() { + Map<String, SymbolicStatement> statements = + new HashMap<String, SymbolicStatement>(); + + Properties properties = loadProperties(); + + for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) { + String key = (String)e.nextElement(); + String value = properties.getProperty(key); + SymbolicStatement symbolic = new SymbolicStatement(value); + statements.put(key, symbolic); + } + + return statements; + } + + protected String driverToProperties() { + return + type + "-" + + driver.replace('.', '-').toLowerCase() + ".properties"; + } + + protected Properties loadCommon() { + Properties common = new Properties(); + + String path = RESOURCE_PATH + type + COMMON_PROPERTIES; + + InputStream in = Statements.class.getResourceAsStream(path); + + if (in != null) { + try { + common.load(in); + } + catch (IOException ioe) { + log.error("cannot load defaults: " + path, ioe); + } + finally { + try { + in.close(); + } + catch (IOException ioe) { + } + } + } + else { + log.warn("cannot find: " + path); + } + + return common; + } + + protected Properties loadProperties() { + + Properties common = loadCommon(); + + Properties properties = new Properties(common); + + String path = RESOURCE_PATH + driverToProperties(); + + InputStream in = Statements.class.getResourceAsStream(path); + + if (in != null) { + try { + properties.load(in); + } + catch (IOException ioe) { + log.error("cannot load statements: " + path, ioe); + } + finally { + try { + in.close(); + } + catch (IOException ioe) { + } + } + } + else { + log.warn("cannot find: " + path); + } + + return properties; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/db/SymbolicStatement.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,196 @@ +package de.intevation.db; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.log4j.Logger; + +public class SymbolicStatement { + + private static Logger log = Logger.getLogger(SymbolicStatement.class); + + public static final Pattern VAR = Pattern.compile(":([a-zA-Z0-9_]+)"); + + protected String statement; + protected String compiled; + protected Map<String, List<Integer>> positions; + + public class Instance { + + /** TODO: Support more types. */ + + protected PreparedStatement stmnt; + + public Instance(Connection connection) throws SQLException { + stmnt = connection.prepareStatement(compiled); + } + + public void close() { + try { + stmnt.close(); + } + catch (SQLException sqle) { + log.error("cannot close statement", sqle); + } + } + + public Instance setInt(String key, int value) + throws SQLException + { + List<Integer> pos = positions.get(key.toLowerCase()); + if (pos != null) { + for (Integer p: pos) { + stmnt.setInt(p, value); + } + } + + return this; + } + + public Instance setString(String key, String value) + throws SQLException + { + List<Integer> pos = positions.get(key.toLowerCase()); + if (pos != null) { + for (Integer p: pos) { + stmnt.setString(p, value); + } + } + return this; + } + + public Instance setObject(String key, Object value) + throws SQLException + { + List<Integer> pos = positions.get(key.toLowerCase()); + if (pos != null) { + for (Integer p: pos) { + stmnt.setObject(p, value); + } + } + return this; + } + + public Instance setTimestamp(String key, Timestamp value) + throws SQLException + { + List<Integer> pos = positions.get(key.toLowerCase()); + if (pos != null) { + for (Integer p: pos) { + stmnt.setTimestamp(p, value); + } + } + return this; + } + + public Instance setDouble(String key, double value) + throws SQLException + { + List<Integer> pos = positions.get(key.toLowerCase()); + if (pos != null) { + for (Integer p: pos) { + stmnt.setDouble(p, value); + } + } + return this; + } + + public Instance setLong(String key, long value) + throws SQLException + { + List<Integer> pos = positions.get(key.toLowerCase()); + if (pos != null) { + for (Integer p: pos) { + stmnt.setLong(p, value); + } + } + return this; + } + + public Instance setNull(String key, int sqlType) + throws SQLException + { + List<Integer> pos = positions.get(key.toLowerCase()); + if (pos != null) { + for (Integer p: pos) { + stmnt.setNull(p, sqlType); + } + } + return this; + } + + public Instance set(Map<String, Object> map) throws SQLException { + for (Map.Entry<String, Object> entry: map.entrySet()) { + setObject(entry.getKey(), entry.getValue()); + } + return this; + } + + public Instance clearParameters() throws SQLException { + stmnt.clearParameters(); + return this; + } + + public boolean execute() throws SQLException { + if (log.isDebugEnabled()) { + log.debug("execute: " + compiled); + } + return stmnt.execute(); + } + + public ResultSet executeQuery() throws SQLException { + if (log.isDebugEnabled()) { + log.debug("query: " + compiled); + } + return stmnt.executeQuery(); + } + + public int executeUpdate() throws SQLException { + if (log.isDebugEnabled()) { + log.debug("update: " + compiled); + } + return stmnt.executeUpdate(); + } + + } // class Instance + + public SymbolicStatement(String statement) { + this.statement = statement; + compile(); + } + + public String getStatement() { + return statement; + } + + protected void compile() { + positions = new HashMap<String, List<Integer>>(); + + StringBuffer sb = new StringBuffer(); + Matcher m = VAR.matcher(statement); + int index = 1; + while (m.find()) { + String key = m.group(1).toLowerCase(); + List<Integer> list = positions.get(key); + if (list == null) { + list = new ArrayList<Integer>(); + positions.put(key, list); + } + list.add(index++); + m.appendReplacement(sb, "?"); + } + m.appendTail(sb); + compiled = sb.toString(); + } +} // class SymbolicStatement
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-aft/src/main/java/org/dive4elements/river/etl/utils/XML.java Thu Apr 25 11:42:54 2013 +0200 @@ -0,0 +1,332 @@ +package de.intevation.utils; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringWriter; + +import java.util.HashMap; +import java.util.Map; + +import javax.xml.namespace.NamespaceContext; +import javax.xml.namespace.QName; + +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerConfigurationException; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.TransformerFactoryConfigurationError; + +import javax.xml.transform.dom.DOMResult; +import javax.xml.transform.dom.DOMSource; + +import javax.xml.transform.stream.StreamResult; +import javax.xml.transform.stream.StreamSource; + +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import javax.xml.xpath.XPathVariableResolver; + +import org.apache.log4j.Logger; + +import org.w3c.dom.Document; + +import org.xml.sax.SAXException; + +public final class XML +{ + /** Logger for this class. */ + private static Logger log = Logger.getLogger(XML.class); + + public static class MapXPathVariableResolver + implements XPathVariableResolver + { + protected Map<String, String> variables; + + + public MapXPathVariableResolver() { + this.variables = new HashMap<String, String>(); + } + + + public MapXPathVariableResolver(Map<String, String> variables) { + this.variables = variables; + } + + + public void addVariable(String name, String value) { + variables.put(name, value); + } + + + @Override + public Object resolveVariable(QName variableName) { + String key = variableName.getLocalPart(); + return variables.get(key); + } + } // class MapXPathVariableResolver + + private XML() { + } + + /** + * Creates a new XML document + * @return the new XML document ot null if something went wrong during + * creation. + */ + public static final Document newDocument() { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(true); + + try { + return factory.newDocumentBuilder().newDocument(); + } + catch (ParserConfigurationException pce) { + log.error(pce.getLocalizedMessage(), pce); + } + return null; + } + + /** + * Loads a XML document namespace aware from a file + * @param file The file to load. + * @return the XML document or null if something went wrong + * during loading. + */ + public static final Document parseDocument(File file) { + return parseDocument(file, Boolean.TRUE); + } + + public static final Document parseDocument(File file, Boolean namespaceAware) { + InputStream inputStream = null; + try { + inputStream = new BufferedInputStream(new FileInputStream(file)); + return parseDocument(inputStream, namespaceAware); + } + catch (IOException ioe) { + log.error(ioe.getLocalizedMessage(), ioe); + } + finally { + if (inputStream != null) { + try { inputStream.close(); } + catch (IOException ioe) {} + } + } + return null; + } + + + public static final Document parseDocument(InputStream inputStream) { + return parseDocument(inputStream, Boolean.TRUE); + } + + public static final Document parseDocument( + InputStream inputStream, + Boolean namespaceAware + ) { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + + if (namespaceAware != null) { + factory.setNamespaceAware(namespaceAware.booleanValue()); + } + + try { + return factory.newDocumentBuilder().parse(inputStream); + } + catch (ParserConfigurationException pce) { + log.error(pce.getLocalizedMessage(), pce); + } + catch (SAXException se) { + log.error(se.getLocalizedMessage(), se); + } + catch (IOException ioe) { + log.error(ioe.getLocalizedMessage(), ioe); + } + return null; + } + + + /** + * Creates a new XPath without a namespace context. + * @return the new XPath. + */ + public static final XPath newXPath() { + return newXPath(null, null); + } + + /** + * Creates a new XPath with a given namespace context. + * @param namespaceContext The namespace context to be used or null + * if none should be used. + * @return The new XPath + */ + public static final XPath newXPath( + NamespaceContext namespaceContext, + XPathVariableResolver resolver) + { + XPathFactory factory = XPathFactory.newInstance(); + XPath xpath = factory.newXPath(); + if (namespaceContext != null) { + xpath.setNamespaceContext(namespaceContext); + } + + if (resolver != null) { + xpath.setXPathVariableResolver(resolver); + } + return xpath; + } + + /** + * Evaluates an XPath query on a given object and returns the result + * as a given type. No namespace context is used. + * @param root The object which is used as the root of the tree to + * be searched in. + * @param query The XPath query + * @param returnTyp The type of the result. + * @return The result of type 'returnTyp' or null if something + * went wrong during XPath evaluation. + */ + public static final Object xpath( + Object root, + String query, + QName returnTyp + ) { + return xpath(root, query, returnTyp, null); + } + + /** + * Evaluates an XPath query on a given object and returns the result + * as a given type. Optionally a namespace context is used. + * @param root The object which is used as the root of the tree to + * be searched in. + * @param query The XPath query + * @param returnType The type of the result. + * @param namespaceContext The namespace context to be used or null + * if none should be used. + * @return The result of type 'returnTyp' or null if something + * went wrong during XPath evaluation. + */ + public static final Object xpath( + Object root, + String query, + QName returnType, + NamespaceContext namespaceContext + ) { + return xpath(root, query, returnType, namespaceContext, null); + } + + public static final Object xpath( + Object root, + String query, + QName returnType, + NamespaceContext namespaceContext, + Map<String, String> variables) + { + if (root == null) { + return null; + } + + XPathVariableResolver resolver = variables != null + ? new MapXPathVariableResolver(variables) + : null; + + try { + XPath xpath = newXPath(namespaceContext, resolver); + if (xpath != null) { + return xpath.evaluate(query, root, returnType); + } + } + catch (XPathExpressionException xpee) { + log.error(xpee.getLocalizedMessage(), xpee); + } + + return null; + } + + public static Document transform( + Document document, + File xformFile + ) { + try { + Transformer transformer = + TransformerFactory + .newInstance() + .newTransformer( + new StreamSource(xformFile)); + + DOMResult result = new DOMResult(); + + transformer.transform(new DOMSource(document), result); + + return (Document)result.getNode(); + } + catch (TransformerConfigurationException tce) { + log.error(tce, tce); + } + catch (TransformerException te) { + log.error(te, te); + } + + return null; + } + + /** + * Streams out an XML document to a given output stream. + * @param document The document to be streamed out. + * @param out The output stream to be used. + * @return true if operation succeeded else false. + */ + public static boolean toStream(Document document, OutputStream out) { + try { + Transformer transformer = + TransformerFactory.newInstance().newTransformer(); + DOMSource source = new DOMSource(document); + StreamResult result = new StreamResult(out); + transformer.transform(source, result); + return true; + } + catch (TransformerConfigurationException tce) { + log.error(tce.getLocalizedMessage(), tce); + } + catch (TransformerFactoryConfigurationError tfce) { + log.error(tfce.getLocalizedMessage(), tfce); + } + catch (TransformerException te) { + log.error(te.getLocalizedMessage(), te); + } + + return false; + } + + public static String toString(Document document) { + try { + Transformer transformer = + TransformerFactory.newInstance().newTransformer(); + DOMSource source = new DOMSource(document); + StringWriter out = new StringWriter(); + StreamResult result = new StreamResult(out); + transformer.transform(source, result); + out.flush(); + return out.toString(); + } + catch (TransformerConfigurationException tce) { + log.error(tce.getLocalizedMessage(), tce); + } + catch (TransformerFactoryConfigurationError tfce) { + log.error(tfce.getLocalizedMessage(), tfce); + } + catch (TransformerException te) { + log.error(te.getLocalizedMessage(), te); + } + + return null; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :