Mercurial > dive4elements > river
diff flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java @ 1970:368040e5c400
Improved the Scheduler to be able to cancel running WSPLGEN jobs.
flys-artifacts/trunk@3384 c6561f87-3c4e-4783-a992-168aeb5c3f6f
author | Ingo Weinzierl <ingo.weinzierl@intevation.de> |
---|---|
date | Mon, 12 Dec 2011 08:15:12 +0000 |
parents | 6b9877a9f6c1 |
children | 453d2d0c4258 |
line wrap: on
line diff
--- a/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java Fri Dec 09 16:39:08 2011 +0000 +++ b/flys-artifacts/src/main/java/de/intevation/flys/wsplgen/Scheduler.java Mon Dec 12 08:15:12 2011 +0000 @@ -1,20 +1,44 @@ package de.intevation.flys.wsplgen; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.log4j.Logger; +import de.intevation.artifacts.CallContext; + import de.intevation.flys.artifacts.model.WSPLGENJob; -public class Scheduler implements Runnable { +/** + * The Scheduler is used to retrieve new WSPLGENJob. The incoming jobs are added + * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker + * threads that processes the WSPLGENJobs. The number of worker threads can be + * set using a System property <i>wsplgen.max.threads</i> ; its default value is + * 1. + * + * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a> + */ +public class Scheduler { - public static final int MAX_WSPLGEN_PROCESSES = 1; + private class FutureJob { + public Future future; + public WSPLGENJob job; + + public FutureJob(Future future, WSPLGENJob job) { + this.future = future; + this.job = job; + } + } + + public static final int MAX_WSPLGEN_PROCESSES = + Integer.getInteger("wsplgen.max.threads", 1); - protected List<WSPLGENJob> jobs; + protected ScheduledThreadPoolExecutor pool; + protected Map<String, FutureJob> jobs; private static Scheduler INSTANCE; @@ -24,7 +48,8 @@ private Scheduler() { - jobs = Collections.synchronizedList(new LinkedList<WSPLGENJob>()); + jobs = new HashMap<String, FutureJob>(); + pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES); } @@ -33,76 +58,54 @@ logger.info("Create new WSPLGEN Scheduler..."); INSTANCE = new Scheduler(); - new Thread(INSTANCE).start(); } return INSTANCE; } - public void addJob(WSPLGENJob job) { - synchronized(jobs) { - jobs.add(job); + public void addJob(final WSPLGENJob job) { + synchronized (jobs) { + WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job)); + pool.execute(f); - logger.info("New WSPLGEN job added."); + jobs.put(job.getArtifact().identifier(), new FutureJob(f, job)); - jobs.notifyAll(); + logger.info("New WSPLGEN job successfully added."); } } - public WSPLGENJob getJob() { - synchronized(jobs) { - if (!jobs.isEmpty()) { - return jobs.remove(0); - } - - return null; - } - } + /** + * Cancels a running (or queued) job. + * + * @param jobId The id of the job (which is the identifier of an Artifact). + */ + public void cancelJob(String jobId) { + logger.debug("Search job in queue: " + jobId); - - public void run() { - logger.info("WSPLGEN Scheduler started."); + synchronized (jobs) { + FutureJob fj = jobs.get(jobId); - for (;;) { - try { - doRun(); - } - catch (InterruptedException ie) { - logger.warn("Interrupt in WSPLGEN Scheduler -> restart it!"); + if (fj != null) { + logger.info("Try to cancel job: " + jobId); + + fj.future.cancel(true); + + removeJob(jobId); + + fj.job.getCallContext().afterBackground( + CallContext.STORE); + + logger.info("Canceled job: " + jobId); } } } - public void doRun() - throws InterruptedException - { - for (;;) { - final WSPLGENJob job = getJob(); - - if (job != null) { - logger.debug("Got new job to execute..."); - - Thread t = new Thread() { - public void run() { - JobExecutor executor = new JobExecutor(job); - executor.execute(); - } - }; - - t.start(); - t.join(); - } - else { - logger.info("No more jobs in Scheduler -> go sleep!"); - synchronized (jobs) { - jobs.wait(); - } - - logger.info("New jobs in Scheduler -> wake up!"); - } + protected void removeJob(String id) { + synchronized (jobs) { + jobs.remove(id); } } }