view nspr/pr/src/misc/prtpool.c @ 3:150b72113545

Add DBM and legacydb support
author Andre Heinecke <andre.heinecke@intevation.de>
date Tue, 05 Aug 2014 18:32:02 +0200
parents 1e5118fa0cb1
children
line wrap: on
line source
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "nspr.h"

/*
 * Thread pools
 *	Thread pools create and manage threads to provide support for
 *	scheduling jobs onto one or more threads.
 *
 */
#ifdef OPT_WINNT
#include <windows.h>
#endif

/*
 * worker thread
 */
typedef struct wthread {
	PRCList		links;
	PRThread	*thread;
} wthread;

/*
 * queue of timer jobs
 */
typedef struct timer_jobq {
	PRCList		list;
	PRLock		*lock;
	PRCondVar	*cv;
	PRInt32		cnt;
	PRCList 	wthreads;
} timer_jobq;

/*
 * queue of jobs
 */
typedef struct tp_jobq {
	PRCList		list;
	PRInt32		cnt;
	PRLock		*lock;
	PRCondVar	*cv;
	PRCList 	wthreads;
#ifdef OPT_WINNT
	HANDLE		nt_completion_port;
#endif
} tp_jobq;

/*
 * queue of IO jobs
 */
typedef struct io_jobq {
	PRCList		list;
	PRPollDesc  *pollfds;
	PRInt32  	npollfds;
	PRJob		**polljobs;
	PRLock		*lock;
	PRInt32		cnt;
	PRFileDesc	*notify_fd;
	PRCList 	wthreads;
} io_jobq;

/*
 * Threadpool
 */
struct PRThreadPool {
	PRInt32		init_threads;
	PRInt32		max_threads;
	PRInt32		current_threads;
	PRInt32		idle_threads;
	PRUint32	stacksize;
	tp_jobq		jobq;
	io_jobq		ioq;
	timer_jobq	timerq;
	PRLock		*join_lock;		/* used with jobp->join_cv */
	PRCondVar	*shutdown_cv;
	PRBool		shutdown;
};

typedef enum io_op_type
	{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;

#ifdef OPT_WINNT
typedef struct NT_notifier {
	OVERLAPPED overlapped;		/* must be first */
	PRJob	*jobp;
} NT_notifier;
#endif

struct PRJob {
	PRCList			links;		/* 	for linking jobs */
	PRBool			on_ioq;		/* job on ioq */
	PRBool			on_timerq;	/* job on timerq */
	PRJobFn			job_func;
	void 			*job_arg;
	PRCondVar		*join_cv;
	PRBool			join_wait;	/* == PR_TRUE, when waiting to join */
	PRCondVar		*cancel_cv;	/* for cancelling IO jobs */
	PRBool			cancel_io;	/* for cancelling IO jobs */
	PRThreadPool	*tpool;		/* back pointer to thread pool */
	PRJobIoDesc		*iod;
	io_op_type		io_op;
	PRInt16			io_poll_flags;
	PRNetAddr		*netaddr;
	PRIntervalTime	timeout;	/* relative value */
	PRIntervalTime	absolute;
#ifdef OPT_WINNT
	NT_notifier		nt_notifier;	
#endif
};

#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

#define JOIN_NOTIFY(_jobp)								\
				PR_BEGIN_MACRO							\
				PR_Lock(_jobp->tpool->join_lock);		\
				_jobp->join_wait = PR_FALSE;			\
				PR_NotifyCondVar(_jobp->join_cv);		\
				PR_Unlock(_jobp->tpool->join_lock);		\
				PR_END_MACRO

#define CANCEL_IO_JOB(jobp)								\
				PR_BEGIN_MACRO							\
				jobp->cancel_io = PR_FALSE;				\
				jobp->on_ioq = PR_FALSE;				\
				PR_REMOVE_AND_INIT_LINK(&jobp->links);	\
				tp->ioq.cnt--;							\
				PR_NotifyCondVar(jobp->cancel_cv);		\
				PR_END_MACRO

static void delete_job(PRJob *jobp);
static PRThreadPool * alloc_threadpool(void);
static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
static void notify_ioq(PRThreadPool *tp);
static void notify_timerq(PRThreadPool *tp);

/*
 * locks are acquired in the following order
 *
 *	tp->ioq.lock,tp->timerq.lock
 *			|
 *			V
 *		tp->jobq->lock		
 */

/*
 * worker thread function
 */
static void wstart(void *arg)
{
PRThreadPool *tp = (PRThreadPool *) arg;
PRCList *head;

	/*
	 * execute jobs until shutdown
	 */
	while (!tp->shutdown) {
		PRJob *jobp;
#ifdef OPT_WINNT
		BOOL rv;
		DWORD unused, shutdown;
		LPOVERLAPPED olp;

		PR_Lock(tp->jobq.lock);
		tp->idle_threads++;
		PR_Unlock(tp->jobq.lock);
		rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
					&unused, &shutdown, &olp, INFINITE);
		
		PR_ASSERT(rv);
		if (shutdown)
			break;
		jobp = ((NT_notifier *) olp)->jobp;
		PR_Lock(tp->jobq.lock);
		tp->idle_threads--;
		tp->jobq.cnt--;
		PR_Unlock(tp->jobq.lock);
#else

		PR_Lock(tp->jobq.lock);
		while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
			tp->idle_threads++;
			PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
			tp->idle_threads--;
		}	
		if (tp->shutdown) {
			PR_Unlock(tp->jobq.lock);
			break;
		}
		head = PR_LIST_HEAD(&tp->jobq.list);
		/*
		 * remove job from queue
		 */
		PR_REMOVE_AND_INIT_LINK(head);
		tp->jobq.cnt--;
		jobp = JOB_LINKS_PTR(head);
		PR_Unlock(tp->jobq.lock);
#endif

		jobp->job_func(jobp->job_arg);
		if (!JOINABLE_JOB(jobp)) {
			delete_job(jobp);
		} else {
			JOIN_NOTIFY(jobp);
		}
	}
	PR_Lock(tp->jobq.lock);
	tp->current_threads--;
	PR_Unlock(tp->jobq.lock);
}

