ingo@1127: package de.intevation.flys.wsplgen;
ingo@1127:
ingo@1970: import java.util.HashMap;
ingo@1970: import java.util.Map;
ingo@1970: import java.util.concurrent.Future;
ingo@1970: import java.util.concurrent.ScheduledThreadPoolExecutor;
ingo@1127:
ingo@1127: import org.apache.log4j.Logger;
ingo@1127:
ingo@1970: import de.intevation.artifacts.CallContext;
ingo@3302: import de.intevation.flys.artifacts.model.map.WSPLGENJob;
ingo@1127:
ingo@1127:
ingo@1970: /**
ingo@1970: * The Scheduler is used to retrieve new WSPLGENJob. The incoming jobs are added
ingo@1970: * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker
ingo@1970: * threads that processes the WSPLGENJobs. The number of worker threads can be
ingo@1970: * set using a System property wsplgen.max.threads ; its default value is
ingo@1970: * 1.
ingo@1970: *
ingo@1970: * @author Ingo Weinzierl
ingo@1970: */
ingo@1970: public class Scheduler {
ingo@1127:
ingo@1970: private class FutureJob {
ingo@1970: public Future future;
ingo@1970: public WSPLGENJob job;
ingo@1970:
ingo@1970: public FutureJob(Future future, WSPLGENJob job) {
ingo@1970: this.future = future;
ingo@1970: this.job = job;
ingo@1970: }
ingo@1970: }
ingo@1970:
ingo@1970: public static final int MAX_WSPLGEN_PROCESSES =
ingo@1970: Integer.getInteger("wsplgen.max.threads", 1);
ingo@1127:
ingo@1127:
ingo@1970: protected ScheduledThreadPoolExecutor pool;
ingo@1970: protected Map jobs;
ingo@1127:
ingo@1127:
ingo@1127: private static Scheduler INSTANCE;
ingo@1127:
ingo@1127: private static final Logger logger = Logger.getLogger(Scheduler.class);
ingo@1127:
ingo@1127:
ingo@1127:
ingo@1127: private Scheduler() {
ingo@1970: jobs = new HashMap();
ingo@1970: pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES);
ingo@1127: }
ingo@1127:
ingo@1127:
ingo@1127: public static Scheduler getInstance() {
ingo@1127: if (INSTANCE == null) {
ingo@1127: logger.info("Create new WSPLGEN Scheduler...");
ingo@1127:
ingo@1127: INSTANCE = new Scheduler();
ingo@1127: }
ingo@1127:
ingo@1127: return INSTANCE;
ingo@1127: }
ingo@1127:
ingo@1127:
ingo@1970: public void addJob(final WSPLGENJob job) {
ingo@1970: synchronized (jobs) {
ingo@1970: WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job));
ingo@1970: pool.execute(f);
ingo@1127:
ingo@1970: jobs.put(job.getArtifact().identifier(), new FutureJob(f, job));
ingo@1127:
ingo@1970: logger.info("New WSPLGEN job successfully added.");
ingo@1127: }
ingo@1127: }
ingo@1127:
ingo@1127:
ingo@1970: /**
ingo@1970: * Cancels a running (or queued) job.
ingo@1970: *
ingo@1970: * @param jobId The id of the job (which is the identifier of an Artifact).
ingo@1970: */
ingo@1970: public void cancelJob(String jobId) {
ingo@1970: logger.debug("Search job in queue: " + jobId);
ingo@1127:
ingo@1970: synchronized (jobs) {
ingo@1970: FutureJob fj = jobs.get(jobId);
ingo@1127:
ingo@1970: if (fj != null) {
ingo@1970: logger.info("Try to cancel job: " + jobId);
ingo@1970:
ingo@1970: fj.future.cancel(true);
ingo@1970:
ingo@1970: removeJob(jobId);
ingo@1970:
ingo@1970: fj.job.getCallContext().afterBackground(
ingo@1970: CallContext.STORE);
ingo@1970:
ingo@1970: logger.info("Canceled job: " + jobId);
ingo@1127: }
ingo@1127: }
ingo@1127: }
ingo@1127:
ingo@1127:
ingo@1970: protected void removeJob(String id) {
ingo@1970: synchronized (jobs) {
ingo@1970: jobs.remove(id);
ingo@1127: }
ingo@1127: }
ingo@1127: }
ingo@1127: // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :