changeset 1970:368040e5c400

Improved the Scheduler to be able to cancel running WSPLGEN jobs. flys-artifacts/trunk@3384 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author Ingo Weinzierl <ingo.weinzierl@intevation.de>
date Mon, 12 Dec 2011 08:15:12 +0000
parents baefcfba97aa
children 741d2067cfe1
files flys-artifacts/ChangeLog flys-artifacts/doc/conf/conf.xml flys-artifacts/src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java flys-artifacts/src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java flys-artifacts/src/main/java/de/intevation/flys/wsplgen/JobExecutor.java flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java flys-artifacts/src/main/java/de/intevation/flys/wsplgen/SchedulerSetup.java flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENCallable.java flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENFuture.java
diffstat 9 files changed, 323 insertions(+), 172 deletions(-) [+]
line wrap: on
line diff
--- a/flys-artifacts/ChangeLog	Fri Dec 09 16:39:08 2011 +0000
+++ b/flys-artifacts/ChangeLog	Mon Dec 12 08:15:12 2011 +0000
@@ -1,3 +1,35 @@
+2011-12-12  Ingo Weinzierl <ingo@intevation.de>
+
+	* src/main/java/de/intevation/flys/wsplgen/JobExecutor.java,
+	  src/main/java/de/intevation/flys/wsplgen/WSPLGENCallable.java: Renamed
+	  JobExecutor to WSPLGENCallable (because it is a Callable now). In addition
+	  to the call() method which starts the WSPLGEN process, this Callable
+	  offers a cancelWSPLGEN() method to destroy a running WSPLGEN process.
+
+	* src/main/java/de/intevation/flys/wsplgen/WSPLGENFuture.java: A FutureTask
+	  that overrides cancel(boolean). Before this instance call
+	  super.cancel(boolean), it executes WSPLGENCallable.cancelWSPLGEN() to kill
+	  a running WSPLGEN process.
+
+	* src/main/java/de/intevation/flys/wsplgen/Scheduler.java: The Scheduler is
+	  no longer a Runnable. It makes now use of a ScheduledThreadPoolExecutor to
+	  schedule the incoming WSPLGENJobs. The ScheduledThreadPoolExecutor has a
+	  fixed number of worker threads that process the jobs. The number is 1 per
+	  default; it can be modified using a System property "wsplgen.max.threads".
+
+	* src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java: Added
+	  a string constant SCHEDULER.
+
+	* src/main/java/de/intevation/flys/wsplgen/SchedulerSetup.java: A
+	  LifetimeListener that currently implements the systemUp() method to create
+	  an instance of Scheduler. After its creation, the Scheduler is put into
+	  the GlobalContext using FLYSContext.SCHEDULER as key.
+
+	* src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java:
+	  Fetch the Scheduler from GlobalContext.
+
+	* doc/conf/conf.xml: Registered SchedulerSetup as LifetimeListener.
+
 2011-12-09	Felix Wolfsteller	<felix.wolfsteller@intevation.de>
 
 	* src/main/java/de/intevation/flys/artifacts/StaticFLYSArtifact.java:
--- a/flys-artifacts/doc/conf/conf.xml	Fri Dec 09 16:39:08 2011 +0000
+++ b/flys-artifacts/doc/conf/conf.xml	Mon Dec 12 08:15:12 2011 +0000
@@ -97,6 +97,7 @@
 
     <lifetime-listeners>
         <listener>de.intevation.flys.artifacts.datacage.Datacage</listener>
+        <listener>de.intevation.flys.wsplgen.SchedulerSetup</listener>
     </lifetime-listeners>
 
     <backend-listeners>
--- a/flys-artifacts/src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java	Fri Dec 09 16:39:08 2011 +0000
+++ b/flys-artifacts/src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java	Mon Dec 12 08:15:12 2011 +0000
@@ -46,6 +46,10 @@
     public static final String RIVER_WMS =
         "flys.floodmap.river.wms";
 