/*
 * add a job to the work queue
 */
static void
add_to_jobq(PRThreadPool *tp, PRJob *jobp)
{
	/*
	 * add to jobq
	 */
#ifdef OPT_WINNT
	PR_Lock(tp->jobq.lock);
	tp->jobq.cnt++;
	PR_Unlock(tp->jobq.lock);
	/*
	 * notify worker thread(s)
	 */
	PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
            FALSE, &jobp->nt_notifier.overlapped);
#else
	PR_Lock(tp->jobq.lock);
	PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
	tp->jobq.cnt++;
	if ((tp->idle_threads < tp->jobq.cnt) &&
					(tp->current_threads < tp->max_threads)) {
		wthread *wthrp;
		/*
		 * increment thread count and unlock the jobq lock
		 */
		tp->current_threads++;
		PR_Unlock(tp->jobq.lock);
		/* create new worker thread */
		wthrp = PR_NEWZAP(wthread);
		if (wthrp) {
			wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
						tp, PR_PRIORITY_NORMAL,
						PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
			if (NULL == wthrp->thread) {
				PR_DELETE(wthrp);  /* this sets wthrp to NULL */
			}
		}
		PR_Lock(tp->jobq.lock);
		if (NULL == wthrp) {
			tp->current_threads--;
		} else {
			PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
		}
	}
	/*
	 * wakeup a worker thread
	 */
	PR_NotifyCondVar(tp->jobq.cv);
	PR_Unlock(tp->jobq.lock);
#endif
}

/*
 * io worker thread function
 */
