Mercurial > dive4elements > river
comparison etl/src/main/java/org/dive4elements/river/etl/aft/River.java @ 5838:5aa05a7a34b7
Rename modules to more fitting names.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Thu, 25 Apr 2013 15:23:37 +0200 |
parents | flys-aft/src/main/java/org/dive4elements/river/etl/aft/River.java@9438e9259213 |
children | 8bd9b551456c |
comparison
equal
deleted
inserted
replaced
5837:d9901a08d0a6 | 5838:5aa05a7a34b7 |
---|---|
1 package org.dive4elements.river.etl.aft; | |
2 | |
3 import org.dive4elements.river.etl.db.ConnectedStatements; | |
4 import org.dive4elements.river.etl.db.SymbolicStatement; | |
5 | |
6 import java.sql.ResultSet; | |
7 import java.sql.SQLException; | |
8 | |
9 import java.util.ArrayList; | |
10 import java.util.HashMap; | |
11 import java.util.List; | |
12 import java.util.Map; | |
13 | |
14 import org.apache.log4j.Logger; | |
15 | |
16 public class River | |
17 extends IdPair | |
18 { | |
19 private static Logger log = Logger.getLogger(River.class); | |
20 | |
21 protected String name; | |
22 | |
23 protected double from; | |
24 protected double to; | |
25 | |
26 public River() { | |
27 } | |
28 | |
29 public River(int id1, String name, double from, double to) { | |
30 super(id1); | |
31 this.name = name; | |
32 this.from = from; | |
33 this.to = to; | |
34 } | |
35 | |
36 public River(int id1, int id2, String name) { | |
37 super(id1, id2); | |
38 this.name = name; | |
39 } | |
40 | |
41 public String getName() { | |
42 return name; | |
43 } | |
44 | |
45 public double getFrom() { | |
46 return from; | |
47 } | |
48 | |
49 public void setFrom(double from) { | |
50 this.from = from; | |
51 } | |
52 | |
53 public double getTo() { | |
54 return to; | |
55 } | |
56 | |
57 public void setTo(double to) { | |
58 this.to = to; | |
59 } | |
60 | |
61 public boolean inside(double x) { | |
62 return x >= from && x <= to; | |
63 } | |
64 | |
65 public boolean sync(SyncContext context) throws SQLException { | |
66 log.info("sync river: " + this); | |
67 | |
68 // Only take relevant gauges into account. | |
69 Map<Long, DIPSGauge> dipsGauges = context.getDIPSGauges(name, from, to); | |
70 | |
71 ConnectedStatements flysStatements = context.getFlysStatements(); | |
72 ConnectedStatements aftStatements = context.getAftStatements(); | |
73 | |
74 String riverName = getName(); | |
75 | |
76 Map<Long, DIPSGauge> aftDIPSGauges = new HashMap<Long, DIPSGauge>(); | |
77 | |
78 ResultSet messstellenRs = aftStatements | |
79 .getStatement("select.messstelle") | |
80 .clearParameters() | |
81 .setInt("GEWAESSER_NR", id2) | |
82 .executeQuery(); | |
83 | |
84 try { | |
85 while (messstellenRs.next()) { | |
86 String name = messstellenRs.getString("NAME"); | |
87 String num = messstellenRs.getString("MESSSTELLE_NR"); | |
88 double station = messstellenRs.getDouble("STATIONIERUNG"); | |
89 | |
90 if (!messstellenRs.wasNull() && !inside(station)) { | |
91 log.warn("Station found in AFT but in not range: " + station); | |
92 continue; | |
93 } | |
94 | |
95 Long number = SyncContext.numberToLong(num); | |
96 if (number == null) { | |
97 log.warn("AFT: Invalid MESSSTELLE_NR for MESSSTELLE '"+name+"'"); | |
98 continue; | |
99 } | |
100 DIPSGauge dipsGauge = dipsGauges.get(number); | |
101 if (dipsGauge == null) { | |
102 log.warn( | |
103 "DIPS: MESSSTELLE '" + name + "' not found in DIPS. " + | |
104 "Gauge number used for lookup: " + number); | |
105 continue; | |
106 } | |
107 String gaugeRiver = dipsGauge.getRiverName(); | |
108 if (!gaugeRiver.equalsIgnoreCase(riverName)) { | |
109 log.warn( | |
110 "DIPS: MESSSTELLE '" + name + | |
111 "' is assigned to river '" + gaugeRiver + | |
112 "'. Needs to be on '" + riverName + "'."); | |
113 continue; | |
114 } | |
115 dipsGauge.setAftName(name); | |
116 dipsGauge.setOfficialNumber(number); | |
117 aftDIPSGauges.put(number, dipsGauge); | |
118 } | |
119 } | |
120 finally { | |
121 messstellenRs.close(); | |
122 } | |
123 | |
124 List<DIPSGauge> updateGauges = new ArrayList<DIPSGauge>(); | |
125 | |
126 ResultSet gaugesRs = flysStatements | |
127 .getStatement("select.gauges") | |
128 .clearParameters() | |
129 .setInt("river_id", id1).executeQuery(); | |
130 | |
131 try { | |
132 while (gaugesRs.next()) { | |
133 int gaugeId = gaugesRs.getInt("id"); | |
134 String name = gaugesRs.getString("name"); | |
135 long number = gaugesRs.getLong("official_number"); | |
136 if (gaugesRs.wasNull()) { | |
137 log.warn("FLYS: Gauge '" + name + | |
138 "' has no official number. Ignored."); | |
139 continue; | |
140 } | |
141 Long key = Long.valueOf(number); | |
142 DIPSGauge aftDIPSGauge = aftDIPSGauges.remove(key); | |
143 if (aftDIPSGauge == null) { | |
144 log.warn("FLYS: Gauge '" + name + "' number " + number + | |
145 " is not found in AFT/DIPS."); | |
146 continue; | |
147 } | |
148 aftDIPSGauge.setFlysId(gaugeId); | |
149 log.info("Gauge '" + name + | |
150 "' found in FLYS, AFT and DIPS. -> Update"); | |
151 updateGauges.add(aftDIPSGauge); | |
152 } | |
153 } | |
154 finally { | |
155 gaugesRs.close(); | |
156 } | |
157 | |
158 boolean modified = createGauges(context, aftDIPSGauges); | |
159 | |
160 modified |= updateGauges(context, updateGauges); | |
161 | |
162 return modified; | |
163 } | |
164 | |
165 protected boolean updateGauges( | |
166 SyncContext context, | |
167 List<DIPSGauge> gauges | |
168 ) | |
169 throws SQLException | |
170 { | |
171 boolean modified = false; | |
172 | |
173 for (DIPSGauge gauge: gauges) { | |
174 // XXX: Do dont modify the master AT. | |
175 // modified |= updateBfGIdOnMasterDischargeTable(context, gauge); | |
176 modified |= updateGauge(context, gauge); | |
177 } | |
178 | |
179 return modified; | |
180 } | |
181 | |
182 protected boolean updateBfGIdOnMasterDischargeTable( | |
183 SyncContext context, | |
184 DIPSGauge gauge | |
185 ) throws SQLException { | |
186 log.info( | |
187 "FLYS: Updating master discharge table bfg_id for '" + | |
188 gauge.getAftName() + "'"); | |
189 ConnectedStatements flysStatements = context.getFlysStatements(); | |
190 | |
191 ResultSet rs = flysStatements | |
192 .getStatement("select.gauge.master.discharge.table") | |
193 .clearParameters() | |
194 .setInt("gauge_id", gauge.getFlysId()) | |
195 .executeQuery(); | |
196 | |
197 int flysId; | |
198 | |
199 try { | |
200 if (!rs.next()) { | |
201 log.error( | |
202 "FLYS: No master discharge table found for gauge '" + | |
203 gauge.getAftName() + "'"); | |
204 return false; | |
205 } | |
206 String bfgId = rs.getString("bfg_id"); | |
207 if (!rs.wasNull()) { // already has BFG_ID | |
208 return false; | |
209 } | |
210 flysId = rs.getInt("id"); | |
211 } finally { | |
212 rs.close(); | |
213 } | |
214 | |
215 // We need to find out the BFG_ID of the current discharge table | |
216 // for this gauge in AFT. | |
217 | |
218 ConnectedStatements aftStatements = context.getAftStatements(); | |
219 | |
220 rs = aftStatements | |
221 .getStatement("select.bfg.id.current") | |
222 .clearParameters() | |
223 .setString("number", "%" + gauge.getOfficialNumber()) | |
224 .executeQuery(); | |
225 | |
226 String bfgId = null; | |
227 | |
228 try { | |
229 if (rs.next()) { | |
230 bfgId = rs.getString("BFG_ID"); | |
231 } | |
232 } finally { | |
233 rs.close(); | |
234 } | |
235 | |
236 if (bfgId == null) { | |
237 log.warn( | |
238 "No BFG_ID found for current discharge table of gauge '" + | |
239 gauge + "'"); | |
240 return false; | |
241 } | |
242 | |
243 // Set the BFG_ID in FLYS. | |
244 flysStatements.beginTransaction(); | |
245 try { | |
246 flysStatements | |
247 .getStatement("update.bfg.id.discharge.table") | |
248 .clearParameters() | |
249 .setInt("id", flysId) | |
250 .setString("bfg_id", bfgId) | |
251 .executeUpdate(); | |
252 flysStatements.commitTransaction(); | |
253 } catch (SQLException sqle) { | |
254 flysStatements.rollbackTransaction(); | |
255 log.error(sqle, sqle); | |
256 return false; | |
257 } | |
258 | |
259 return true; | |
260 } | |
261 | |
262 protected boolean updateGauge( | |
263 SyncContext context, | |
264 DIPSGauge gauge | |
265 ) | |
266 throws SQLException | |
267 { | |
268 log.info("FLYS: Updating gauge '" + gauge.getAftName() + "'."); | |
269 // We need to load all discharge tables from both databases | |
270 // of the gauge and do some pairing based on their bfg_id. | |
271 | |
272 boolean modified = false; | |
273 | |
274 ConnectedStatements flysStatements = context.getFlysStatements(); | |
275 | |
276 flysStatements.beginTransaction(); | |
277 try { | |
278 List<DischargeTable> flysDTs = | |
279 DischargeTable.loadFlysDischargeTables( | |
280 context, gauge.getFlysId()); | |
281 | |
282 List<DischargeTable> aftDTs = | |
283 DischargeTable.loadAftDischargeTables( | |
284 context, gauge.getOfficialNumber()); | |
285 | |
286 Map<String, DischargeTable> bfgId2FlysDT = | |
287 new HashMap<String, DischargeTable>(); | |
288 | |
289 for (DischargeTable dt: flysDTs) { | |
290 String bfgId = dt.getBfgId(); | |
291 if (bfgId == null) { | |
292 log.warn("FLYS: discharge table " + dt.getId() | |
293 + " has no bfg_id. Ignored."); | |
294 continue; | |
295 } | |
296 bfgId2FlysDT.put(bfgId, dt); | |
297 } | |
298 | |
299 List<DischargeTable> createDTs = new ArrayList<DischargeTable>(); | |
300 | |
301 for (DischargeTable aftDT: aftDTs) { | |
302 String bfgId = aftDT.getBfgId(); | |
303 DischargeTable flysDT = bfgId2FlysDT.remove(bfgId); | |
304 if (flysDT != null) { | |
305 // Found in AFT and FLYS. | |
306 log.info("FLYS: Discharge table '" + bfgId | |
307 + "' found in AFT and FLYS. -> update"); | |
308 // Create the W/Q diff. | |
309 modified |= writeWQChanges(context, flysDT, aftDT); | |
310 } | |
311 else { | |
312 log.info("FLYS: Discharge table '" + bfgId | |
313 + "' not found in FLYS. -> create"); | |
314 createDTs.add(aftDT); | |
315 } | |
316 } | |
317 | |
318 for (String bfgId: bfgId2FlysDT.keySet()) { | |
319 log.info("FLYS: Discharge table '" + bfgId | |
320 + "' found in FLYS but not in AFT. -> ignore"); | |
321 } | |
322 | |
323 log.info("FLYS: Copy " + createDTs.size() + | |
324 " discharge tables over from AFT."); | |
325 | |
326 // Create the new discharge tables. | |
327 for (DischargeTable aftDT: createDTs) { | |
328 createDischargeTable(context, aftDT, gauge.getFlysId()); | |
329 modified = true; | |
330 } | |
331 | |
332 flysStatements.commitTransaction(); | |
333 } | |
334 catch (SQLException sqle) { | |
335 flysStatements.rollbackTransaction(); | |
336 log.error(sqle, sqle); | |
337 modified = false; | |
338 } | |
339 | |
340 return modified; | |
341 } | |
342 | |
343 protected boolean writeWQChanges( | |
344 SyncContext context, | |
345 DischargeTable flysDT, | |
346 DischargeTable aftDT | |
347 ) | |
348 throws SQLException | |
349 { | |
350 flysDT.loadFlysValues(context); | |
351 aftDT.loadAftValues(context); | |
352 WQDiff diff = new WQDiff(flysDT.getValues(), aftDT.getValues()); | |
353 if (diff.hasChanges()) { | |
354 diff.writeChanges(context, flysDT.getId()); | |
355 return true; | |
356 } | |
357 return false; | |
358 } | |
359 | |
360 protected boolean createGauges( | |
361 SyncContext context, | |
362 Map<Long, DIPSGauge> gauges | |
363 ) | |
364 throws SQLException | |
365 { | |
366 ConnectedStatements flysStatements = context.getFlysStatements(); | |
367 | |
368 SymbolicStatement.Instance nextId = | |
369 flysStatements.getStatement("next.gauge.id"); | |
370 | |
371 SymbolicStatement.Instance insertStmnt = | |
372 flysStatements.getStatement("insert.gauge"); | |
373 | |
374 boolean modified = false; | |
375 | |
376 for (Map.Entry<Long, DIPSGauge> entry: gauges.entrySet()) { | |
377 Long officialNumber = entry.getKey(); | |
378 DIPSGauge gauge = entry.getValue(); | |
379 | |
380 log.info("Gauge '" + gauge.getAftName() + | |
381 "' not in FLYS but in AFT/DIPS. -> Create"); | |
382 | |
383 if (!gauge.hasDatums()) { | |
384 log.warn("DIPS: Gauge '" + | |
385 gauge.getAftName() + "' has no datum. Ignored."); | |
386 continue; | |
387 } | |
388 | |
389 ResultSet rs = null; | |
390 flysStatements.beginTransaction(); | |
391 try { | |
392 (rs = nextId.executeQuery()).next(); | |
393 int gaugeId = rs.getInt("gauge_id"); | |
394 rs.close(); rs = null; | |
395 | |
396 insertStmnt | |
397 .clearParameters() | |
398 .setInt("id", gaugeId) | |
399 .setString("name", gauge.getAftName()) | |
400 .setInt("river_id", id1) | |
401 .setDouble("station", gauge.getStation()) | |
402 .setDouble("aeo", gauge.getAeo()) | |
403 .setLong("official_number", officialNumber) | |
404 .setDouble("datum", gauge.getLatestDatum().getValue()); | |
405 | |
406 insertStmnt.execute(); | |
407 | |
408 log.info("FLYS: Created gauge '" + gauge.getAftName() + | |
409 "' with id " + gaugeId + "."); | |
410 | |
411 gauge.setFlysId(gaugeId); | |
412 createDischargeTables(context, gauge); | |
413 flysStatements.commitTransaction(); | |
414 modified = true; | |
415 } | |
416 catch (SQLException sqle) { | |
417 flysStatements.rollbackTransaction(); | |
418 log.error(sqle, sqle); | |
419 } | |
420 finally { | |
421 if (rs != null) { | |
422 rs.close(); | |
423 } | |
424 } | |
425 } | |
426 | |
427 return modified; | |
428 } | |
429 | |
430 protected void createDischargeTable( | |
431 SyncContext context, | |
432 DischargeTable aftDT, | |
433 int flysGaugeId | |
434 ) | |
435 throws SQLException | |
436 { | |
437 aftDT.persistFlysTimeInterval(context); | |
438 int flysId = aftDT.persistFlysDischargeTable(context, flysGaugeId); | |
439 | |
440 aftDT.loadAftValues(context); | |
441 aftDT.storeFlysValues(context, flysId); | |
442 } | |
443 | |
444 protected void createDischargeTables( | |
445 SyncContext context, | |
446 DIPSGauge gauge | |
447 ) | |
448 throws SQLException | |
449 { | |
450 log.info("FLYS: Create discharge tables for '" + | |
451 gauge.getAftName() + "'."); | |
452 | |
453 // Load the discharge tables from AFT. | |
454 List<DischargeTable> dts = loadAftDischargeTables( | |
455 context, gauge); | |
456 | |
457 // Persist the time intervals. | |
458 persistFlysTimeIntervals(context, dts); | |
459 | |
460 // Persist the discharge tables | |
461 int [] flysDTIds = persistFlysDischargeTables( | |
462 context, dts, gauge.getFlysId()); | |
463 | |
464 // Copy over the W/Q values | |
465 copyWQsFromAftToFlys(context, dts, flysDTIds); | |
466 } | |
467 | |
468 protected List<DischargeTable> loadAftDischargeTables( | |
469 SyncContext context, | |
470 DIPSGauge gauge | |
471 ) | |
472 throws SQLException | |
473 { | |
474 return DischargeTable.loadAftDischargeTables( | |
475 context, gauge.getOfficialNumber(), gauge.getFlysId()); | |
476 } | |
477 | |
478 protected void persistFlysTimeIntervals( | |
479 SyncContext context, | |
480 List<DischargeTable> dts | |
481 ) | |
482 throws SQLException | |
483 { | |
484 for (DischargeTable dt: dts) { | |
485 dt.persistFlysTimeInterval(context); | |
486 } | |
487 } | |
488 | |
489 protected int [] persistFlysDischargeTables( | |
490 SyncContext context, | |
491 List<DischargeTable> dts, | |
492 int flysGaugeId | |
493 ) | |
494 throws SQLException | |
495 { | |
496 int [] flysDTIds = new int[dts.size()]; | |
497 | |
498 for (int i = 0; i < flysDTIds.length; ++i) { | |
499 flysDTIds[i] = dts.get(i) | |
500 .persistFlysDischargeTable(context, flysGaugeId); | |
501 } | |
502 | |
503 return flysDTIds; | |
504 } | |
505 | |
506 protected void copyWQsFromAftToFlys( | |
507 SyncContext context, | |
508 List<DischargeTable> dts, | |
509 int [] flysDTIds | |
510 ) | |
511 throws SQLException | |
512 { | |
513 for (int i = 0; i < flysDTIds.length; ++i) { | |
514 DischargeTable dt = dts.get(i); | |
515 dt.loadAftValues(context); | |
516 dt.storeFlysValues(context, flysDTIds[i]); | |
517 dt.clearValues(); // To save memory. | |
518 } | |
519 } | |
520 | |
521 public String toString() { | |
522 return "[River: name=" + name + ", " + super.toString() + "]"; | |
523 } | |
524 } | |
525 // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 : |