Mercurial > dive4elements > river
comparison flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java @ 1970:368040e5c400
Improved the Scheduler to be able to cancel running WSPLGEN jobs.
flys-artifacts/trunk@3384 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author | Ingo Weinzierl <ingo.weinzierl@intevation.de> |
---|---|
date | Mon, 12 Dec 2011 08:15:12 +0000 |
parents | 6b9877a9f6c1 |
children | 453d2d0c4258 |
comparison
equal
deleted
inserted
replaced
1969:baefcfba97aa | 1970:368040e5c400 |
---|---|
1 package de.intevation.flys.wsplgen; | 1 package de.intevation.flys.wsplgen; |
2 | 2 |
3 import java.util.Collections; | 3 import java.util.HashMap; |
4 import java.util.LinkedList; | 4 import java.util.Map; |
5 import java.util.List; | 5 import java.util.concurrent.Future; |
6 import java.util.concurrent.ScheduledThreadPoolExecutor; | |
6 | 7 |
7 import org.apache.log4j.Logger; | 8 import org.apache.log4j.Logger; |
9 | |
10 import de.intevation.artifacts.CallContext; | |
8 | 11 |
9 import de.intevation.flys.artifacts.model.WSPLGENJob; | 12 import de.intevation.flys.artifacts.model.WSPLGENJob; |
10 | 13 |
11 | 14 |
12 public class Scheduler implements Runnable { | 15 /** |
16 * The Scheduler is used to receive new WSPLGENJobs. The incoming jobs are added | |
17 * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker | |
18 * threads that process the WSPLGENJobs. The number of worker threads can be | |
19 * set using the system property <i>wsplgen.max.threads</i>; its default value is | |
20 * 1. | |
21 * | |
22 * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a> | |
23 */ | |
24 public class Scheduler { | |
13 | 25 |
14 public static final int MAX_WSPLGEN_PROCESSES = 1; | 26 private class FutureJob { |
27 public Future future; | |
28 public WSPLGENJob job; | |
29 | |
30 public FutureJob(Future future, WSPLGENJob job) { | |
31 this.future = future; | |
32 this.job = job; | |
33 } | |
34 } | |
35 | |
36 public static final int MAX_WSPLGEN_PROCESSES = | |
37 Integer.getInteger("wsplgen.max.threads", 1); | |
15 | 38 |
16 | 39 |
17 protected List<WSPLGENJob> jobs; | 40 protected ScheduledThreadPoolExecutor pool; |
41 protected Map<String, FutureJob> jobs; | |
18 | 42 |
19 | 43 |
20 private static Scheduler INSTANCE; | 44 private static Scheduler INSTANCE; |
21 | 45 |
22 private static final Logger logger = Logger.getLogger(Scheduler.class); | 46 private static final Logger logger = Logger.getLogger(Scheduler.class); |
23 | 47 |
24 | 48 |
25 | 49 |
26 private Scheduler() { | 50 private Scheduler() { |
27 jobs = Collections.synchronizedList(new LinkedList<WSPLGENJob>()); | 51 jobs = new HashMap<String, FutureJob>(); |
52 pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES); | |
28 } | 53 } |
29 | 54 |
30 | 55 |
31 public static Scheduler getInstance() { | 56 public static Scheduler getInstance() { |
32 if (INSTANCE == null) { | 57 if (INSTANCE == null) { |
33 logger.info("Create new WSPLGEN Scheduler..."); | 58 logger.info("Create new WSPLGEN Scheduler..."); |
34 | 59 |
35 INSTANCE = new Scheduler(); | 60 INSTANCE = new Scheduler(); |
36 new Thread(INSTANCE).start(); | |
37 } | 61 } |
38 | 62 |
39 return INSTANCE; | 63 return INSTANCE; |
40 } | 64 } |
41 | 65 |
42 | 66 |
43 public void addJob(WSPLGENJob job) { | 67 public void addJob(final WSPLGENJob job) { |
44 synchronized(jobs) { | 68 synchronized (jobs) { |
45 jobs.add(job); | 69 WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job)); |
70 pool.execute(f); | |
46 | 71 |
47 logger.info("New WSPLGEN job added."); | 72 jobs.put(job.getArtifact().identifier(), new FutureJob(f, job)); |
48 | 73 |
49 jobs.notifyAll(); | 74 logger.info("New WSPLGEN job successfully added."); |
50 } | 75 } |
51 } | 76 } |
52 | 77 |
53 | 78 |
54 public WSPLGENJob getJob() { | 79 /** |
55 synchronized(jobs) { | 80 * Cancels a running (or queued) job. |
56 if (!jobs.isEmpty()) { | 81 * |
57 return jobs.remove(0); | 82 * @param jobId The id of the job (which is the identifier of an Artifact). |
58 } | 83 */ |
84 public void cancelJob(String jobId) { | |
85 logger.debug("Search job in queue: " + jobId); | |
59 | 86 |
60 return null; | 87 synchronized (jobs) { |
61 } | 88 FutureJob fj = jobs.get(jobId); |
62 } | |
63 | 89 |
90 if (fj != null) { | |
91 logger.info("Try to cancel job: " + jobId); | |
64 | 92 |
65 public void run() { | 93 fj.future.cancel(true); |
66 logger.info("WSPLGEN Scheduler started."); | |
67 | 94 |
68 for (;;) { | 95 removeJob(jobId); |
69 try { | 96 |
70 doRun(); | 97 fj.job.getCallContext().afterBackground( |
71 } | 98 CallContext.STORE); |
72 catch (InterruptedException ie) { | 99 |
73 logger.warn("Interrupt in WSPLGEN Scheduler -> restart it!"); | 100 logger.info("Canceled job: " + jobId); |
74 } | 101 } |
75 } | 102 } |
76 } | 103 } |
77 | 104 |
78 | 105 |
79 public void doRun() | 106 protected void removeJob(String id) { |
80 throws InterruptedException | 107 synchronized (jobs) { |
81 { | 108 jobs.remove(id); |
82 for (;;) { | |
83 final WSPLGENJob job = getJob(); | |
84 | |
85 if (job != null) { | |
86 logger.debug("Got new job to execute..."); | |
87 | |
88 Thread t = new Thread() { | |
89 public void run() { | |
90 JobExecutor executor = new JobExecutor(job); | |
91 executor.execute(); | |
92 } | |
93 }; | |
94 | |
95 t.start(); | |
96 t.join(); | |
97 } | |
98 else { | |
99 logger.info("No more jobs in Scheduler -> go sleep!"); | |
100 synchronized (jobs) { | |
101 jobs.wait(); | |
102 } | |
103 | |
104 logger.info("New jobs in Scheduler -> wake up!"); | |
105 } | |
106 } | 109 } |
107 } | 110 } |
108 } | 111 } |
109 // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 : | 112 // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 : |