static void io_wstart(void *arg)
{
PRThreadPool *tp = (PRThreadPool *) arg;
int pollfd_cnt, pollfds_used;
int rv;
PRCList *qp, *nextqp;
PRPollDesc *pollfds;
PRJob **polljobs;
int poll_timeout;
PRIntervalTime now;

	/*
	 * scan io_jobq
	 * construct poll list
	 * call PR_Poll
	 * for all fds, for which poll returns true, move the job to
	 * jobq and wakeup worker thread.
	 */
	while (!tp->shutdown) {
		PRJob *jobp;

		pollfd_cnt = tp->ioq.cnt + 10;
		if (pollfd_cnt > tp->ioq.npollfds) {

			/*
			 * re-allocate pollfd array if the current one is not large
			 * enough
			 */
			if (NULL != tp->ioq.pollfds)
				PR_Free(tp->ioq.pollfds);
			tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
						(sizeof(PRPollDesc) + sizeof(PRJob *)));
			PR_ASSERT(NULL != tp->ioq.pollfds);
			/*
			 * array of pollfds
			 */
			pollfds = tp->ioq.pollfds;
			tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
			/*
			 * parallel array of jobs
			 */
			polljobs = tp->ioq.polljobs;
			tp->ioq.npollfds = pollfd_cnt;
		}

		pollfds_used = 0;
		/*
		 * add the notify fd; used for unblocking io thread(s)
		 */
		pollfds[pollfds_used].fd = tp->ioq.notify_fd;
		pollfds[pollfds_used].in_flags = PR_POLL_READ;
		pollfds[pollfds_used].out_flags = 0;
		polljobs[pollfds_used] = NULL;
		pollfds_used++;
		/*
		 * fill in the pollfd array
		 */
		PR_Lock(tp->ioq.lock);
		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
			nextqp = qp->next;
			jobp = JOB_LINKS_PTR(qp);
			if (jobp->cancel_io) {
				CANCEL_IO_JOB(jobp);
				continue;
			}
			if (pollfds_used == (pollfd_cnt))
				break;
			pollfds[pollfds_used].fd = jobp->iod->socket;
			pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
			pollfds[pollfds_used].out_flags = 0;
			polljobs[pollfds_used] = jobp;

			pollfds_used++;
		}
		if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
			qp = tp->ioq.list.next;
			jobp = JOB_LINKS_PTR(qp);
			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
				poll_timeout = PR_INTERVAL_NO_TIMEOUT;
			else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
				poll_timeout = PR_INTERVAL_NO_WAIT;
			else {
				poll_timeout = jobp->absolute - PR_IntervalNow();
				if (poll_timeout <= 0) /* already timed out */
					poll_timeout = PR_INTERVAL_NO_WAIT;
			}
		} else {
			poll_timeout = PR_INTERVAL_NO_TIMEOUT;
		}
		PR_Unlock(tp->ioq.lock);

		/*
		 * XXXX
		 * should retry if more jobs have been added to the queue?
		 *
		 */
		PR_ASSERT(pollfds_used <= pollfd_cnt);
		rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);

		if (tp->shutdown) {
			break;
		}

		if (rv > 0) {
			/*
			 * at least one io event is set
			 */
			PRStatus rval_status;
			PRInt32 index;

			PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
			/*
			 * reset the pollable event, if notified
			 */
			if (pollfds[0].out_flags & PR_POLL_READ) {
				rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
				PR_ASSERT(PR_SUCCESS == rval_status);
			}

			for(index = 1; index < (pollfds_used); index++) {
                PRInt16 events = pollfds[index].in_flags;
                PRInt16 revents = pollfds[index].out_flags;	
				jobp = polljobs[index];	

                if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
                	(revents & PR_POLL_ERR) ||
                			((events & PR_POLL_WRITE) &&
							(revents & PR_POLL_HUP))) { /* write op & hup */
					PR_Lock(tp->ioq.lock);
					if (jobp->cancel_io) {
						CANCEL_IO_JOB(jobp);
						PR_Unlock(tp->ioq.lock);
						continue;
					}
					PR_REMOVE_AND_INIT_LINK(&jobp->links);
					tp->ioq.cnt--;
					jobp->on_ioq = PR_FALSE;
					PR_Unlock(tp->ioq.lock);

					/* set error */
                    if (PR_POLL_NVAL & revents)
						jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
                    else if (PR_POLL_HUP & revents)
						jobp->iod->error = PR_CONNECT_RESET_ERROR;
                    else 
						jobp->iod->error = PR_IO_ERROR;

					/*
					 * add to jobq
					 */
					add_to_jobq(tp, jobp);
				} else if (revents) {
					/*
					 * add to jobq
					 */
					PR_Lock(tp->ioq.lock);
					if (jobp->cancel_io) {
						CANCEL_IO_JOB(jobp);
						PR_Unlock(tp->ioq.lock);
						continue;
					}
					PR_REMOVE_AND_INIT_LINK(&jobp->links);
					tp->ioq.cnt--;
					jobp->on_ioq = PR_FALSE;
					PR_Unlock(tp->ioq.lock);

					if (jobp->io_op == JOB_IO_CONNECT) {
						if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
							jobp->iod->error = 0;
						else
							jobp->iod->error = PR_GetError();
					} else
						jobp->iod->error = 0;

					add_to_jobq(tp, jobp);
				}
			}
		}
		/*
		 * timeout processing
		 */
		now = PR_IntervalNow();
		PR_Lock(tp->ioq.lock);
		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
			nextqp = qp->next;
			jobp = JOB_LINKS_PTR(qp);
			if (jobp->cancel_io) {
				CANCEL_IO_JOB(jobp);
				continue;
			}
			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
				break;
			if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
								((PRInt32)(jobp->absolute - now) > 0))
				break;
			PR_REMOVE_AND_INIT_LINK(&jobp->links);
			tp->ioq.cnt--;
			jobp->on_ioq = PR_FALSE;
			jobp->iod->error = PR_IO_TIMEOUT_ERROR;
			add_to_jobq(tp, jobp);
		}
		PR_Unlock(tp->ioq.lock);
	}
}

