package de.intevation.flys.wsplgen;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import org.apache.log4j.Logger;

import de.intevation.artifacts.CallContext;
import de.intevation.flys.artifacts.model.map.WSPLGENJob;


/**
 * The Scheduler accepts new {@link WSPLGENJob}s and hands them to a
 * {@link ScheduledThreadPoolExecutor}. The worker threads of this pool process
 * the jobs. The number of worker threads can be set using the system property
 * {@code wsplgen.max.threads}; its default value is 1.
 *
 * <p>The class is a lazily created singleton, see {@link #getInstance()}.
 * All access to the job registry is guarded by the monitor of the registry
 * map itself.</p>
 *
 * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
 */
public class Scheduler {

    /**
     * Pairs a submitted job with the future that controls its execution.
     * Declared static because it never touches the enclosing Scheduler.
     */
    private static final class FutureJob {
        public final Future<?>  future;
        public final WSPLGENJob job;

        public FutureJob(Future<?> future, WSPLGENJob job) {
            this.future = future;
            this.job    = job;
        }
    }

    /**
     * The configured number of worker threads, read once from the system
     * property {@code wsplgen.max.threads} (default 1).
     */
    public static final int MAX_WSPLGEN_PROCESSES =
        Integer.getInteger("wsplgen.max.threads", 1);


    /** The thread pool that executes the jobs. */
    protected ScheduledThreadPoolExecutor pool;

    /**
     * The registered jobs, keyed by the identifier of the job's artifact.
     * Also serves as the lock object for every access to the registry.
     */
    protected Map<String, FutureJob> jobs;


    private static Scheduler INSTANCE;

    private static final Logger logger = Logger.getLogger(Scheduler.class);



    private Scheduler() {
        // ScheduledThreadPoolExecutor rejects negative pool sizes with an
        // IllegalArgumentException and a size of zero is discouraged, so a
        // misconfigured property falls back to a single worker thread.
        int threads = MAX_WSPLGEN_PROCESSES;

        if (threads < 1) {
            logger.warn(
                "Invalid value for wsplgen.max.threads: " + threads +
                ". Using 1 worker thread instead.");
            threads = 1;
        }

        jobs = new HashMap<String, FutureJob>();
        pool = new ScheduledThreadPoolExecutor(threads);
    }


    /**
     * Returns the single Scheduler instance and creates it on first use.
     *
     * <p>The method is synchronized so that concurrent first calls cannot
     * create two schedulers (and therefore two thread pools).</p>
     *
     * @return the shared Scheduler instance.
     */
    public static synchronized Scheduler getInstance() {
        if (INSTANCE == null) {
            logger.info("Create new WSPLGEN Scheduler...");

            INSTANCE = new Scheduler();
        }

        return INSTANCE;
    }


    /**
     * Submits a job to the thread pool and registers it under the identifier
     * of its artifact so that it can be cancelled later.
     *
     * @param job The job to execute.
     */
    public void addJob(final WSPLGENJob job) {
        synchronized (jobs) {
            WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job));
            pool.execute(f);

            // Submission and registration happen under the same lock, so a
            // removeJob() call cannot slip in between the two steps.
            jobs.put(job.getArtifact().identifier(), new FutureJob(f, job));

            logger.info("New WSPLGEN job successfully added.");
        }
    }


    /**
     * Cancels a running (or queued) job. If no job is registered under the
     * given id, nothing happens.
     *
     * @param jobId The id of the job (which is the identifier of an Artifact).
     */
    public void cancelJob(String jobId) {
        logger.debug("Search job in queue: " + jobId);

        synchronized (jobs) {
            FutureJob fj = jobs.get(jobId);

            if (fj != null) {
                logger.info("Try to cancel job: " + jobId);

                // true: a thread that already executes the job is interrupted.
                fj.future.cancel(true);

                removeJob(jobId);

                // NOTE(review): presumably tells the call context to store
                // the artifact after the background work ended - confirm.
                fj.job.getCallContext().afterBackground(
                    CallContext.STORE);

                logger.info("Canceled job: " + jobId);
            }
        }
    }


    /**
     * Removes a job from the registry. The job itself is not cancelled.
     *
     * @param id The id of the job (which is the identifier of an Artifact).
     */
    protected void removeJob(String id) {
        synchronized (jobs) {
            jobs.remove(id);
        }
    }
}
// vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :