comparison flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java @ 1970:368040e5c400

Improved the Scheduler to be able to cancel running WSPLGEN jobs. flys-artifacts/trunk@3384 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author Ingo Weinzierl <ingo.weinzierl@intevation.de>
date Mon, 12 Dec 2011 08:15:12 +0000
parents 6b9877a9f6c1
children 453d2d0c4258
comparison
equal deleted inserted replaced
1969:baefcfba97aa 1970:368040e5c400
1 package de.intevation.flys.wsplgen; 1 package de.intevation.flys.wsplgen;
2 2
3 import java.util.Collections; 3 import java.util.HashMap;
4 import java.util.LinkedList; 4 import java.util.Map;
5 import java.util.List; 5 import java.util.concurrent.Future;
6 import java.util.concurrent.ScheduledThreadPoolExecutor;
6 7
7 import org.apache.log4j.Logger; 8 import org.apache.log4j.Logger;
9
10 import de.intevation.artifacts.CallContext;
8 11
9 import de.intevation.flys.artifacts.model.WSPLGENJob; 12 import de.intevation.flys.artifacts.model.WSPLGENJob;
10 13
11 14
12 public class Scheduler implements Runnable { 15 /**
16 * The Scheduler is used to accept new WSPLGENJobs. The incoming jobs are added
17 * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker
18 * threads that process the WSPLGENJobs. The number of worker threads can be
19 * set using the system property <i>wsplgen.max.threads</i>; its default value is
20 * 1.
21 *
22 * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
23 */
24 public class Scheduler {
13 25
14 public static final int MAX_WSPLGEN_PROCESSES = 1; 26 private class FutureJob {
27 public Future future;
28 public WSPLGENJob job;
29
30 public FutureJob(Future future, WSPLGENJob job) {
31 this.future = future;
32 this.job = job;
33 }
34 }
35
36 public static final int MAX_WSPLGEN_PROCESSES =
37 Integer.getInteger("wsplgen.max.threads", 1);
15 38
16 39
17 protected List<WSPLGENJob> jobs; 40 protected ScheduledThreadPoolExecutor pool;
41 protected Map<String, FutureJob> jobs;
18 42
19 43
20 private static Scheduler INSTANCE; 44 private static Scheduler INSTANCE;
21 45
22 private static final Logger logger = Logger.getLogger(Scheduler.class); 46 private static final Logger logger = Logger.getLogger(Scheduler.class);
23 47
24 48
25 49
26 private Scheduler() { 50 private Scheduler() {
27 jobs = Collections.synchronizedList(new LinkedList<WSPLGENJob>()); 51 jobs = new HashMap<String, FutureJob>();
52 pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES);
28 } 53 }
29 54
30 55
31 public static Scheduler getInstance() { 56 public static Scheduler getInstance() {
32 if (INSTANCE == null) { 57 if (INSTANCE == null) {
33 logger.info("Create new WSPLGEN Scheduler..."); 58 logger.info("Create new WSPLGEN Scheduler...");
34 59
35 INSTANCE = new Scheduler(); 60 INSTANCE = new Scheduler();
36 new Thread(INSTANCE).start();
37 } 61 }
38 62
39 return INSTANCE; 63 return INSTANCE;
40 } 64 }
41 65
42 66
43 public void addJob(WSPLGENJob job) { 67 public void addJob(final WSPLGENJob job) {
44 synchronized(jobs) { 68 synchronized (jobs) {
45 jobs.add(job); 69 WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job));
70 pool.execute(f);
46 71
47 logger.info("New WSPLGEN job added."); 72 jobs.put(job.getArtifact().identifier(), new FutureJob(f, job));
48 73
49 jobs.notifyAll(); 74 logger.info("New WSPLGEN job successfully added.");
50 } 75 }
51 } 76 }
52 77
53 78
54 public WSPLGENJob getJob() { 79 /**
55 synchronized(jobs) { 80 * Cancels a running (or queued) job.
56 if (!jobs.isEmpty()) { 81 *
57 return jobs.remove(0); 82 * @param jobId The id of the job (which is the identifier of an Artifact).
58 } 83 */
84 public void cancelJob(String jobId) {
85 logger.debug("Search job in queue: " + jobId);
59 86
60 return null; 87 synchronized (jobs) {
61 } 88 FutureJob fj = jobs.get(jobId);
62 }
63 89
90 if (fj != null) {
91 logger.info("Try to cancel job: " + jobId);
64 92
65 public void run() { 93 fj.future.cancel(true);
66 logger.info("WSPLGEN Scheduler started.");
67 94
68 for (;;) { 95 removeJob(jobId);
69 try { 96
70 doRun(); 97 fj.job.getCallContext().afterBackground(
71 } 98 CallContext.STORE);
72 catch (InterruptedException ie) { 99
73 logger.warn("Interrupt in WSPLGEN Scheduler -> restart it!"); 100 logger.info("Canceled job: " + jobId);
74 } 101 }
75 } 102 }
76 } 103 }
77 104
78 105
79 public void doRun() 106 protected void removeJob(String id) {
80 throws InterruptedException 107 synchronized (jobs) {
81 { 108 jobs.remove(id);
82 for (;;) {
83 final WSPLGENJob job = getJob();
84
85 if (job != null) {
86 logger.debug("Got new job to execute...");
87
88 Thread t = new Thread() {
89 public void run() {
90 JobExecutor executor = new JobExecutor(job);
91 executor.execute();
92 }
93 };
94
95 t.start();
96 t.join();
97 }
98 else {
99 logger.info("No more jobs in Scheduler -> go sleep!");
100 synchronized (jobs) {
101 jobs.wait();
102 }
103
104 logger.info("New jobs in Scheduler -> wake up!");
105 }
106 } 109 }
107 } 110 }
108 } 111 }
109 // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 : 112 // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :

http://dive4elements.wald.intevation.org