/*
 * timer worker thread function
 */
static void timer_wstart(void *arg)
{
PRThreadPool *tp = (PRThreadPool *) arg;
PRCList *qp;
PRIntervalTime timeout;
PRIntervalTime now;

	/*
	 * call PR_WaitCondVar with minimum value of all timeouts
	 */
	while (!tp->shutdown) {
		PRJob *jobp;

		PR_Lock(tp->timerq.lock);
		if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
			timeout = PR_INTERVAL_NO_TIMEOUT;
		} else {
			PRCList *qp;

			qp = tp->timerq.list.next;
			jobp = JOB_LINKS_PTR(qp);

			timeout = jobp->absolute - PR_IntervalNow();
            if (timeout <= 0)
				timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
		}
		if (PR_INTERVAL_NO_WAIT != timeout)
			PR_WaitCondVar(tp->timerq.cv, timeout);
		if (tp->shutdown) {
			PR_Unlock(tp->timerq.lock);
			break;
		}
		/*
		 * move expired-timer jobs to jobq
		 */
		now = PR_IntervalNow();	
		while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
			qp = tp->timerq.list.next;
			jobp = JOB_LINKS_PTR(qp);

			if ((PRInt32)(jobp->absolute - now) > 0) {
				break;
			}
			/*
			 * job timed out
			 */
			PR_REMOVE_AND_INIT_LINK(&jobp->links);
			tp->timerq.cnt--;
			jobp->on_timerq = PR_FALSE;
			add_to_jobq(tp, jobp);
		}
		PR_Unlock(tp->timerq.lock);
	}
}

static void
delete_threadpool(PRThreadPool *tp)
{
	if (NULL != tp) {
		if (NULL != tp->shutdown_cv)
			PR_DestroyCondVar(tp->shutdown_cv);
		if (NULL != tp->jobq.cv)
			PR_DestroyCondVar(tp->jobq.cv);
		if (NULL != tp->jobq.lock)
			PR_DestroyLock(tp->jobq.lock);
		if (NULL != tp->join_lock)
			PR_DestroyLock(tp->join_lock);
#ifdef OPT_WINNT
		if (NULL != tp->jobq.nt_completion_port)
			CloseHandle(tp->jobq.nt_completion_port);
#endif
		/* Timer queue */
		if (NULL != tp->timerq.cv)
			PR_DestroyCondVar(tp->timerq.cv);
		if (NULL != tp->timerq.lock)
			PR_DestroyLock(tp->timerq.lock);

		if (NULL != tp->ioq.lock)
			PR_DestroyLock(tp->ioq.lock);
		if (NULL != tp->ioq.pollfds)
			PR_Free(tp->ioq.pollfds);
		if (NULL != tp->ioq.notify_fd)
			PR_DestroyPollableEvent(tp->ioq.notify_fd);
		PR_Free(tp);
	}
	return;
}

