# HG changeset patch # User Ingo Weinzierl # Date 1323677712 0 # Node ID 368040e5c400f0ed192ac606fa03b7cd5b8d06da # Parent baefcfba97aa7bf45759c7c6045f1a060f8ebdea Improved the Scheduler to be able to cancel running WSPLGEN jobs. flys-artifacts/trunk@3384 c6561f87-3c4e-4783-a992-168aeb5c3f6f diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/ChangeLog --- a/flys-artifacts/ChangeLog Fri Dec 09 16:39:08 2011 +0000 +++ b/flys-artifacts/ChangeLog Mon Dec 12 08:15:12 2011 +0000 @@ -1,3 +1,35 @@ +2011-12-12 Ingo Weinzierl + + * src/main/java/de/intevation/flys/wsplgen/JobExecutor.java, + src/main/java/de/intevation/flys/wsplgen/WSPLGENCallable.java: Renamed + JobExecutor to WSPLGENCallable (because it is a Callable now). In addition + to the call() method which starts the WSPLGEN process, this Callable + offers a cancelWSPLGEN() method to destroy a running WSPLGEN process. + + * src/main/java/de/intevation/flys/wsplgen/WSPLGENFuture.java: A FutureTask + that overrides cancel(boolean). Before this instance call + super.cancel(boolean), it executes WSPLGENCallable.cancelWSPLGEN() to kill + a running WSPLGEN process. + + * src/main/java/de/intevation/flys/wsplgen/Scheduler.java: The Scheduler is + no longer a Runnable. It makes now use of a ScheduledThreadPoolExecutor to + schedule the incoming WSPLGENJobs. The ScheduledThreadPoolExecutor has a + fixed number of worker threads that process the jobs. The number is 1 per + default; it can be modified using a System property "wsplgen.max.threads". + + * src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java: Added + a string constant SCHEDULER. + + * src/main/java/de/intevation/flys/wsplgen/SchedulerSetup.java: A + LifetimeListener that currently implements the systemUp() method to create + an instance of Scheduler. After its creation, the Scheduler is put into + the GlobalContext using FLYSContext.SCHEDULER as key. + + * src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java: + Fetch the Scheduler from GlobalContext. + + * doc/conf/conf.xml: Registered SchedulerSetup as LifetimeListener. + 2011-12-09 Felix Wolfsteller * src/main/java/de/intevation/flys/artifacts/StaticFLYSArtifact.java: diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/doc/conf/conf.xml --- a/flys-artifacts/doc/conf/conf.xml Fri Dec 09 16:39:08 2011 +0000 +++ b/flys-artifacts/doc/conf/conf.xml Mon Dec 12 08:15:12 2011 +0000 @@ -97,6 +97,7 @@ de.intevation.flys.artifacts.datacage.Datacage + de.intevation.flys.wsplgen.SchedulerSetup diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java --- a/flys-artifacts/src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java Fri Dec 09 16:39:08 2011 +0000 +++ b/flys-artifacts/src/main/java/de/intevation/flys/artifacts/context/FLYSContext.java Mon Dec 12 08:15:12 2011 +0000 @@ -46,6 +46,10 @@ public static final String RIVER_WMS = "flys.floodmap.river.wms"; + /** The key that is used to store an instance of Scheduler in the context.*/ + public static final String SCHEDULER = + "flys.wsplgen.scheduler"; + /** * The default constructor. diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java --- a/flys-artifacts/src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java Fri Dec 09 16:39:08 2011 +0000 +++ b/flys-artifacts/src/main/java/de/intevation/flys/artifacts/states/FloodMapState.java Mon Dec 12 08:15:12 2011 +0000 @@ -23,6 +23,7 @@ import de.intevation.artifacts.Artifact; import de.intevation.artifacts.CallContext; +import de.intevation.artifacts.GlobalContext; import de.intevation.artifacts.common.utils.FileTools; @@ -34,6 +35,7 @@ import de.intevation.flys.model.RiverAxis; import de.intevation.flys.artifacts.FLYSArtifact; +import de.intevation.flys.artifacts.context.FLYSContext; import de.intevation.flys.artifacts.model.CalculationMessage; import de.intevation.flys.artifacts.model.CalculationResult; import de.intevation.flys.artifacts.model.FacetTypes; @@ -141,7 +143,8 @@ "wsplgen.job.queued") )); - Scheduler scheduler = Scheduler.getInstance(); + GlobalContext gc = (GlobalContext) context.globalContext(); + Scheduler scheduler = (Scheduler) gc.get(FLYSContext.SCHEDULER); scheduler.addJob(job); return null; @@ -194,6 +197,9 @@ FLYSArtifact flys = (FLYSArtifact) artifact; removeDirectory(flys); + Scheduler scheduler = Scheduler.getInstance(); + scheduler.cancelJob(flys.identifier()); + MapfileGenerator.getInstance().update(); } diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/src/main/java/de/intevation/flys/wsplgen/JobExecutor.java --- a/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/JobExecutor.java Fri Dec 09 16:39:08 2011 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,113 +0,0 @@ -package de.intevation.flys.wsplgen; - -import java.io.IOException; -import java.io.File; - -import org.apache.log4j.Logger; - -import de.intevation.artifacts.CallContext; - -import de.intevation.flys.artifacts.model.WSPLGENJob; - - -public class JobExecutor { - - public static final String WSPLGEN_PARAMETER_FILE = - "wsplgen.par"; - - public static final String WSPLGEN_BIN_PATH = - System.getProperty("wsplgen.bin.path"); - - - private Logger logger = Logger.getLogger(JobExecutor.class); - - private Process process; - - protected WSPLGENJob job; - - protected JobObserver logObserver; - protected ProblemObserver errorObserver; - - - public JobExecutor(WSPLGENJob job) { - this.job = job; - this.logObserver = new JobObserver(job); - this.errorObserver = new ProblemObserver(job); - } - - - public void execute() { - File dir = job.getWorkingDir(); - File parameter = new File(dir, WSPLGEN_PARAMETER_FILE); - - String[] args = new String[] { - WSPLGEN_BIN_PATH, - "-PAR=\"" + parameter.getAbsolutePath() + "\"" - }; - - execute(args, dir); - } - - - protected void execute(String[] args, File dir) { - logger.info("Start JobExecutor for artifact: " + dir.getName()); - - String errorMsg = null; - - try { - synchronized (this) { - process = Runtime.getRuntime().exec(args, null, dir); - - logObserver.setInputStream(process.getInputStream()); - errorObserver.setInputStream(process.getErrorStream()); - - logObserver.start(); - errorObserver.start(); - - try { - process.waitFor(); - } - catch (InterruptedException ie) { - logger.error("WSPLGEN job interrupted: " + ie.getMessage()); - } - - try { - logObserver.join(); - errorObserver.join(); - } - catch (InterruptedException iee) { /* do nothing */ } - - logger.info("WSPLGEN exit value: " + process.exitValue()); - logger.info( - "WSPLGEN throw " + - errorObserver.numErrors() + " errors."); - logger.info( - "WSPLGEN throw " + - errorObserver.numWarnings() + " warnings."); - - if (process.exitValue() < 2 && errorObserver.numErrors() == 0) { - FacetCreator fc = job.getFacetCreator(); - fc.createWSPLGENFacet(); - fc.finish(); - } - - job.getCallContext().afterBackground(CallContext.STORE); - - return; - } - } - catch (SecurityException se) { - logger.error(se); - } - catch (IOException ioe) { - logger.error(ioe); - } - catch (NullPointerException npe) { - logger.error(npe, npe); - } - catch (IndexOutOfBoundsException ioobe) { - logger.error(ioobe, ioobe); - } - } -} -// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 : diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java --- a/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java Fri Dec 09 16:39:08 2011 +0000 +++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java Mon Dec 12 08:15:12 2011 +0000 @@ -1,20 +1,44 @@ package de.intevation.flys.wsplgen; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.log4j.Logger; +import de.intevation.artifacts.CallContext; + import de.intevation.flys.artifacts.model.WSPLGENJob; -public class Scheduler implements Runnable { +/** + * The Scheduler is used to retrieve new WSPLGENJob. The incoming jobs are added + * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker + * threads that processes the WSPLGENJobs. The number of worker threads can be + * set using a System property wsplgen.max.threads ; its default value is + * 1. + * + * @author Ingo Weinzierl + */ +public class Scheduler { - public static final int MAX_WSPLGEN_PROCESSES = 1; + private class FutureJob { + public Future future; + public WSPLGENJob job; + + public FutureJob(Future future, WSPLGENJob job) { + this.future = future; + this.job = job; + } + } + + public static final int MAX_WSPLGEN_PROCESSES = + Integer.getInteger("wsplgen.max.threads", 1); - protected List jobs; + protected ScheduledThreadPoolExecutor pool; + protected Map jobs; private static Scheduler INSTANCE; @@ -24,7 +48,8 @@ private Scheduler() { - jobs = Collections.synchronizedList(new LinkedList()); + jobs = new HashMap(); + pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES); } @@ -33,76 +58,54 @@ logger.info("Create new WSPLGEN Scheduler..."); INSTANCE = new Scheduler(); - new Thread(INSTANCE).start(); } return INSTANCE; } - public void addJob(WSPLGENJob job) { - synchronized(jobs) { - jobs.add(job); + public void addJob(final WSPLGENJob job) { + synchronized (jobs) { + WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job)); + pool.execute(f); - logger.info("New WSPLGEN job added."); + jobs.put(job.getArtifact().identifier(), new FutureJob(f, job)); - jobs.notifyAll(); + logger.info("New WSPLGEN job successfully added."); } } - public WSPLGENJob getJob() { - synchronized(jobs) { - if (!jobs.isEmpty()) { - return jobs.remove(0); - } - - return null; - } - } + /** + * Cancels a running (or queued) job. + * + * @param jobId The id of the job (which is the identifier of an Artifact). + */ + public void cancelJob(String jobId) { + logger.debug("Search job in queue: " + jobId); - - public void run() { - logger.info("WSPLGEN Scheduler started."); + synchronized (jobs) { + FutureJob fj = jobs.get(jobId); - for (;;) { - try { - doRun(); - } - catch (InterruptedException ie) { - logger.warn("Interrupt in WSPLGEN Scheduler -> restart it!"); + if (fj != null) { + logger.info("Try to cancel job: " + jobId); + + fj.future.cancel(true); + + removeJob(jobId); + + fj.job.getCallContext().afterBackground( + CallContext.STORE); + + logger.info("Canceled job: " + jobId); } } } - public void doRun() - throws InterruptedException - { - for (;;) { - final WSPLGENJob job = getJob(); - - if (job != null) { - logger.debug("Got new job to execute..."); - - Thread t = new Thread() { - public void run() { - JobExecutor executor = new JobExecutor(job); - executor.execute(); - } - }; - - t.start(); - t.join(); - } - else { - logger.info("No more jobs in Scheduler -> go sleep!"); - synchronized (jobs) { - jobs.wait(); - } - - logger.info("New jobs in Scheduler -> wake up!"); - } + protected void removeJob(String id) { + synchronized (jobs) { + jobs.remove(id); } } } diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/src/main/java/de/intevation/flys/wsplgen/SchedulerSetup.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/SchedulerSetup.java Mon Dec 12 08:15:12 2011 +0000 @@ -0,0 +1,37 @@ +package de.intevation.flys.wsplgen; + +import org.w3c.dom.Document; + +import de.intevation.artifacts.GlobalContext; + +import de.intevation.artifactdatabase.LifetimeListener; + +import de.intevation.flys.artifacts.context.FLYSContext; + + +/** + * A LifetimeListener that is used to create an instance of Scheduler. This + * instance is put into the GlobalContext using FLYSContext.SCHEDULER. + * + * @author Ingo Weinzierl + */ +public class SchedulerSetup implements LifetimeListener { + + @Override + public void setup(Document document) { + } + + + @Override + public void systemUp(GlobalContext globalContext) { + Scheduler scheduler = Scheduler.getInstance(); + globalContext.put(FLYSContext.SCHEDULER, scheduler); + } + + + @Override + public void systemDown(GlobalContext globalContext) { + // TODO IMPLEMENT ME! + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf8 : diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENCallable.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENCallable.java Mon Dec 12 08:15:12 2011 +0000 @@ -0,0 +1,140 @@ +package de.intevation.flys.wsplgen; + +import java.io.IOException; +import java.io.File; +import java.util.concurrent.Callable; + +import org.apache.log4j.Logger; + +import de.intevation.artifacts.CallContext; + +import de.intevation.flys.artifacts.model.WSPLGENJob; + + +/** + * A Callable that is used to start and observe an external Process for WSPLGEN. + * + * @author Ingo Weinzierl + */ +public class WSPLGENCallable implements Callable { + + public static final String WSPLGEN_PARAMETER_FILE = + "wsplgen.par"; + + public static final String WSPLGEN_BIN_PATH = + System.getProperty("wsplgen.bin.path"); + + + private Logger logger = Logger.getLogger(WSPLGENCallable.class); + + private Process process; + + protected Scheduler scheduler; + + protected WSPLGENJob job; + + protected JobObserver logObserver; + protected ProblemObserver errorObserver; + + + public WSPLGENCallable(Scheduler scheduler, WSPLGENJob job) { + this.scheduler = scheduler; + this.job = job; + this.logObserver = new JobObserver(job); + this.errorObserver = new ProblemObserver(job); + } + + + @Override + public WSPLGENJob call() { + File dir = job.getWorkingDir(); + File parameter = new File(dir, WSPLGEN_PARAMETER_FILE); + + String[] args = new String[] { + WSPLGEN_BIN_PATH, + "-PAR=\"" + parameter.getAbsolutePath() + "\"" + }; + + execute(args, dir); + + return job; + } + + + protected void execute(String[] args, File dir) { + logger.info("Start JobExecutor for artifact: " + dir.getName()); + + String errorMsg = null; + + try { + synchronized (this) { + process = Runtime.getRuntime().exec(args, null, dir); + + logObserver.setInputStream(process.getInputStream()); + errorObserver.setInputStream(process.getErrorStream()); + + logObserver.start(); + errorObserver.start(); + + try { + process.waitFor(); + } + catch (InterruptedException ie) { + logger.warn("WSPLGEN job interrupted: " + ie.getMessage()); + } + + try { + logObserver.join(); + errorObserver.join(); + } + catch (InterruptedException iee) { /* do nothing */ } + + logger.info("WSPLGEN exit value: " + process.exitValue()); + logger.info( + "WSPLGEN throw " + + errorObserver.numErrors() + " errors."); + logger.info( + "WSPLGEN throw " + + errorObserver.numWarnings() + " warnings."); + + if (process.exitValue() < 2 && errorObserver.numErrors() == 0) { + FacetCreator fc = job.getFacetCreator(); + fc.createWSPLGENFacet(); + fc.finish(); + } + + job.getCallContext().afterBackground(CallContext.STORE); + + scheduler.removeJob(getJob().getArtifact().identifier()); + + return; + } + } + catch (SecurityException se) { + logger.error(se); + } + catch (IOException ioe) { + logger.error(ioe); + } + catch (NullPointerException npe) { + logger.error(npe, npe); + } + catch (IndexOutOfBoundsException ioobe) { + logger.error(ioobe, ioobe); + } + } + + + public void cancelWSPLGEN() { + if (process != null) { + logger.debug("Cancel running WSPLGEN process."); + process.destroy(); + } + } + + + public WSPLGENJob getJob() { + return job; + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 : diff -r baefcfba97aa -r 368040e5c400 flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENFuture.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/WSPLGENFuture.java Mon Dec 12 08:15:12 2011 +0000 @@ -0,0 +1,41 @@ +package de.intevation.flys.wsplgen; + +import java.util.concurrent.FutureTask; + +import org.apache.log4j.Logger; + + +/** + * This FutureTask overrides the cancel() method. Before super.cancel() + * is called, WSPLGENCallable.cancelWSPLGEN() is executed to kill a running + * WSPLGEN process. + * + * @author Ingo Weinzierl + */ +public class WSPLGENFuture extends FutureTask { + + private static final Logger logger = Logger.getLogger(WSPLGENFuture.class); + + protected WSPLGENCallable wsplgenCallable; + + + public WSPLGENFuture(WSPLGENCallable callable) { + super(callable); + this.wsplgenCallable = callable; + } + + + public WSPLGENCallable getWSPLGENCallable() { + return wsplgenCallable; + } + + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + logger.debug("WSPLGENFuture.cancel"); + + wsplgenCallable.cancelWSPLGEN(); + return super.cancel(mayInterruptIfRunning); + } +} +// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :