ingo@1127: package de.intevation.flys.wsplgen;
ingo@1127: 
ingo@1970: import java.util.HashMap;
ingo@1970: import java.util.Map;
ingo@1970: import java.util.concurrent.Future;
ingo@1970: import java.util.concurrent.ScheduledThreadPoolExecutor;
ingo@1127: 
ingo@1127: import org.apache.log4j.Logger;
ingo@1127: 
ingo@1970: import de.intevation.artifacts.CallContext;
ingo@3302: import de.intevation.flys.artifacts.model.map.WSPLGENJob;
ingo@1127: 
ingo@1127: 
ingo@1970: /**
ingo@1970:  * The Scheduler is used to retrieve new WSPLGENJob. The incoming jobs are added
ingo@1970:  * to a ScheduledThreadPoolExecutor. This thread pool has a number of worker
ingo@1970:  * threads that processes the WSPLGENJobs. The number of worker threads can be
ingo@1970:  * set using a System property <i>wsplgen.max.threads</i> ; its default value is
ingo@1970:  * 1.
ingo@1970:  *
ingo@1970:  * @author <a href="mailto:ingo.weinzierl@intevation.de">Ingo Weinzierl</a>
ingo@1970:  */
ingo@1970: public class Scheduler {
ingo@1127: 
ingo@1970:     private class FutureJob {
ingo@1970:         public Future     future;
ingo@1970:         public WSPLGENJob job;
ingo@1970: 
ingo@1970:         public FutureJob(Future future, WSPLGENJob job) {
ingo@1970:             this.future = future;
ingo@1970:             this.job    = job;
ingo@1970:         }
ingo@1970:     }
ingo@1970: 
ingo@1970:     public static final int MAX_WSPLGEN_PROCESSES =
ingo@1970:         Integer.getInteger("wsplgen.max.threads", 1);
ingo@1127: 
ingo@1127: 
ingo@1970:     protected ScheduledThreadPoolExecutor pool;
ingo@1970:     protected Map<String, FutureJob> jobs;
ingo@1127: 
ingo@1127: 
ingo@1127:     private static Scheduler INSTANCE;
ingo@1127: 
ingo@1127:     private static final Logger logger = Logger.getLogger(Scheduler.class);
ingo@1127: 
ingo@1127: 
ingo@1127: 
ingo@1127:     private Scheduler() {
ingo@1970:         jobs = new HashMap<String, FutureJob>();
ingo@1970:         pool = new ScheduledThreadPoolExecutor(MAX_WSPLGEN_PROCESSES);
ingo@1127:     }
ingo@1127: 
ingo@1127: 
ingo@1127:     public static Scheduler getInstance() {
ingo@1127:         if (INSTANCE == null) {
ingo@1127:             logger.info("Create new WSPLGEN Scheduler...");
ingo@1127: 
ingo@1127:             INSTANCE = new Scheduler();
ingo@1127:         }
ingo@1127: 
ingo@1127:         return INSTANCE;
ingo@1127:     }
ingo@1127: 
ingo@1127: 
ingo@1970:     public void addJob(final WSPLGENJob job) {
ingo@1970:         synchronized (jobs) {
ingo@1970:             WSPLGENFuture f = new WSPLGENFuture(new WSPLGENCallable(this, job));
ingo@1970:             pool.execute(f);
ingo@1127: 
ingo@1970:             jobs.put(job.getArtifact().identifier(), new FutureJob(f, job));
ingo@1127: 
ingo@1970:             logger.info("New WSPLGEN job successfully added.");
ingo@1127:         }
ingo@1127:     }
ingo@1127: 
ingo@1127: 
ingo@1970:     /**
ingo@1970:      * Cancels a running (or queued) job.
ingo@1970:      *
ingo@1970:      * @param jobId The id of the job (which is the identifier of an Artifact).
ingo@1970:      */
ingo@1970:     public void cancelJob(String jobId) {
ingo@1970:         logger.debug("Search job in queue: " + jobId);
ingo@1127: 
ingo@1970:         synchronized (jobs) {
ingo@1970:             FutureJob fj = jobs.get(jobId);
ingo@1127: 
ingo@1970:             if (fj != null) {
ingo@1970:                 logger.info("Try to cancel job: " + jobId);
ingo@1970: 
ingo@1970:                 fj.future.cancel(true);
ingo@1970: 
ingo@1970:                 removeJob(jobId);
ingo@1970: 
ingo@1970:                 fj.job.getCallContext().afterBackground(
ingo@1970:                     CallContext.STORE);
ingo@1970: 
ingo@1970:                 logger.info("Canceled job: " + jobId);
ingo@1127:             }
ingo@1127:         }
ingo@1127:     }
ingo@1127: 
ingo@1127: 
ingo@1970:     protected void removeJob(String id) {
ingo@1970:         synchronized (jobs) {
ingo@1970:             jobs.remove(id);
ingo@1127:         }
ingo@1127:     }
ingo@1127: }
ingo@1127: // vim:set ts=4 sw=4 si et sta sts=4 fenc=utf-8 :