static PRThreadPool *
alloc_threadpool(void)
{
PRThreadPool *tp;

	tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
	if (NULL == tp)
		goto failed;
	tp->jobq.lock = PR_NewLock();
	if (NULL == tp->jobq.lock)
		goto failed;
	tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
	if (NULL == tp->jobq.cv)
		goto failed;
	tp->join_lock = PR_NewLock();
	if (NULL == tp->join_lock)
		goto failed;
#ifdef OPT_WINNT
	tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
									NULL, 0, 0);
	if (NULL == tp->jobq.nt_completion_port)
		goto failed;
#endif

	tp->ioq.lock = PR_NewLock();
	if (NULL == tp->ioq.lock)
		goto failed;

	/* Timer queue */

	tp->timerq.lock = PR_NewLock();
	if (NULL == tp->timerq.lock)
		goto failed;
	tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
	if (NULL == tp->timerq.cv)
		goto failed;

	tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
	if (NULL == tp->shutdown_cv)
		goto failed;
	tp->ioq.notify_fd = PR_NewPollableEvent();
	if (NULL == tp->ioq.notify_fd)
		goto failed;
	return tp;
failed:
	delete_threadpool(tp);
	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
	return NULL;
}

/* Create thread pool */
PR_IMPLEMENT(PRThreadPool *)
PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
                                PRUint32 stacksize)
{
PRThreadPool *tp;
PRThread *thr;
int i;
wthread *wthrp;

	tp = alloc_threadpool();
	if (NULL == tp)
		return NULL;

	tp->init_threads = initial_threads;
	tp->max_threads = max_threads;
	tp->stacksize = stacksize;
	PR_INIT_CLIST(&tp->jobq.list);
	PR_INIT_CLIST(&tp->ioq.list);
	PR_INIT_CLIST(&tp->timerq.list);
	PR_INIT_CLIST(&tp->jobq.wthreads);
	PR_INIT_CLIST(&tp->ioq.wthreads);
	PR_INIT_CLIST(&tp->timerq.wthreads);
	tp->shutdown = PR_FALSE;

	PR_Lock(tp->jobq.lock);
	for(i=0; i < initial_threads; ++i) {

		thr = PR_CreateThread(PR_USER_THREAD, wstart,
						tp, PR_PRIORITY_NORMAL,
						PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
		PR_ASSERT(thr);
		wthrp = PR_NEWZAP(wthread);
		PR_ASSERT(wthrp);
		wthrp->thread = thr;
		PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
	}
	tp->current_threads = initial_threads;

	thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
					tp, PR_PRIORITY_NORMAL,
					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
	PR_ASSERT(thr);
	wthrp = PR_NEWZAP(wthread);
	PR_ASSERT(wthrp);
	wthrp->thread = thr;
	PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);

	thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
					tp, PR_PRIORITY_NORMAL,
					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
	PR_ASSERT(thr);
	wthrp = PR_NEWZAP(wthread);
	PR_ASSERT(wthrp);
	wthrp->thread = thr;
	PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);

	PR_Unlock(tp->jobq.lock);
	return tp;
}

static void
delete_job(PRJob *jobp)
{
	if (NULL != jobp) {
		if (NULL != jobp->join_cv) {
			PR_DestroyCondVar(jobp->join_cv);
			jobp->join_cv = NULL;
		}
		if (NULL != jobp->cancel_cv) {
			PR_DestroyCondVar(jobp->cancel_cv);
			jobp->cancel_cv = NULL;
		}
		PR_DELETE(jobp);
	}
}

static PRJob *
alloc_job(PRBool joinable, PRThreadPool *tp)
{
	PRJob *jobp;

	jobp = PR_NEWZAP(PRJob);
	if (NULL == jobp) 
		goto failed;
	if (joinable) {
		jobp->join_cv = PR_NewCondVar(tp->join_lock);
		jobp->join_wait = PR_TRUE;
		if (NULL == jobp->join_cv)
			goto failed;
	} else {
		jobp->join_cv = NULL;
	}
#ifdef OPT_WINNT
	jobp->nt_notifier.jobp = jobp;
#endif
	return jobp;
failed:
	delete_job(jobp);
	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
	return NULL;
}