+    /** The key that is used to store an instance of Scheduler in the context.*/
+    public static final String SCHEDULER =
+        "flys.wsplgen.scheduler";
+
 
     /**
      * The default constructor.
--- a/flys-artifacts/src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java	Fri Dec 09 16:39:08 2011 +0000
+++ b/flys-artifacts/src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java	Mon Dec 12 08:15:12 2011 +0000
@@ -23,6 +23,7 @@
 
 import de.intevation.artifacts.Artifact;
 import de.intevation.artifacts.CallContext;
+import de.intevation.artifacts.GlobalContext;
 
 import de.intevation.artifacts.common.utils.FileTools;
 
@@ -34,6 +35,7 @@
 import de.intevation.flys.model.RiverAxis;
 
 import de.intevation.flys.artifacts.FLYSArtifact;
+import de.intevation.flys.artifacts.context.FLYSContext;
 import de.intevation.flys.artifacts.model.CalculationMessage;
 import de.intevation.flys.artifacts.model.CalculationResult;
 import de.intevation.flys.artifacts.model.FacetTypes;
@@ -141,7 +143,8 @@
                 "wsplgen.job.queued")
         ));
 
-        Scheduler scheduler = Scheduler.getInstance();
+        GlobalContext gc    = (GlobalContext) context.globalContext();
+        Scheduler scheduler = (Scheduler) gc.get(FLYSContext.SCHEDULER);
         scheduler.addJob(job);
 
         return null;
@@ -194,6 +197,9 @@
         FLYSArtifact flys = (FLYSArtifact) artifact;
         removeDirectory(flys);
 
+        Scheduler scheduler = Scheduler.getInstance();
+        scheduler.cancelJob(flys.identifier());
+
         MapfileGenerator.getInstance().update();
     }
 
--- a/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/JobExecutor.java	Fri Dec 09 16:39:08 2011 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,113 +0,0 @@
-package de.intevation.flys.wsplgen;
-
-import java.io.IOException;
-import java.io.File;
-
-import org.apache.log4j.Logger;
-
-import de.intevation.artifacts.CallContext;
-
-import de.intevation.flys.artifacts.model.WSPLGENJob;
-
-
-public class JobExecutor {
-
-    public static final String WSPLGEN_PARAMETER_FILE =
-        "wsplgen.par";
-
-    public static final String WSPLGEN_BIN_PATH =
-        System.getProperty("wsplgen.bin.path");
-
-
-    private Logger logger = Logger.getLogger(JobExecutor.class);
-
-    private Process process;
-
-    protected WSPLGENJob job;
-
-    protected JobObserver     logObserver;
-    protected ProblemObserver errorObserver;
-
-
-    public JobExecutor(WSPLGENJob job) {
-        this.job           = job;
-        this.logObserver   = new JobObserver(job);
-        this.errorObserver = new ProblemObserver(job);
-    }
-
-
-    public void execute() {
-        File dir       = job.getWorkingDir();
-        File parameter = new File(dir, WSPLGEN_PARAMETER_FILE);
-
-        String[] args = new String[] {
-            WSPLGEN_BIN_PATH,
-            "-PAR=\"" + parameter.getAbsolutePath() + "\""
-        };
-
-        execute(args, dir);
-    }
-
-
-    protected void execute(String[] args, File dir) {
-        logger.info("Start JobExecutor for artifact: " + dir.getName());
-
-        String errorMsg = null;
-
-        try {
-            synchronized (this) {
-                process = Runtime.getRuntime().exec(args, null, dir);
-
-                logObserver.setInputStream(process.getInputStream());
-                errorObserver.setInputStream(process.getErrorStream());
-
-                logObserver.start();
-                errorObserver.start();
-
-                try {
-                    process.waitFor();
-                }
-                catch (InterruptedException ie) {
-                    logger.error("WSPLGEN job interrupted: " + ie.getMessage());
-                }
-
-                try {
-                    logObserver.join();
-                    errorObserver.join();
-                }
-                catch (InterruptedException iee) { /* do nothing */ }
-
-                logger.info("WSPLGEN exit value: " + process.exitValue());
-                logger.info(
-                    "WSPLGEN throw " +
-                    errorObserver.numErrors() + " errors.");
-                logger.info(
-                    "WSPLGEN throw " +
-                    errorObserver.numWarnings() + " warnings.");
-
-                if (process.exitValue() < 2 && errorObserver.numErrors() == 0) {
-                    FacetCreator fc = job.getFacetCreator();
-                    fc.createWSPLGENFacet();
-                    fc.finish();
-                }
-
-                job.getCallContext().afterBackground(CallContext.STORE);
-
-                return;
-            }
-        }
-        catch (SecurityException se) {
-            logger.error(se);
-        }
-        catch (IOException ioe) {
-            logger.error(ioe);
-        }
-        catch (NullPointerException npe) {
-            logger.error(npe, npe);
-        }
-        catch (IndexOutOfBoundsException ioobe) {
-            logger.error(ioobe, ioobe);
-        }
-    }
-}
-// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :
--- a/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java	Fri Dec 09 16:39:08 2011 +0000
+++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java	Mon Dec 12 08:15:12 2011 +0000
@@ -1,20 +1,44 @@
 package de.intevation.flys.wsplgen;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.log4j.Logger;
 
