comparison etl/src/main/java/org/dive4elements/river/etl/aft/River.java @ 5838:5aa05a7a34b7

Rename modules to more fitting names.
author Sascha L. Teichmann <teichmann@intevation.de>
date Thu, 25 Apr 2013 15:23:37 +0200
parents flys-aft/src/main/java/org/dive4elements/river/etl/aft/River.java@9438e9259213
children 8bd9b551456c
comparison
equal deleted inserted replaced
5837:d9901a08d0a6 5838:5aa05a7a34b7
1 package org.dive4elements.river.etl.aft;
2
3 import org.dive4elements.river.etl.db.ConnectedStatements;
4 import org.dive4elements.river.etl.db.SymbolicStatement;
5
6 import java.sql.ResultSet;
7 import java.sql.SQLException;
8
9 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13
14 import org.apache.log4j.Logger;
15
16 public class River
17 extends IdPair
18 {
19 private static Logger log = Logger.getLogger(River.class);
20
21 protected String name;
22
23 protected double from;
24 protected double to;
25
26 public River() {
27 }
28
29 public River(int id1, String name, double from, double to) {
30 super(id1);
31 this.name = name;
32 this.from = from;
33 this.to = to;
34 }
35
36 public River(int id1, int id2, String name) {
37 super(id1, id2);
38 this.name = name;
39 }
40
41 public String getName() {
42 return name;
43 }
44
45 public double getFrom() {
46 return from;
47 }
48
49 public void setFrom(double from) {
50 this.from = from;
51 }
52
53 public double getTo() {
54 return to;
55 }
56
57 public void setTo(double to) {
58 this.to = to;
59 }
60
61 public boolean inside(double x) {
62 return x >= from && x <= to;
63 }
64
65 public boolean sync(SyncContext context) throws SQLException {
66 log.info("sync river: " + this);
67
68 // Only take relevant gauges into account.
69 Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to);
70
71 ConnectedStatements flysStatements = context.getFlysStatements();
72 ConnectedStatements aftStatements = context.getAftStatements();
73
74 String riverName = getName();
75
76 Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>();
77
78 ResultSet messstellenRs = aftStatements
79 .getStatement("select.messstelle")
80 .clearParameters()
81 .setInt("GEWAESSER_NR", id2)
82 .executeQuery();
83
84 try {
85 while (messstellenRs.next()) {
86 String name = messstellenRs.getString("NAME");
87 String num = messstellenRs.getString("MESSSTELLE_NR");
88 double station = messstellenRs.getDouble("STATIONIERUNG");
89
90 if (!messstellenRs.wasNull() && !inside(station)) {
91 log.warn("Station found in AFT but in not range: " + station);
92 continue;
93 }
94
95 Long number = SyncContext.numberToLong(num);
96 if (number == null) {
97 log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'");
98 continue;
99 }
100 DIPSGauge dipsGauge = dipsGauges.get(number);
101 if (dipsGauge == null) {
102 log.warn(
103 "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " +
104 "Gauge number used for lookup: " + number);
105 continue;
106 }
107 String gaugeRiver = dipsGauge.getRiverName();
108 if (!gaugeRiver.equalsIgnoreCase(riverName)) {
109 log.warn(
110 "DIPS: MESSSTELLE '" + name +
111 "' is assigned to river '" + gaugeRiver +
112 "'. Needs to be on '" + riverName + "'.");
113 continue;
114 }
115 dipsGauge.setAftName(name);
116 dipsGauge.setOfficialNumber(number);
117 aftDIPSGauges.put(number, dipsGauge);
118 }
119 }
120 finally {
121 messstellenRs.close();
122 }
123
124 List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>();
125
126 ResultSet gaugesRs = flysStatements
127 .getStatement("select.gauges")
128 .clearParameters()
129 .setInt("river_id", id1).executeQuery();
130
131 try {
132 while (gaugesRs.next()) {
133 int gaugeId = gaugesRs.getInt("id");
134 String name = gaugesRs.getString("name");
135 long number = gaugesRs.getLong("official_number");
136 if (gaugesRs.wasNull()) {
137 log.warn("FLYS: Gauge '" + name +
138 "' has no official number. Ignored.");
139 continue;
140 }
141 Long key = Long.valueOf(number);
142 DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key);
143 if (aftDIPSGauge == null) {
144 log.warn("FLYS: Gauge '" + name + "' number " + number +
145 " is not found in AFT/DIPS.");
146 continue;
147 }
148 aftDIPSGauge.setFlysId(gaugeId);
149 log.info("Gauge '" + name +
150 "' found in FLYS, AFT and DIPS. -> Update");
151 updateGauges.add(aftDIPSGauge);
152 }
153 }
154 finally {
155 gaugesRs.close();
156 }
157
158 boolean modified = createGauges(context, aftDIPSGauges);
159
160 modified |= updateGauges(context, updateGauges);
161
162 return modified;
163 }
164
165 protected boolean updateGauges(
166 SyncContext context,
167 List<DIPSGauge> gauges
168 )
169 throws SQLException
170 {
171 boolean modified = false;
172
173 for (DIPSGauge gauge: gauges) {
174 // XXX: Do dont modify the master AT.
175 // modified |= updateBfGIdOnMasterDischargeTable(context, gauge);
176 modified |= updateGauge(context, gauge);
177 }
178
179 return modified;
180 }
181
182 protected boolean updateBfGIdOnMasterDischargeTable(
183 SyncContext context,
184 DIPSGauge gauge
185 ) throws SQLException {
186 log.info(
187 "FLYS: Updating master discharge table bfg_id for '" +
188 gauge.getAftName() + "'");
189 ConnectedStatements flysStatements = context.getFlysStatements();
190
191 ResultSet rs = flysStatements
192 .getStatement("select.gauge.master.discharge.table")
193 .clearParameters()
194 .setInt("gauge_id", gauge.getFlysId())
195 .executeQuery();
196
197 int flysId;
198
199 try {
200 if (!rs.next()) {
201 log.error(
202 "FLYS: No master discharge table found for gauge '" +
203 gauge.getAftName() + "'");
204 return false;
205 }
206 String bfgId = rs.getString("bfg_id");
207 if (!rs.wasNull()) { // already has BFG_ID
208 return false;
209 }
210 flysId = rs.getInt("id");
211 } finally {
212 rs.close();
213 }
214
215 // We need to find out the BFG_ID of the current discharge table
216 // for this gauge in AFT.
217
218 ConnectedStatements aftStatements = context.getAftStatements();
219
220 rs = aftStatements
221 .getStatement("select.bfg.id.current")
222 .clearParameters()
223 .setString("number", "%" + gauge.getOfficialNumber())
224 .executeQuery();
225
226 String bfgId = null;
227
228 try {
229 if (rs.next()) {
230 bfgId = rs.getString("BFG_ID");
231 }
232 } finally {
233 rs.close();
234 }
235
236 if (bfgId == null) {
237 log.warn(
238 "No BFG_ID found for current discharge table of gauge '" +
239 gauge + "'");
240 return false;
241 }
242
243 // Set the BFG_ID in FLYS.
244 flysStatements.beginTransaction();
245 try {
246 flysStatements
247 .getStatement("update.bfg.id.discharge.table")
248 .clearParameters()
249 .setInt("id", flysId)
250 .setString("bfg_id", bfgId)
251 .executeUpdate();
252 flysStatements.commitTransaction();
253 } catch (SQLException sqle) {
254 flysStatements.rollbackTransaction();
255 log.error(sqle, sqle);
256 return false;
257 }
258
259 return true;
260 }
261
262 protected boolean updateGauge(
263 SyncContext context,
264 DIPSGauge gauge
265 )
266 throws SQLException
267 {
268 log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'.");
269 // We need to load all discharge tables from both databases
270 // of the gauge and do some pairing based on their bfg_id.
271
272 boolean modified = false;
273
274 ConnectedStatements flysStatements = context.getFlysStatements();
275
276 flysStatements.beginTransaction();
277 try {
278 List<DischargeTable> flysDTs =
279 DischargeTable.loadFlysDischargeTables(
280 context, gauge.getFlysId());
281
282 List<DischargeTable> aftDTs =
283 DischargeTable.loadAftDischargeTables(
284 context, gauge.getOfficialNumber());
285
286 Map<String, DischargeTable> bfgId2FlysDT =
287 new HashMap<String, DischargeTable>();
288
289 for (DischargeTable dt: flysDTs) {
290 String bfgId = dt.getBfgId();
291 if (bfgId == null) {
292 log.warn("FLYS: discharge table " + dt.getId()
293 + " has no bfg_id. Ignored.");
294 continue;
295 }
296 bfgId2FlysDT.put(bfgId, dt);
297 }
298
299 List<DischargeTable> createDTs = new ArrayList<DischargeTable>();
300
301 for (DischargeTable aftDT: aftDTs) {
302 String bfgId = aftDT.getBfgId();
303 DischargeTable flysDT = bfgId2FlysDT.remove(bfgId);
304 if (flysDT != null) {
305 // Found in AFT and FLYS.
306 log.info("FLYS: Discharge table '" + bfgId
307 + "' found in AFT and FLYS. -> update");
308 // Create the W/Q diff.
309 modified |= writeWQChanges(context, flysDT, aftDT);
310 }
311 else {
312 log.info("FLYS: Discharge table '" + bfgId
313 + "' not found in FLYS. -> create");
314 createDTs.add(aftDT);
315 }
316 }
317
318 for (String bfgId: bfgId2FlysDT.keySet()) {
319 log.info("FLYS: Discharge table '" + bfgId
320 + "' found in FLYS but not in AFT. -> ignore");
321 }
322
323 log.info("FLYS: Copy " + createDTs.size() +
324 " discharge tables over from AFT.");
325
326 // Create the new discharge tables.
327 for (DischargeTable aftDT: createDTs) {
328 createDischargeTable(context, aftDT, gauge.getFlysId());
329 modified = true;
330 }
331
332 flysStatements.commitTransaction();
333 }
334 catch (SQLException sqle) {
335 flysStatements.rollbackTransaction();
336 log.error(sqle, sqle);
337 modified = false;
338 }
339
340 return modified;
341 }
342
343 protected boolean writeWQChanges(
344 SyncContext context,
345 DischargeTable flysDT,
346 DischargeTable aftDT
347 )
348 throws SQLException
349 {
350 flysDT.loadFlysValues(context);
351 aftDT.loadAftValues(context);
352 WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues());
353 if (diff.hasChanges()) {
354 diff.writeChanges(context, flysDT.getId());
355 return true;
356 }
357 return false;
358 }
359
360 protected boolean createGauges(
361 SyncContext context,
362 Map<Long, DIPSGauge> gauges
363 )
364 throws SQLException
365 {
366 ConnectedStatements flysStatements = context.getFlysStatements();
367
368 SymbolicStatement.Instance nextId =
369 flysStatements.getStatement("next.gauge.id");
370
371 SymbolicStatement.Instance insertStmnt =
372 flysStatements.getStatement("insert.gauge");
373
374 boolean modified = false;
375
376 for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) {
377 Long officialNumber = entry.getKey();
378 DIPSGauge gauge = entry.getValue();
379
380 log.info("Gauge '" + gauge.getAftName() +
381 "' not in FLYS but in AFT/DIPS. -> Create");
382
383 if (!gauge.hasDatums()) {
384 log.warn("DIPS: Gauge '" +
385 gauge.getAftName() + "' has no datum. Ignored.");
386 continue;
387 }
388
389 ResultSet rs = null;
390 flysStatements.beginTransaction();
391 try {
392 (rs = nextId.executeQuery()).next();
393 int gaugeId = rs.getInt("gauge_id");
394 rs.close(); rs = null;
395
396 insertStmnt
397 .clearParameters()
398 .setInt("id", gaugeId)
399 .setString("name", gauge.getAftName())
400 .setInt("river_id", id1)
401 .setDouble("station", gauge.getStation())
402 .setDouble("aeo", gauge.getAeo())
403 .setLong("official_number", officialNumber)
404 .setDouble("datum", gauge.getLatestDatum().getValue());
405
406 insertStmnt.execute();
407
408 log.info("FLYS: Created gauge '" + gauge.getAftName() +
409 "' with id " + gaugeId + ".");
410
411 gauge.setFlysId(gaugeId);
412 createDischargeTables(context, gauge);
413 flysStatements.commitTransaction();
414 modified = true;
415 }
416 catch (SQLException sqle) {
417 flysStatements.rollbackTransaction();
418 log.error(sqle, sqle);
419 }
420 finally {
421 if (rs != null) {
422 rs.close();
423 }
424 }
425 }
426
427 return modified;
428 }
429
430 protected void createDischargeTable(
431 SyncContext context,
432 DischargeTable aftDT,
433 int flysGaugeId
434 )
435 throws SQLException
436 {
437 aftDT.persistFlysTimeInterval(context);
438 int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId);
439
440 aftDT.loadAftValues(context);
441 aftDT.storeFlysValues(context, flysId);
442 }
443
444 protected void createDischargeTables(
445 SyncContext context,
446 DIPSGauge gauge
447 )
448 throws SQLException
449 {
450 log.info("FLYS: Create discharge tables for '" +
451 gauge.getAftName() + "'.");
452
453 // Load the discharge tables from AFT.
454 List<DischargeTable> dts = loadAftDischargeTables(
455 context, gauge);
456
457 // Persist the time intervals.
458 persistFlysTimeIntervals(context, dts);
459
460 // Persist the discharge tables
461 int [] flysDTIds = persistFlysDischargeTables(
462 context, dts, gauge.getFlysId());
463
464 // Copy over the W/Q values
465 copyWQsFromAftToFlys(context, dts, flysDTIds);
466 }
467
468 protected List<DischargeTable> loadAftDischargeTables(
469 SyncContext context,
470 DIPSGauge gauge
471 )
472 throws SQLException
473 {
474 return DischargeTable.loadAftDischargeTables(
475 context, gauge.getOfficialNumber(), gauge.getFlysId());
476 }
477
478 protected void persistFlysTimeIntervals(
479 SyncContext context,
480 List<DischargeTable> dts
481 )
482 throws SQLException
483 {
484 for (DischargeTable dt: dts) {
485 dt.persistFlysTimeInterval(context);
486 }
487 }
488
489 protected int [] persistFlysDischargeTables(
490 SyncContext context,
491 List<DischargeTable> dts,
492 int flysGaugeId
493 )
494 throws SQLException
495 {
496 int [] flysDTIds = new int[dts.size()];
497
498 for (int i = 0; i < flysDTIds.length; ++i) {
499 flysDTIds[i] = dts.get(i)
500 .persistFlysDischargeTable(context, flysGaugeId);
501 }
502
503 return flysDTIds;
504 }
505
506 protected void copyWQsFromAftToFlys(
507 SyncContext context,
508 List<DischargeTable> dts,
509 int [] flysDTIds
510 )
511 throws SQLException
512 {
513 for (int i = 0; i < flysDTIds.length; ++i) {
514 DischargeTable dt = dts.get(i);
515 dt.loadAftValues(context);
516 dt.storeFlysValues(context, flysDTIds[i]);
517 dt.clearValues(); // To save memory.
518 }
519 }
520
521 public String toString() {
522 return "[River: name=" + name + ", " + super.toString() + "]";
523 }
524 }
525 // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :

http://dive4elements.wald.intevation.org