/* queue a job */
PR_IMPLEMENT(PRJob *)
PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
{
	PRJob *jobp;

	jobp = alloc_job(joinable, tpool);
	if (NULL == jobp)
		return NULL;

	jobp->job_func = fn;
	jobp->job_arg = arg;
	jobp->tpool = tpool;

	add_to_jobq(tpool, jobp);
	return jobp;
}

/* queue a job, when a socket is readable or writeable */
static PRJob *
queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
				PRBool joinable, io_op_type op)
{
	PRJob *jobp;
	PRIntervalTime now;

	jobp = alloc_job(joinable, tpool);
	if (NULL == jobp) {
		return NULL;
	}

	/*
	 * Add a new job to io_jobq
	 * wakeup io worker thread
	 */

	jobp->job_func = fn;
	jobp->job_arg = arg;
	jobp->tpool = tpool;
	jobp->iod = iod;
	if (JOB_IO_READ == op) {
		jobp->io_op = JOB_IO_READ;
		jobp->io_poll_flags = PR_POLL_READ;
	} else if (JOB_IO_WRITE == op) {
		jobp->io_op = JOB_IO_WRITE;
		jobp->io_poll_flags = PR_POLL_WRITE;
	} else if (JOB_IO_ACCEPT == op) {
		jobp->io_op = JOB_IO_ACCEPT;
		jobp->io_poll_flags = PR_POLL_READ;
	} else if (JOB_IO_CONNECT == op) {
		jobp->io_op = JOB_IO_CONNECT;
		jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
	} else {
		delete_job(jobp);
		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
		return NULL;
	}

	jobp->timeout = iod->timeout;
	if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
			(PR_INTERVAL_NO_WAIT == iod->timeout)) {
		jobp->absolute = iod->timeout;
	} else {
		now = PR_IntervalNow();
		jobp->absolute = now + iod->timeout;
	}


	PR_Lock(tpool->ioq.lock);

	if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
			(PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
		PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
	} else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
		PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
	} else {
		PRCList *qp;
		PRJob *tmp_jobp;
		/*
		 * insert into the timeout-sorted ioq
		 */
		for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
							qp = qp->prev) {
			tmp_jobp = JOB_LINKS_PTR(qp);
			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
				break;
			}
		}
		PR_INSERT_AFTER(&jobp->links,qp);
	}

	jobp->on_ioq = PR_TRUE;
	tpool->ioq.cnt++;
	/*
	 * notify io worker thread(s)
	 */
	PR_Unlock(tpool->ioq.lock);
	notify_ioq(tpool);
	return jobp;
}

/* queue a job, when a socket is readable */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
											PRBool joinable)
{
	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
}

/* queue a job, when a socket is writeable */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
										PRBool joinable)
{
	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
}


/* queue a job, when a socket has a pending connection */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
								void * arg, PRBool joinable)
{
	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
}

/* queue a job, when a socket can be connected */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
			const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
{
	PRStatus rv;
	PRErrorCode err;

	rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
	if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
		/* connection pending */
		return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
	} else {
		/*
		 * connection succeeded or failed; add to jobq right away
		 */
		if (rv == PR_FAILURE)
			iod->error = err;
		else
			iod->error = 0;
		return(PR_QueueJob(tpool, fn, arg, joinable));
	}
}