+import de.intevation.artifacts.CallContext;
+
 import de.intevation.flys.artifacts.model.WSPLGENJob;
 
 
-public class Scheduler implements Runnable {
+/**
+ * The Scheduler is used to retrieve new WSPLGENJob. The incoming jobs are added
+ * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker
+ * threads that processes the WSPLGENJobs. The number of worker threads can be
+ * set using a System property <i>wsplgen.max.threads</i> ; its default value is
+ * 1.
+ *
+ * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
+ */
+public class Scheduler {
 
-    public static final int MAX_WSPLGEN_PROCESSES = 1;
+    private class FutureJob {
+        public Future     future;
+        public WSPLGENJob job;
+
+        public FutureJob(Future future, WSPLGENJob job) {
+            this.future = future;
+            this.job    = job;
+        }
+    }
+
+    public static final int MAX_WSPLGEN_PROCESSES =
+        Integer.getInteger("wsplgen.max.threads", 1);
 
 
-    protected List<WSPLGENJob> jobs;
+    protected ScheduledThreadPoolExecutor pool;
+    protected Map<String, FutureJob> jobs;
 
 
     private static Scheduler INSTANCE;
@@ -24,7 +48,8 @@
 
 
     private Scheduler() {
-        jobs = Collections.synchronizedList(new LinkedList<WSPLGENJob>());
+        jobs = new HashMap<String, FutureJob>();
+        pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES);
     }
 
 
@@ -33,76 +58,54 @@
             logger.info("Create new WSPLGEN Scheduler...");
 
             INSTANCE = new Scheduler();
-            new Thread(INSTANCE).start();
         }
 
         return INSTANCE;
     }
 
 
-    public void addJob(WSPLGENJob job) {
-        synchronized(jobs) {
-            jobs.add(job);
+    public void addJob(final WSPLGENJob job) {
+        synchronized (jobs) {
+            WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job));
+            pool.execute(f);
 
-            logger.info("New WSPLGEN job added.");
+            jobs.put(job.getArtifact().identifier(), new FutureJob(f, job));
 
-            jobs.notifyAll();
+            logger.info("New WSPLGEN job successfully added.");
         }
     }
 
 
