diff flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java @ 1970:368040e5c400

Improved the Scheduler to be able to cancel running WSPLGEN jobs. flys-artifacts/trunk@3384 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author Ingo Weinzierl <ingo.weinzierl@intevation.de>
date Mon, 12 Dec 2011 08:15:12 +0000
parents 6b9877a9f6c1
children 453d2d0c4258
line wrap: on
line diff
--- a/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java	Fri Dec 09 16:39:08 2011 +0000
+++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java	Mon Dec 12 08:15:12 2011 +0000
@@ -1,20 +1,44 @@
 package de.intevation.flys.wsplgen;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.log4j.Logger;
 
+import de.intevation.artifacts.CallContext;
+
 import de.intevation.flys.artifacts.model.WSPLGENJob;
 
 
-public class Scheduler implements Runnable {
+/**
+ * The Scheduler is used to retrieve new WSPLGENJob. The incoming jobs are added
+ * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker
+ * threads that processes the WSPLGENJobs. The number of worker threads can be
+ * set using a System property <i>wsplgen.max.threads</i> ; its default value is
+ * 1.
+ *
+ * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
+ */
+public class Scheduler {
 
-    public static final int MAX_WSPLGEN_PROCESSES = 1;
+    private class FutureJob {
+        public Future     future;
+        public WSPLGENJob job;
+
+        public FutureJob(Future future, WSPLGENJob job) {
+            this.future = future;
+            this.job    = job;
+        }
+    }
+
+    public static final int MAX_WSPLGEN_PROCESSES =
+        Integer.getInteger("wsplgen.max.threads", 1);
 
 
-    protected List<WSPLGENJob> jobs;
+    protected ScheduledThreadPoolExecutor pool;
+    protected Map<String, FutureJob> jobs;
 
 
     private static Scheduler INSTANCE;
@@ -24,7 +48,8 @@
 
 
     private Scheduler() {
-        jobs = Collections.synchronizedList(new LinkedList<WSPLGENJob>());
+        jobs = new HashMap<String, FutureJob>();
+        pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES);
     }
 
 
@@ -33,76 +58,54 @@
             logger.info("Create new WSPLGEN Scheduler...");
 
             INSTANCE = new Scheduler();
-            new Thread(INSTANCE).start();
         }
 
         return INSTANCE;
     }
 
 
-    public void addJob(WSPLGENJob job) {
-        synchronized(jobs) {
-            jobs.add(job);
+    public void addJob(final WSPLGENJob job) {
+        synchronized (jobs) {
+            WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job));
+            pool.execute(f);
 
-            logger.info("New WSPLGEN job added.");
+            jobs.put(job.getArtifact().identifier(), new FutureJob(f, job));
 
-            jobs.notifyAll();
+            logger.info("New WSPLGEN job successfully added.");
         }
     }
 
 
-    public WSPLGENJob getJob() {
-        synchronized(jobs) {
-            if (!jobs.isEmpty()) {
-                return jobs.remove(0);
-            }
-
-            return null;
-        }
-    }
+    /**
+     * Cancels a running (or queued) job.
+     *
+     * @param jobId The id of the job (which is the identifier of an Artifact).
+     */
+    public void cancelJob(String jobId) {
+        logger.debug("Search job in queue: " + jobId);
 
-
-    public void run() {
-        logger.info("WSPLGEN Scheduler started.");
+        synchronized (jobs) {
+            FutureJob fj = jobs.get(jobId);
 
-        for (;;) {
-            try {
-                doRun();
-            }
-            catch (InterruptedException ie) {
-                logger.warn("Interrupt in WSPLGEN Scheduler -> restart it!");
+            if (fj != null) {
+                logger.info("Try to cancel job: " + jobId);
+
+                fj.future.cancel(true);
+
+                removeJob(jobId);
+
+                fj.job.getCallContext().afterBackground(
+                    CallContext.STORE);
+
+                logger.info("Canceled job: " + jobId);
             }
         }
     }
 
 
-    public void doRun()
-    throws InterruptedException
-    {
-        for (;;) {
-            final WSPLGENJob job = getJob();
-
-            if (job != null) {
-                logger.debug("Got new job to execute...");
-
-                Thread t = new Thread() {
-                    public void run() {
-                        JobExecutor executor = new JobExecutor(job);
-                        executor.execute();
-                    }
-                };
-
-                t.start();
-                t.join();
-            }
-            else {
-                logger.info("No more jobs in Scheduler -> go sleep!");
-                synchronized (jobs) {
-                    jobs.wait();
-                }
-
-                logger.info("New jobs in Scheduler -> wake up!");
-            }
+    protected void removeJob(String id) {
+        synchronized (jobs) {
+            jobs.remove(id);
         }
     }
 }

http://dive4elements.wald.intevation.org