/* queue a job, when a timer expires */
PR_IMPLEMENT(PRJob *)
PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
							PRJobFn fn, void * arg, PRBool joinable)
{
	PRIntervalTime now;
	PRJob *jobp;

	if (PR_INTERVAL_NO_TIMEOUT == timeout) {
		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
		return NULL;
	}
	if (PR_INTERVAL_NO_WAIT == timeout) {
		/*
		 * no waiting; add to jobq right away
		 */
		return(PR_QueueJob(tpool, fn, arg, joinable));
	}
	jobp = alloc_job(joinable, tpool);
	if (NULL == jobp) {
		return NULL;
	}

	/*
	 * Add a new job to timer_jobq
	 * wakeup timer worker thread
	 */

	jobp->job_func = fn;
	jobp->job_arg = arg;
	jobp->tpool = tpool;
	jobp->timeout = timeout;

	now = PR_IntervalNow();
	jobp->absolute = now + timeout;


	PR_Lock(tpool->timerq.lock);
	jobp->on_timerq = PR_TRUE;
	if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
		PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
	else {
		PRCList *qp;
		PRJob *tmp_jobp;
		/*
		 * insert into the sorted timer jobq
		 */
		for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
							qp = qp->prev) {
			tmp_jobp = JOB_LINKS_PTR(qp);
			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
				break;
			}
		}
		PR_INSERT_AFTER(&jobp->links,qp);
	}
	tpool->timerq.cnt++;
	/*
	 * notify timer worker thread(s)
	 */
	notify_timerq(tpool);
	PR_Unlock(tpool->timerq.lock);
	return jobp;
}

static void
notify_timerq(PRThreadPool *tp)
{
	/*
	 * wakeup the timer thread(s)
	 */
	PR_NotifyCondVar(tp->timerq.cv);
}

static void
notify_ioq(PRThreadPool *tp)
{
PRStatus rval_status;

	/*
	 * wakeup the io thread(s)
	 */
	rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
	PR_ASSERT(PR_SUCCESS == rval_status);
}

/*
 * cancel a job
 *
 *	XXXX: is this needed? likely to be removed
 */
PR_IMPLEMENT(PRStatus)
PR_CancelJob(PRJob *jobp) {

	PRStatus rval = PR_FAILURE;
	PRThreadPool *tp;

	if (jobp->on_timerq) {
		/*
		 * now, check again while holding the timerq lock
		 */
		tp = jobp->tpool;
		PR_Lock(tp->timerq.lock);
		if (jobp->on_timerq) {
			jobp->on_timerq = PR_FALSE;
			PR_REMOVE_AND_INIT_LINK(&jobp->links);
			tp->timerq.cnt--;
			PR_Unlock(tp->timerq.lock);
			if (!JOINABLE_JOB(jobp)) {
				delete_job(jobp);
			} else {
				JOIN_NOTIFY(jobp);
			}
			rval = PR_SUCCESS;
		} else
			PR_Unlock(tp->timerq.lock);
	} else if (jobp->on_ioq) {
		/*
		 * now, check again while holding the ioq lock
		 */
		tp = jobp->tpool;
		PR_Lock(tp->ioq.lock);
		if (jobp->on_ioq) {
			jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
			if (NULL == jobp->cancel_cv) {
				PR_Unlock(tp->ioq.lock);
				PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
				return PR_FAILURE;
			}
			/*
			 * mark job 'cancelled' and notify io thread(s)
			 * XXXX:
			 *		this assumes there is only one io thread; when there
			 * 		are multiple threads, the io thread processing this job
			 * 		must be notified.
			 */
			jobp->cancel_io = PR_TRUE;
			PR_Unlock(tp->ioq.lock);	/* release, reacquire ioq lock */
			notify_ioq(tp);
			PR_Lock(tp->ioq.lock);
			while (jobp->cancel_io)
				PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
			PR_Unlock(tp->ioq.lock);
			PR_ASSERT(!jobp->on_ioq);
			if (!JOINABLE_JOB(jobp)) {
				delete_job(jobp);
			} else {
				JOIN_NOTIFY(jobp);
			}
			rval = PR_SUCCESS;
		} else
			PR_Unlock(tp->ioq.lock);
	}
	if (PR_FAILURE == rval)
		PR_SetError(PR_INVALID_STATE_ERROR, 0);
	return rval;
}

/* join a job, wait until completion */
PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob *jobp)
{
	if (!JOINABLE_JOB(jobp)) {
		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
		return PR_FAILURE;
	}
	PR_Lock(jobp->tpool->join_lock);
	while(jobp->join_wait)
		PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
	PR_Unlock(jobp->tpool->join_lock);
	delete_job(jobp);
	return PR_SUCCESS;
}