-    public WSPLGENJob getJob() {
-        synchronized(jobs) {
-            if (!jobs.isEmpty()) {
-                return jobs.remove(0);
-            }
-
-            return null;
-        }
-    }
+    /**
+     * Cancels a running (or queued) job.
+     *
+     * @param jobId The id of the job (which is the identifier of an Artifact).
+     */
+    public void cancelJob(String jobId) {
+        logger.debug("Search job in queue: " + jobId);
 
-
-    public void run() {
-        logger.info("WSPLGEN Scheduler started.");
+        synchronized (jobs) {
+            FutureJob fj = jobs.get(jobId);
 
-        for (;;) {
-            try {
-                doRun();
-            }
-            catch (InterruptedException ie) {
-                logger.warn("Interrupt in WSPLGEN Scheduler -> restart it!");
+            if (fj != null) {
+                logger.info("Try to cancel job: " + jobId);
+
+                fj.future.cancel(true);
+
+                removeJob(jobId);
+
+                fj.job.getCallContext().afterBackground(
+                    CallContext.STORE);
+
+                logger.info("Canceled job: " + jobId);
             }
         }
     }
 
 
-    public void doRun()
-    throws InterruptedException
-    {
-        for (;;) {
-            final WSPLGENJob job = getJob();
-
-            if (job != null) {
-                logger.debug("Got new job to execute...");
-
-                Thread t = new Thread() {
-                    public void run() {
-                        JobExecutor executor = new JobExecutor(job);
-                        executor.execute();
-                    }
-                };
-
-                t.start();
-                t.join();
-            }
-            else {
-                logger.info("No more jobs in Scheduler -> go sleep!");
-                synchronized (jobs) {
-                    jobs.wait();
-                }
-
-                logger.info("New jobs in Scheduler -> wake up!");
-            }
+    protected void removeJob(String id) {
+        synchronized (jobs) {
+            jobs.remove(id);
         }
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/SchedulerSetup.java	Mon Dec 12 08:15:12 2011 +0000
@@ -0,0 +1,37 @@
+package de.intevation.flys.wsplgen;
+
+import org.w3c.dom.Document;
+
+import de.intevation.artifacts.GlobalContext;
+
+import de.intevation.artifactdatabase.LifetimeListener;
+
+import de.intevation.flys.artifacts.context.FLYSContext;
+
+
+/**
+ * A LifetimeListener that is used to create an instance of Scheduler. This
+ * instance is put into the GlobalContext using FLYSContext.SCHEDULER.
+ *
+ * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
+ */
+public class SchedulerSetup implements LifetimeListener {
+
+    @Override
+    public void setup(Document document) {
+    }
+
+
+    @Override
+    public void systemUp(GlobalContext globalContext) {
+        Scheduler scheduler = Scheduler.getInstance();
+        globalContext.put(FLYSContext.SCHEDULER, scheduler);
+    }
+
+
+    @Override
+    public void systemDown(GlobalContext globalContext) {
+        // TODO IMPLEMENT ME!
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENCallable.java	Mon Dec 12 08:15:12 2011 +0000
@@ -0,0 +1,140 @@
+package de.intevation.flys.wsplgen;
+
+import java.io.IOException;
+import java.io.File;
+import java.util.concurrent.Callable;
+
+import org.apache.log4j.Logger;
+
+import de.intevation.artifacts.CallContext;
+
+import de.intevation.flys.artifacts.model.WSPLGENJob;
+
+
+/**
+ * A Callable that is used to start and observe an external Process for WSPLGEN.
+ *
+ * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
+ */
+public class WSPLGENCallable implements Callable {
+
+    public static final String WSPLGEN_PARAMETER_FILE =
+        "wsplgen.par";
+
+    public static final String WSPLGEN_BIN_PATH =
+        System.getProperty("wsplgen.bin.path");
+
+
+    private Logger logger = Logger.getLogger(WSPLGENCallable.class);
+
+    private Process process;
+
+    protected Scheduler scheduler;
+
+    protected WSPLGENJob job;
+
+    protected JobObserver     logObserver;
+    protected ProblemObserver errorObserver;
+
+
+    public WSPLGENCallable(Scheduler scheduler, WSPLGENJob job) {
+        this.scheduler     = scheduler;
+        this.job           = job;
+        this.logObserver   = new JobObserver(job);
+        this.errorObserver = new ProblemObserver(job);
+    }
+
+
+    @Override
+    public WSPLGENJob call() {
+        File dir       = job.getWorkingDir();
+        File parameter = new File(dir, WSPLGEN_PARAMETER_FILE);
+
+        String[] args = new String[] {
+            WSPLGEN_BIN_PATH,
+            "-PAR=\"" + parameter.getAbsolutePath() + "\""
+        };
+
+        execute(args, dir);
+
+        return job;
+    }
+
+
+    protected void execute(String[] args, File dir) {
+        logger.info("Start JobExecutor for artifact: " + dir.getName());
+
+        String errorMsg = null;
+
+        try {
+            synchronized (this) {
+                process = Runtime.getRuntime().exec(args, null, dir);
+
+                logObserver.setInputStream(process.getInputStream());
+                errorObserver.setInputStream(process.getErrorStream());
+
+                logObserver.start();
+                errorObserver.start();
+
+                try {
+                    process.waitFor();
+                }
+                catch (InterruptedException ie) {
+                    logger.warn("WSPLGEN job interrupted: " + ie.getMessage());
+                }
+
+                try {
+                    logObserver.join();
+                    errorObserver.join();
+                }
+                catch (InterruptedException iee) { /* do nothing */ }
+
+                logger.info("WSPLGEN exit value: " + process.exitValue());
+                logger.info(
+                    "WSPLGEN throw " +
+                    errorObserver.numErrors() + " errors.");
+                logger.info(
+                    "WSPLGEN throw " +
+                    errorObserver.numWarnings() + " warnings.");
+
+                if (process.exitValue() < 2 && errorObserver.numErrors() == 0) {
+                    FacetCreator fc = job.getFacetCreator();
+                    fc.createWSPLGENFacet();
+                    fc.finish();
+                }
+
+                job.getCallContext().afterBackground(CallContext.STORE);
+
+                scheduler.removeJob(getJob().getArtifact().identifier());
+
+                return;
+            }
+        }
+        catch (SecurityException se) {
+            logger.error(se);
+        }
+        catch (IOException ioe) {
+            logger.error(ioe);
+        }
+        catch (NullPointerException npe) {
+            logger.error(npe, npe);
+        }
+        catch (IndexOutOfBoundsException ioobe) {
+            logger.error(ioobe, ioobe);
+        }
+    }
+
+
+    public void cancelWSPLGEN() {
+        if (process != null) {
+            logger.debug("Cancel running WSPLGEN process.");
+            process.destroy();
+        }
+    }
+
+
+    public WSPLGENJob getJob() {
+        return job;
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENFuture.java	Mon Dec 12 08:15:12 2011 +0000
@@ -0,0 +1,41 @@
+package de.intevation.flys.wsplgen;
+
+import java.util.concurrent.FutureTask;
+
+import org.apache.log4j.Logger;
+
+
+/**
+ * This FutureTask overrides the <i>cancel()</i> method. Before super.cancel()
+ * is called, WSPLGENCallable.cancelWSPLGEN() is executed to kill a running
+ * WSPLGEN process.
+ *
+ * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
+ */
+public class WSPLGENFuture extends FutureTask {
+
+    private static final Logger logger = Logger.getLogger(WSPLGENFuture.class);
+
+    protected WSPLGENCallable wsplgenCallable;
+
+
+    public WSPLGENFuture(WSPLGENCallable callable) {
+        super(callable);
+        this.wsplgenCallable = callable;
+    }
+
+
+    public WSPLGENCallable getWSPLGENCallable() {
+        return wsplgenCallable;
+    }
+
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        logger.debug("WSPLGENFuture.cancel");
+
+        wsplgenCallable.cancelWSPLGEN();
+        return super.cancel(mayInterruptIfRunning);
+    }
+}
+// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :

http://dive4elements.wald.intevation.org