diff nspr/pr/src/misc/prtpool.c @ 0:1e5118fa0cb1

This is NSS with a Cmake Buildsyste To compile a static NSS library for Windows we've used the Chromium-NSS fork and added a Cmake buildsystem to compile it statically for Windows. See README.chromium for chromium changes and README.trustbridge for our modifications.
author Andre Heinecke <andre.heinecke@intevation.de>
date Mon, 28 Jul 2014 10:47:06 +0200
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/nspr/pr/src/misc/prtpool.c	Mon Jul 28 10:47:06 2014 +0200
@@ -0,0 +1,1187 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "nspr.h"
+
+/*
+ * Thread pools
+ *	Thread pools create and manage threads to provide support for
+ *	scheduling jobs onto one or more threads.
+ *
+ */
+#ifdef OPT_WINNT
+#include <windows.h>
+#endif
+
+/*
+ * worker thread
+ */
+typedef struct wthread {
+	PRCList		links;
+	PRThread	*thread;
+} wthread;
+
+/*
+ * queue of timer jobs
+ */
+typedef struct timer_jobq {
+	PRCList		list;
+	PRLock		*lock;
+	PRCondVar	*cv;
+	PRInt32		cnt;
+	PRCList 	wthreads;
+} timer_jobq;
+
+/*
+ * queue of jobs
+ */
+typedef struct tp_jobq {
+	PRCList		list;
+	PRInt32		cnt;
+	PRLock		*lock;
+	PRCondVar	*cv;
+	PRCList 	wthreads;
+#ifdef OPT_WINNT
+	HANDLE		nt_completion_port;
+#endif
+} tp_jobq;
+
+/*
+ * queue of IO jobs
+ */
+typedef struct io_jobq {
+	PRCList		list;
+	PRPollDesc  *pollfds;
+	PRInt32  	npollfds;
+	PRJob		**polljobs;
+	PRLock		*lock;
+	PRInt32		cnt;
+	PRFileDesc	*notify_fd;
+	PRCList 	wthreads;
+} io_jobq;
+
+/*
+ * Threadpool
+ */
+struct PRThreadPool {
+	PRInt32		init_threads;
+	PRInt32		max_threads;
+	PRInt32		current_threads;
+	PRInt32		idle_threads;
+	PRUint32	stacksize;
+	tp_jobq		jobq;
+	io_jobq		ioq;
+	timer_jobq	timerq;
+	PRLock		*join_lock;		/* used with jobp->join_cv */
+	PRCondVar	*shutdown_cv;
+	PRBool		shutdown;
+};
+
+typedef enum io_op_type
+	{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
+
+#ifdef OPT_WINNT
+typedef struct NT_notifier {
+	OVERLAPPED overlapped;		/* must be first */
+	PRJob	*jobp;
+} NT_notifier;
+#endif
+
+struct PRJob {
+	PRCList			links;		/* 	for linking jobs */
+	PRBool			on_ioq;		/* job on ioq */
+	PRBool			on_timerq;	/* job on timerq */
+	PRJobFn			job_func;
+	void 			*job_arg;
+	PRCondVar		*join_cv;
+	PRBool			join_wait;	/* == PR_TRUE, when waiting to join */
+	PRCondVar		*cancel_cv;	/* for cancelling IO jobs */
+	PRBool			cancel_io;	/* for cancelling IO jobs */
+	PRThreadPool	*tpool;		/* back pointer to thread pool */
+	PRJobIoDesc		*iod;
+	io_op_type		io_op;
+	PRInt16			io_poll_flags;
+	PRNetAddr		*netaddr;
+	PRIntervalTime	timeout;	/* relative value */
+	PRIntervalTime	absolute;
+#ifdef OPT_WINNT
+	NT_notifier		nt_notifier;	
+#endif
+};
+
+#define JOB_LINKS_PTR(_qp) \
+    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))
+
+#define WTHREAD_LINKS_PTR(_qp) \
+    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))
+
+#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
+
+#define JOIN_NOTIFY(_jobp)								\
+				PR_BEGIN_MACRO							\
+				PR_Lock(_jobp->tpool->join_lock);		\
+				_jobp->join_wait = PR_FALSE;			\
+				PR_NotifyCondVar(_jobp->join_cv);		\
+				PR_Unlock(_jobp->tpool->join_lock);		\
+				PR_END_MACRO
+
+#define CANCEL_IO_JOB(jobp)								\
+				PR_BEGIN_MACRO							\
+				jobp->cancel_io = PR_FALSE;				\
+				jobp->on_ioq = PR_FALSE;				\
+				PR_REMOVE_AND_INIT_LINK(&jobp->links);	\
+				tp->ioq.cnt--;							\
+				PR_NotifyCondVar(jobp->cancel_cv);		\
+				PR_END_MACRO
+
+static void delete_job(PRJob *jobp);
+static PRThreadPool * alloc_threadpool(void);
+static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
+static void notify_ioq(PRThreadPool *tp);
+static void notify_timerq(PRThreadPool *tp);
+
+/*
+ * locks are acquired in the following order
+ *
+ *	tp->ioq.lock,tp->timerq.lock
+ *			|
+ *			V
+ *		tp->jobq->lock		
+ */
+
+/*
+ * worker thread function
+ */
+static void wstart(void *arg)
+{
+PRThreadPool *tp = (PRThreadPool *) arg;
+PRCList *head;
+
+	/*
+	 * execute jobs until shutdown
+	 */
+	while (!tp->shutdown) {
+		PRJob *jobp;
+#ifdef OPT_WINNT
+		BOOL rv;
+		DWORD unused, shutdown;
+		LPOVERLAPPED olp;
+
+		PR_Lock(tp->jobq.lock);
+		tp->idle_threads++;
+		PR_Unlock(tp->jobq.lock);
+		rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
+					&unused, &shutdown, &olp, INFINITE);
+		
+		PR_ASSERT(rv);
+		if (shutdown)
+			break;
+		jobp = ((NT_notifier *) olp)->jobp;
+		PR_Lock(tp->jobq.lock);
+		tp->idle_threads--;
+		tp->jobq.cnt--;
+		PR_Unlock(tp->jobq.lock);
+#else
+
+		PR_Lock(tp->jobq.lock);
+		while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
+			tp->idle_threads++;
+			PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
+			tp->idle_threads--;
+		}	
+		if (tp->shutdown) {
+			PR_Unlock(tp->jobq.lock);
+			break;
+		}
+		head = PR_LIST_HEAD(&tp->jobq.list);
+		/*
+		 * remove job from queue
+		 */
+		PR_REMOVE_AND_INIT_LINK(head);
+		tp->jobq.cnt--;
+		jobp = JOB_LINKS_PTR(head);
+		PR_Unlock(tp->jobq.lock);
+#endif
+
+		jobp->job_func(jobp->job_arg);
+		if (!JOINABLE_JOB(jobp)) {
+			delete_job(jobp);
+		} else {
+			JOIN_NOTIFY(jobp);
+		}
+	}
+	PR_Lock(tp->jobq.lock);
+	tp->current_threads--;
+	PR_Unlock(tp->jobq.lock);
+}
+
+/*
+ * add a job to the work queue
+ */
+static void
+add_to_jobq(PRThreadPool *tp, PRJob *jobp)
+{
+	/*
+	 * add to jobq
+	 */
+#ifdef OPT_WINNT
+	PR_Lock(tp->jobq.lock);
+	tp->jobq.cnt++;
+	PR_Unlock(tp->jobq.lock);
+	/*
+	 * notify worker thread(s)
+	 */
+	PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
+            FALSE, &jobp->nt_notifier.overlapped);
+#else
+	PR_Lock(tp->jobq.lock);
+	PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
+	tp->jobq.cnt++;
+	if ((tp->idle_threads < tp->jobq.cnt) &&
+					(tp->current_threads < tp->max_threads)) {
+		wthread *wthrp;
+		/*
+		 * increment thread count and unlock the jobq lock
+		 */
+		tp->current_threads++;
+		PR_Unlock(tp->jobq.lock);
+		/* create new worker thread */
+		wthrp = PR_NEWZAP(wthread);
+		if (wthrp) {
+			wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
+						tp, PR_PRIORITY_NORMAL,
+						PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
+			if (NULL == wthrp->thread) {
+				PR_DELETE(wthrp);  /* this sets wthrp to NULL */
+			}
+		}
+		PR_Lock(tp->jobq.lock);
+		if (NULL == wthrp) {
+			tp->current_threads--;
+		} else {
+			PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
+		}
+	}
+	/*
+	 * wakeup a worker thread
+	 */
+	PR_NotifyCondVar(tp->jobq.cv);
+	PR_Unlock(tp->jobq.lock);
+#endif
+}
+
+/*
+ * io worker thread function
+ */
+static void io_wstart(void *arg)
+{
+PRThreadPool *tp = (PRThreadPool *) arg;
+int pollfd_cnt, pollfds_used;
+int rv;
+PRCList *qp, *nextqp;
+PRPollDesc *pollfds;
+PRJob **polljobs;
+int poll_timeout;
+PRIntervalTime now;
+
+	/*
+	 * scan io_jobq
+	 * construct poll list
+	 * call PR_Poll
+	 * for all fds, for which poll returns true, move the job to
+	 * jobq and wakeup worker thread.
+	 */
+	while (!tp->shutdown) {
+		PRJob *jobp;
+
+		pollfd_cnt = tp->ioq.cnt + 10;
+		if (pollfd_cnt > tp->ioq.npollfds) {
+
+			/*
+			 * re-allocate pollfd array if the current one is not large
+			 * enough
+			 */
+			if (NULL != tp->ioq.pollfds)
+				PR_Free(tp->ioq.pollfds);
+			tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
+						(sizeof(PRPollDesc) + sizeof(PRJob *)));
+			PR_ASSERT(NULL != tp->ioq.pollfds);
+			/*
+			 * array of pollfds
+			 */
+			pollfds = tp->ioq.pollfds;
+			tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
+			/*
+			 * parallel array of jobs
+			 */
+			polljobs = tp->ioq.polljobs;
+			tp->ioq.npollfds = pollfd_cnt;
+		}
+
+		pollfds_used = 0;
+		/*
+		 * add the notify fd; used for unblocking io thread(s)
+		 */
+		pollfds[pollfds_used].fd = tp->ioq.notify_fd;
+		pollfds[pollfds_used].in_flags = PR_POLL_READ;
+		pollfds[pollfds_used].out_flags = 0;
+		polljobs[pollfds_used] = NULL;
+		pollfds_used++;
+		/*
+		 * fill in the pollfd array
+		 */
+		PR_Lock(tp->ioq.lock);
+		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
+			nextqp = qp->next;
+			jobp = JOB_LINKS_PTR(qp);
+			if (jobp->cancel_io) {
+				CANCEL_IO_JOB(jobp);
+				continue;
+			}
+			if (pollfds_used == (pollfd_cnt))
+				break;
+			pollfds[pollfds_used].fd = jobp->iod->socket;
+			pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
+			pollfds[pollfds_used].out_flags = 0;
+			polljobs[pollfds_used] = jobp;
+
+			pollfds_used++;
+		}
+		if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
+			qp = tp->ioq.list.next;
+			jobp = JOB_LINKS_PTR(qp);
+			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
+				poll_timeout = PR_INTERVAL_NO_TIMEOUT;
+			else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
+				poll_timeout = PR_INTERVAL_NO_WAIT;
+			else {
+				poll_timeout = jobp->absolute - PR_IntervalNow();
+				if (poll_timeout <= 0) /* already timed out */
+					poll_timeout = PR_INTERVAL_NO_WAIT;
+			}
+		} else {
+			poll_timeout = PR_INTERVAL_NO_TIMEOUT;
+		}
+		PR_Unlock(tp->ioq.lock);
+
+		/*
+		 * XXXX
+		 * should retry if more jobs have been added to the queue?
+		 *
+		 */
+		PR_ASSERT(pollfds_used <= pollfd_cnt);
+		rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
+
+		if (tp->shutdown) {
+			break;
+		}
+
+		if (rv > 0) {
+			/*
+			 * at least one io event is set
+			 */
+			PRStatus rval_status;
+			PRInt32 index;
+
+			PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
+			/*
+			 * reset the pollable event, if notified
+			 */
+			if (pollfds[0].out_flags & PR_POLL_READ) {
+				rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
+				PR_ASSERT(PR_SUCCESS == rval_status);
+			}
+
+			for(index = 1; index < (pollfds_used); index++) {
+                PRInt16 events = pollfds[index].in_flags;
+                PRInt16 revents = pollfds[index].out_flags;	
+				jobp = polljobs[index];	
+
+                if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
+                	(revents & PR_POLL_ERR) ||
+                			((events & PR_POLL_WRITE) &&
+							(revents & PR_POLL_HUP))) { /* write op & hup */
+					PR_Lock(tp->ioq.lock);
+					if (jobp->cancel_io) {
+						CANCEL_IO_JOB(jobp);
+						PR_Unlock(tp->ioq.lock);
+						continue;
+					}
+					PR_REMOVE_AND_INIT_LINK(&jobp->links);
+					tp->ioq.cnt--;
+					jobp->on_ioq = PR_FALSE;
+					PR_Unlock(tp->ioq.lock);
+
+					/* set error */
+                    if (PR_POLL_NVAL & revents)
+						jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
+                    else if (PR_POLL_HUP & revents)
+						jobp->iod->error = PR_CONNECT_RESET_ERROR;
+                    else 
+						jobp->iod->error = PR_IO_ERROR;
+
+					/*
+					 * add to jobq
+					 */
+					add_to_jobq(tp, jobp);
+				} else if (revents) {
+					/*
+					 * add to jobq
+					 */
+					PR_Lock(tp->ioq.lock);
+					if (jobp->cancel_io) {
+						CANCEL_IO_JOB(jobp);
+						PR_Unlock(tp->ioq.lock);
+						continue;
+					}
+					PR_REMOVE_AND_INIT_LINK(&jobp->links);
+					tp->ioq.cnt--;
+					jobp->on_ioq = PR_FALSE;
+					PR_Unlock(tp->ioq.lock);
+
+					if (jobp->io_op == JOB_IO_CONNECT) {
+						if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
+							jobp->iod->error = 0;
+						else
+							jobp->iod->error = PR_GetError();
+					} else
+						jobp->iod->error = 0;
+
+					add_to_jobq(tp, jobp);
+				}
+			}
+		}
+		/*
+		 * timeout processing
+		 */
+		now = PR_IntervalNow();
+		PR_Lock(tp->ioq.lock);
+		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
+			nextqp = qp->next;
+			jobp = JOB_LINKS_PTR(qp);
+			if (jobp->cancel_io) {
+				CANCEL_IO_JOB(jobp);
+				continue;
+			}
+			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
+				break;
+			if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
+								((PRInt32)(jobp->absolute - now) > 0))
+				break;
+			PR_REMOVE_AND_INIT_LINK(&jobp->links);
+			tp->ioq.cnt--;
+			jobp->on_ioq = PR_FALSE;
+			jobp->iod->error = PR_IO_TIMEOUT_ERROR;
+			add_to_jobq(tp, jobp);
+		}
+		PR_Unlock(tp->ioq.lock);
+	}
+}
+
+/*
+ * timer worker thread function
+ */
+static void timer_wstart(void *arg)
+{
+PRThreadPool *tp = (PRThreadPool *) arg;
+PRCList *qp;
+PRIntervalTime timeout;
+PRIntervalTime now;
+
+	/*
+	 * call PR_WaitCondVar with minimum value of all timeouts
+	 */
+	while (!tp->shutdown) {
+		PRJob *jobp;
+
+		PR_Lock(tp->timerq.lock);
+		if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
+			timeout = PR_INTERVAL_NO_TIMEOUT;
+		} else {
+			PRCList *qp;
+
+			qp = tp->timerq.list.next;
+			jobp = JOB_LINKS_PTR(qp);
+
+			timeout = jobp->absolute - PR_IntervalNow();
+            if (timeout <= 0)
+				timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
+		}
+		if (PR_INTERVAL_NO_WAIT != timeout)
+			PR_WaitCondVar(tp->timerq.cv, timeout);
+		if (tp->shutdown) {
+			PR_Unlock(tp->timerq.lock);
+			break;
+		}
+		/*
+		 * move expired-timer jobs to jobq
+		 */
+		now = PR_IntervalNow();	
+		while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
+			qp = tp->timerq.list.next;
+			jobp = JOB_LINKS_PTR(qp);
+
+			if ((PRInt32)(jobp->absolute - now) > 0) {
+				break;
+			}
+			/*
+			 * job timed out
+			 */
+			PR_REMOVE_AND_INIT_LINK(&jobp->links);
+			tp->timerq.cnt--;
+			jobp->on_timerq = PR_FALSE;
+			add_to_jobq(tp, jobp);
+		}
+		PR_Unlock(tp->timerq.lock);
+	}
+}
+
+static void
+delete_threadpool(PRThreadPool *tp)
+{
+	if (NULL != tp) {
+		if (NULL != tp->shutdown_cv)
+			PR_DestroyCondVar(tp->shutdown_cv);
+		if (NULL != tp->jobq.cv)
+			PR_DestroyCondVar(tp->jobq.cv);
+		if (NULL != tp->jobq.lock)
+			PR_DestroyLock(tp->jobq.lock);
+		if (NULL != tp->join_lock)
+			PR_DestroyLock(tp->join_lock);
+#ifdef OPT_WINNT
+		if (NULL != tp->jobq.nt_completion_port)
+			CloseHandle(tp->jobq.nt_completion_port);
+#endif
+		/* Timer queue */
+		if (NULL != tp->timerq.cv)
+			PR_DestroyCondVar(tp->timerq.cv);
+		if (NULL != tp->timerq.lock)
+			PR_DestroyLock(tp->timerq.lock);
+
+		if (NULL != tp->ioq.lock)
+			PR_DestroyLock(tp->ioq.lock);
+		if (NULL != tp->ioq.pollfds)
+			PR_Free(tp->ioq.pollfds);
+		if (NULL != tp->ioq.notify_fd)
+			PR_DestroyPollableEvent(tp->ioq.notify_fd);
+		PR_Free(tp);
+	}
+	return;
+}
+
+static PRThreadPool *
+alloc_threadpool(void)
+{
+PRThreadPool *tp;
+
+	tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
+	if (NULL == tp)
+		goto failed;
+	tp->jobq.lock = PR_NewLock();
+	if (NULL == tp->jobq.lock)
+		goto failed;
+	tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
+	if (NULL == tp->jobq.cv)
+		goto failed;
+	tp->join_lock = PR_NewLock();
+	if (NULL == tp->join_lock)
+		goto failed;
+#ifdef OPT_WINNT
+	tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
+									NULL, 0, 0);
+	if (NULL == tp->jobq.nt_completion_port)
+		goto failed;
+#endif
+
+	tp->ioq.lock = PR_NewLock();
+	if (NULL == tp->ioq.lock)
+		goto failed;
+
+	/* Timer queue */
+
+	tp->timerq.lock = PR_NewLock();
+	if (NULL == tp->timerq.lock)
+		goto failed;
+	tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
+	if (NULL == tp->timerq.cv)
+		goto failed;
+
+	tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
+	if (NULL == tp->shutdown_cv)
+		goto failed;
+	tp->ioq.notify_fd = PR_NewPollableEvent();
+	if (NULL == tp->ioq.notify_fd)
+		goto failed;
+	return tp;
+failed:
+	delete_threadpool(tp);
+	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+	return NULL;
+}
+
+/* Create thread pool */
+PR_IMPLEMENT(PRThreadPool *)
+PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
+                                PRUint32 stacksize)
+{
+PRThreadPool *tp;
+PRThread *thr;
+int i;
+wthread *wthrp;
+
+	tp = alloc_threadpool();
+	if (NULL == tp)
+		return NULL;
+
+	tp->init_threads = initial_threads;
+	tp->max_threads = max_threads;
+	tp->stacksize = stacksize;
+	PR_INIT_CLIST(&tp->jobq.list);
+	PR_INIT_CLIST(&tp->ioq.list);
+	PR_INIT_CLIST(&tp->timerq.list);
+	PR_INIT_CLIST(&tp->jobq.wthreads);
+	PR_INIT_CLIST(&tp->ioq.wthreads);
+	PR_INIT_CLIST(&tp->timerq.wthreads);
+	tp->shutdown = PR_FALSE;
+
+	PR_Lock(tp->jobq.lock);
+	for(i=0; i < initial_threads; ++i) {
+
+		thr = PR_CreateThread(PR_USER_THREAD, wstart,
+						tp, PR_PRIORITY_NORMAL,
+						PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
+		PR_ASSERT(thr);
+		wthrp = PR_NEWZAP(wthread);
+		PR_ASSERT(wthrp);
+		wthrp->thread = thr;
+		PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
+	}
+	tp->current_threads = initial_threads;
+
+	thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
+					tp, PR_PRIORITY_NORMAL,
+					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
+	PR_ASSERT(thr);
+	wthrp = PR_NEWZAP(wthread);
+	PR_ASSERT(wthrp);
+	wthrp->thread = thr;
+	PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
+
+	thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
+					tp, PR_PRIORITY_NORMAL,
+					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
+	PR_ASSERT(thr);
+	wthrp = PR_NEWZAP(wthread);
+	PR_ASSERT(wthrp);
+	wthrp->thread = thr;
+	PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
+
+	PR_Unlock(tp->jobq.lock);
+	return tp;
+}
+
+static void
+delete_job(PRJob *jobp)
+{
+	if (NULL != jobp) {
+		if (NULL != jobp->join_cv) {
+			PR_DestroyCondVar(jobp->join_cv);
+			jobp->join_cv = NULL;
+		}
+		if (NULL != jobp->cancel_cv) {
+			PR_DestroyCondVar(jobp->cancel_cv);
+			jobp->cancel_cv = NULL;
+		}
+		PR_DELETE(jobp);
+	}
+}
+
+static PRJob *
+alloc_job(PRBool joinable, PRThreadPool *tp)
+{
+	PRJob *jobp;
+
+	jobp = PR_NEWZAP(PRJob);
+	if (NULL == jobp) 
+		goto failed;
+	if (joinable) {
+		jobp->join_cv = PR_NewCondVar(tp->join_lock);
+		jobp->join_wait = PR_TRUE;
+		if (NULL == jobp->join_cv)
+			goto failed;
+	} else {
+		jobp->join_cv = NULL;
+	}
+#ifdef OPT_WINNT
+	jobp->nt_notifier.jobp = jobp;
+#endif
+	return jobp;
+failed:
+	delete_job(jobp);
+	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+	return NULL;
+}
+
+/* queue a job */
+PR_IMPLEMENT(PRJob *)
+PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
+{
+	PRJob *jobp;
+
+	jobp = alloc_job(joinable, tpool);
+	if (NULL == jobp)
+		return NULL;
+
+	jobp->job_func = fn;
+	jobp->job_arg = arg;
+	jobp->tpool = tpool;
+
+	add_to_jobq(tpool, jobp);
+	return jobp;
+}
+
+/* queue a job, when a socket is readable or writeable */
+static PRJob *
+queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
+				PRBool joinable, io_op_type op)
+{
+	PRJob *jobp;
+	PRIntervalTime now;
+
+	jobp = alloc_job(joinable, tpool);
+	if (NULL == jobp) {
+		return NULL;
+	}
+
+	/*
+	 * Add a new job to io_jobq
+	 * wakeup io worker thread
+	 */
+
+	jobp->job_func = fn;
+	jobp->job_arg = arg;
+	jobp->tpool = tpool;
+	jobp->iod = iod;
+	if (JOB_IO_READ == op) {
+		jobp->io_op = JOB_IO_READ;
+		jobp->io_poll_flags = PR_POLL_READ;
+	} else if (JOB_IO_WRITE == op) {
+		jobp->io_op = JOB_IO_WRITE;
+		jobp->io_poll_flags = PR_POLL_WRITE;
+	} else if (JOB_IO_ACCEPT == op) {
+		jobp->io_op = JOB_IO_ACCEPT;
+		jobp->io_poll_flags = PR_POLL_READ;
+	} else if (JOB_IO_CONNECT == op) {
+		jobp->io_op = JOB_IO_CONNECT;
+		jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
+	} else {
+		delete_job(jobp);
+		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+		return NULL;
+	}
+
+	jobp->timeout = iod->timeout;
+	if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
+			(PR_INTERVAL_NO_WAIT == iod->timeout)) {
+		jobp->absolute = iod->timeout;
+	} else {
+		now = PR_IntervalNow();
+		jobp->absolute = now + iod->timeout;
+	}
+
+
+	PR_Lock(tpool->ioq.lock);
+
+	if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
+			(PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
+		PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
+	} else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
+		PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
+	} else {
+		PRCList *qp;
+		PRJob *tmp_jobp;
+		/*
+		 * insert into the timeout-sorted ioq
+		 */
+		for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
+							qp = qp->prev) {
+			tmp_jobp = JOB_LINKS_PTR(qp);
+			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
+				break;
+			}
+		}
+		PR_INSERT_AFTER(&jobp->links,qp);
+	}
+
+	jobp->on_ioq = PR_TRUE;
+	tpool->ioq.cnt++;
+	/*
+	 * notify io worker thread(s)
+	 */
+	PR_Unlock(tpool->ioq.lock);
+	notify_ioq(tpool);
+	return jobp;
+}
+
+/* queue a job, when a socket is readable */
+PR_IMPLEMENT(PRJob *)
+PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
+											PRBool joinable)
+{
+	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
+}
+
+/* queue a job, when a socket is writeable */
+PR_IMPLEMENT(PRJob *)
+PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
+										PRBool joinable)
+{
+	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
+}
+
+
+/* queue a job, when a socket has a pending connection */
+PR_IMPLEMENT(PRJob *)
+PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
+								void * arg, PRBool joinable)
+{
+	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
+}
+
+/* queue a job, when a socket can be connected */
+PR_IMPLEMENT(PRJob *)
+PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
+			const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
+{
+	PRStatus rv;
+	PRErrorCode err;
+
+	rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
+	if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
+		/* connection pending */
+		return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
+	} else {
+		/*
+		 * connection succeeded or failed; add to jobq right away
+		 */
+		if (rv == PR_FAILURE)
+			iod->error = err;
+		else
+			iod->error = 0;
+		return(PR_QueueJob(tpool, fn, arg, joinable));
+	}
+}
+
+/* queue a job, when a timer expires */
+PR_IMPLEMENT(PRJob *)
+PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
+							PRJobFn fn, void * arg, PRBool joinable)
+{
+	PRIntervalTime now;
+	PRJob *jobp;
+
+	if (PR_INTERVAL_NO_TIMEOUT == timeout) {
+		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+		return NULL;
+	}
+	if (PR_INTERVAL_NO_WAIT == timeout) {
+		/*
+		 * no waiting; add to jobq right away
+		 */
+		return(PR_QueueJob(tpool, fn, arg, joinable));
+	}
+	jobp = alloc_job(joinable, tpool);
+	if (NULL == jobp) {
+		return NULL;
+	}
+
+	/*
+	 * Add a new job to timer_jobq
+	 * wakeup timer worker thread
+	 */
+
+	jobp->job_func = fn;
+	jobp->job_arg = arg;
+	jobp->tpool = tpool;
+	jobp->timeout = timeout;
+
+	now = PR_IntervalNow();
+	jobp->absolute = now + timeout;
+
+
+	PR_Lock(tpool->timerq.lock);
+	jobp->on_timerq = PR_TRUE;
+	if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
+		PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
+	else {
+		PRCList *qp;
+		PRJob *tmp_jobp;
+		/*
+		 * insert into the sorted timer jobq
+		 */
+		for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
+							qp = qp->prev) {
+			tmp_jobp = JOB_LINKS_PTR(qp);
+			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
+				break;
+			}
+		}
+		PR_INSERT_AFTER(&jobp->links,qp);
+	}
+	tpool->timerq.cnt++;
+	/*
+	 * notify timer worker thread(s)
+	 */
+	notify_timerq(tpool);
+	PR_Unlock(tpool->timerq.lock);
+	return jobp;
+}
+
+static void
+notify_timerq(PRThreadPool *tp)
+{
+	/*
+	 * wakeup the timer thread(s)
+	 */
+	PR_NotifyCondVar(tp->timerq.cv);
+}
+
+static void
+notify_ioq(PRThreadPool *tp)
+{
+PRStatus rval_status;
+
+	/*
+	 * wakeup the io thread(s)
+	 */
+	rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
+	PR_ASSERT(PR_SUCCESS == rval_status);
+}
+
+/*
+ * cancel a job
+ *
+ *	XXXX: is this needed? likely to be removed
+ */
+PR_IMPLEMENT(PRStatus)
+PR_CancelJob(PRJob *jobp) {
+
+	PRStatus rval = PR_FAILURE;
+	PRThreadPool *tp;
+
+	if (jobp->on_timerq) {
+		/*
+		 * now, check again while holding the timerq lock
+		 */
+		tp = jobp->tpool;
+		PR_Lock(tp->timerq.lock);
+		if (jobp->on_timerq) {
+			jobp->on_timerq = PR_FALSE;
+			PR_REMOVE_AND_INIT_LINK(&jobp->links);
+			tp->timerq.cnt--;
+			PR_Unlock(tp->timerq.lock);
+			if (!JOINABLE_JOB(jobp)) {
+				delete_job(jobp);
+			} else {
+				JOIN_NOTIFY(jobp);
+			}
+			rval = PR_SUCCESS;
+		} else
+			PR_Unlock(tp->timerq.lock);
+	} else if (jobp->on_ioq) {
+		/*
+		 * now, check again while holding the ioq lock
+		 */
+		tp = jobp->tpool;
+		PR_Lock(tp->ioq.lock);
+		if (jobp->on_ioq) {
+			jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
+			if (NULL == jobp->cancel_cv) {
+				PR_Unlock(tp->ioq.lock);
+				PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
+				return PR_FAILURE;
+			}
+			/*
+			 * mark job 'cancelled' and notify io thread(s)
+			 * XXXX:
+			 *		this assumes there is only one io thread; when there
+			 * 		are multiple threads, the io thread processing this job
+			 * 		must be notified.
+			 */
+			jobp->cancel_io = PR_TRUE;
+			PR_Unlock(tp->ioq.lock);	/* release, reacquire ioq lock */
+			notify_ioq(tp);
+			PR_Lock(tp->ioq.lock);
+			while (jobp->cancel_io)
+				PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
+			PR_Unlock(tp->ioq.lock);
+			PR_ASSERT(!jobp->on_ioq);
+			if (!JOINABLE_JOB(jobp)) {
+				delete_job(jobp);
+			} else {
+				JOIN_NOTIFY(jobp);
+			}
+			rval = PR_SUCCESS;
+		} else
+			PR_Unlock(tp->ioq.lock);
+	}
+	if (PR_FAILURE == rval)
+		PR_SetError(PR_INVALID_STATE_ERROR, 0);
+	return rval;
+}
+
+/* join a job, wait until completion */
+PR_IMPLEMENT(PRStatus)
+PR_JoinJob(PRJob *jobp)
+{
+	if (!JOINABLE_JOB(jobp)) {
+		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+		return PR_FAILURE;
+	}
+	PR_Lock(jobp->tpool->join_lock);
+	while(jobp->join_wait)
+		PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
+	PR_Unlock(jobp->tpool->join_lock);
+	delete_job(jobp);
+	return PR_SUCCESS;
+}
+
+/* shutdown threadpool */
+PR_IMPLEMENT(PRStatus)
+PR_ShutdownThreadPool(PRThreadPool *tpool)
+{
+PRStatus rval = PR_SUCCESS;
+
+	PR_Lock(tpool->jobq.lock);
+	tpool->shutdown = PR_TRUE;
+	PR_NotifyAllCondVar(tpool->shutdown_cv);
+	PR_Unlock(tpool->jobq.lock);
+
+	return rval;
+}
+
+/*
+ * join thread pool
+ * 	wait for termination of worker threads
+ *	reclaim threadpool resources
+ */
+PR_IMPLEMENT(PRStatus)
+PR_JoinThreadPool(PRThreadPool *tpool)
+{
+PRStatus rval = PR_SUCCESS;
+PRCList *head;
+PRStatus rval_status;
+
+	PR_Lock(tpool->jobq.lock);
+	while (!tpool->shutdown)
+		PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
+
+	/*
+	 * wakeup worker threads
+	 */
+#ifdef OPT_WINNT
+	/*
+	 * post shutdown notification for all threads
+	 */
+	{
+		int i;
+		for(i=0; i < tpool->current_threads; i++) {
+			PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
+												TRUE, NULL);
+		}
+	}
+#else
+	PR_NotifyAllCondVar(tpool->jobq.cv);
+#endif
+
+	/*
+	 * wakeup io thread(s)
+	 */
+	notify_ioq(tpool);
+
+	/*
+	 * wakeup timer thread(s)
+	 */
+	PR_Lock(tpool->timerq.lock);
+	notify_timerq(tpool);
+	PR_Unlock(tpool->timerq.lock);
+
+	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
+		wthread *wthrp;
+
+		head = PR_LIST_HEAD(&tpool->jobq.wthreads);
+		PR_REMOVE_AND_INIT_LINK(head);
+		PR_Unlock(tpool->jobq.lock);
+		wthrp = WTHREAD_LINKS_PTR(head);
+		rval_status = PR_JoinThread(wthrp->thread);
+		PR_ASSERT(PR_SUCCESS == rval_status);
+		PR_DELETE(wthrp);
+		PR_Lock(tpool->jobq.lock);
+	}
+	PR_Unlock(tpool->jobq.lock);
+	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
+		wthread *wthrp;
+
+		head = PR_LIST_HEAD(&tpool->ioq.wthreads);
+		PR_REMOVE_AND_INIT_LINK(head);
+		wthrp = WTHREAD_LINKS_PTR(head);
+		rval_status = PR_JoinThread(wthrp->thread);
+		PR_ASSERT(PR_SUCCESS == rval_status);
+		PR_DELETE(wthrp);
+	}
+
+	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
+		wthread *wthrp;
+
+		head = PR_LIST_HEAD(&tpool->timerq.wthreads);
+		PR_REMOVE_AND_INIT_LINK(head);
+		wthrp = WTHREAD_LINKS_PTR(head);
+		rval_status = PR_JoinThread(wthrp->thread);
+		PR_ASSERT(PR_SUCCESS == rval_status);
+		PR_DELETE(wthrp);
+	}
+
+	/*
+	 * Delete queued jobs
+	 */
+	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
+		PRJob *jobp;
+
+		head = PR_LIST_HEAD(&tpool->jobq.list);
+		PR_REMOVE_AND_INIT_LINK(head);
+		jobp = JOB_LINKS_PTR(head);
+		tpool->jobq.cnt--;
+		delete_job(jobp);
+	}
+
+	/* delete io jobs */
+	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
+		PRJob *jobp;
+
+		head = PR_LIST_HEAD(&tpool->ioq.list);
+		PR_REMOVE_AND_INIT_LINK(head);
+		tpool->ioq.cnt--;
+		jobp = JOB_LINKS_PTR(head);
+		delete_job(jobp);
+	}
+
+	/* delete timer jobs */
+	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
+		PRJob *jobp;
+
+		head = PR_LIST_HEAD(&tpool->timerq.list);
+		PR_REMOVE_AND_INIT_LINK(head);
+		tpool->timerq.cnt--;
+		jobp = JOB_LINKS_PTR(head);
+		delete_job(jobp);
+	}
+
+	PR_ASSERT(0 == tpool->jobq.cnt);
+	PR_ASSERT(0 == tpool->ioq.cnt);
+	PR_ASSERT(0 == tpool->timerq.cnt);
+
+	delete_threadpool(tpool);
+	return rval;
+}
This site is hosted by Intevation GmbH (Datenschutzerklärung und Impressum | Privacy Policy and Imprint)