/* shutdown threadpool */
PR_IMPLEMENT(PRStatus)
PR_ShutdownThreadPool(PRThreadPool *tpool)
{
PRStatus rval = PR_SUCCESS;

	PR_Lock(tpool->jobq.lock);
	tpool->shutdown = PR_TRUE;
	PR_NotifyAllCondVar(tpool->shutdown_cv);
	PR_Unlock(tpool->jobq.lock);

	return rval;
}

/*
 * join thread pool
 * 	wait for termination of worker threads
 *	reclaim threadpool resources
 */
PR_IMPLEMENT(PRStatus)
PR_JoinThreadPool(PRThreadPool *tpool)
{
PRStatus rval = PR_SUCCESS;
PRCList *head;
PRStatus rval_status;

	PR_Lock(tpool->jobq.lock);
	while (!tpool->shutdown)
		PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);

	/*
	 * wakeup worker threads
	 */
#ifdef OPT_WINNT
	/*
	 * post shutdown notification for all threads
	 */
	{
		int i;
		for(i=0; i < tpool->current_threads; i++) {
			PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
												TRUE, NULL);
		}
	}
#else
	PR_NotifyAllCondVar(tpool->jobq.cv);
#endif

	/*
	 * wakeup io thread(s)
	 */
	notify_ioq(tpool);

	/*
	 * wakeup timer thread(s)
	 */
	PR_Lock(tpool->timerq.lock);
	notify_timerq(tpool);
	PR_Unlock(tpool->timerq.lock);

	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
		wthread *wthrp;

		head = PR_LIST_HEAD(&tpool->jobq.wthreads);
		PR_REMOVE_AND_INIT_LINK(head);
		PR_Unlock(tpool->jobq.lock);
		wthrp = WTHREAD_LINKS_PTR(head);
		rval_status = PR_JoinThread(wthrp->thread);
		PR_ASSERT(PR_SUCCESS == rval_status);
		PR_DELETE(wthrp);
		PR_Lock(tpool->jobq.lock);
	}
	PR_Unlock(tpool->jobq.lock);
	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
		wthread *wthrp;

		head = PR_LIST_HEAD(&tpool->ioq.wthreads);
		PR_REMOVE_AND_INIT_LINK(head);
		wthrp = WTHREAD_LINKS_PTR(head);
		rval_status = PR_JoinThread(wthrp->thread);
		PR_ASSERT(PR_SUCCESS == rval_status);
		PR_DELETE(wthrp);
	}

	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
		wthread *wthrp;

		head = PR_LIST_HEAD(&tpool->timerq.wthreads);
		PR_REMOVE_AND_INIT_LINK(head);
		wthrp = WTHREAD_LINKS_PTR(head);
		rval_status = PR_JoinThread(wthrp->thread);
		PR_ASSERT(PR_SUCCESS == rval_status);
		PR_DELETE(wthrp);
	}

	/*
	 * Delete queued jobs
	 */
	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
		PRJob *jobp;

		head = PR_LIST_HEAD(&tpool->jobq.list);
		PR_REMOVE_AND_INIT_LINK(head);
		jobp = JOB_LINKS_PTR(head);
		tpool->jobq.cnt--;
		delete_job(jobp);
	}

	/* delete io jobs */
	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
		PRJob *jobp;

		head = PR_LIST_HEAD(&tpool->ioq.list);
		PR_REMOVE_AND_INIT_LINK(head);
		tpool->ioq.cnt--;
		jobp = JOB_LINKS_PTR(head);
		delete_job(jobp);
	}

	/* delete timer jobs */
	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
		PRJob *jobp;

		head = PR_LIST_HEAD(&tpool->timerq.list);
		PR_REMOVE_AND_INIT_LINK(head);
		tpool->timerq.cnt--;
		jobp = JOB_LINKS_PTR(head);
		delete_job(jobp);
	}

	PR_ASSERT(0 == tpool->jobq.cnt);
	PR_ASSERT(0 == tpool->ioq.cnt);
	PR_ASSERT(0 == tpool->timerq.cnt);

	delete_threadpool(tpool);
	return rval;
}
This site is hosted by Intevation GmbH (Datenschutzerklärung und Impressum | Privacy Policy and Imprint)