andre@0: /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ andre@0: /* This Source Code Form is subject to the terms of the Mozilla Public andre@0: * License, v. 2.0. If a copy of the MPL was not distributed with this andre@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ andre@0: andre@0: #include "nspr.h" andre@0: andre@0: /* andre@0: * Thread pools andre@0: * Thread pools create and manage threads to provide support for andre@0: * scheduling jobs onto one or more threads. andre@0: * andre@0: */ andre@0: #ifdef OPT_WINNT andre@0: #include andre@0: #endif andre@0: andre@0: /* andre@0: * worker thread andre@0: */ andre@0: typedef struct wthread { andre@0: PRCList links; andre@0: PRThread *thread; andre@0: } wthread; andre@0: andre@0: /* andre@0: * queue of timer jobs andre@0: */ andre@0: typedef struct timer_jobq { andre@0: PRCList list; andre@0: PRLock *lock; andre@0: PRCondVar *cv; andre@0: PRInt32 cnt; andre@0: PRCList wthreads; andre@0: } timer_jobq; andre@0: andre@0: /* andre@0: * queue of jobs andre@0: */ andre@0: typedef struct tp_jobq { andre@0: PRCList list; andre@0: PRInt32 cnt; andre@0: PRLock *lock; andre@0: PRCondVar *cv; andre@0: PRCList wthreads; andre@0: #ifdef OPT_WINNT andre@0: HANDLE nt_completion_port; andre@0: #endif andre@0: } tp_jobq; andre@0: andre@0: /* andre@0: * queue of IO jobs andre@0: */ andre@0: typedef struct io_jobq { andre@0: PRCList list; andre@0: PRPollDesc *pollfds; andre@0: PRInt32 npollfds; andre@0: PRJob **polljobs; andre@0: PRLock *lock; andre@0: PRInt32 cnt; andre@0: PRFileDesc *notify_fd; andre@0: PRCList wthreads; andre@0: } io_jobq; andre@0: andre@0: /* andre@0: * Threadpool andre@0: */ andre@0: struct PRThreadPool { andre@0: PRInt32 init_threads; andre@0: PRInt32 max_threads; andre@0: PRInt32 current_threads; andre@0: PRInt32 idle_threads; andre@0: PRUint32 stacksize; andre@0: tp_jobq jobq; andre@0: io_jobq ioq; andre@0: timer_jobq timerq; andre@0: PRLock *join_lock; /* used with jobp->join_cv */ andre@0: PRCondVar *shutdown_cv; andre@0: PRBool shutdown; andre@0: }; andre@0: andre@0: typedef enum io_op_type andre@0: { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; andre@0: andre@0: #ifdef OPT_WINNT andre@0: typedef struct NT_notifier { andre@0: OVERLAPPED overlapped; /* must be first */ andre@0: PRJob *jobp; andre@0: } NT_notifier; andre@0: #endif andre@0: andre@0: struct PRJob { andre@0: PRCList links; /* for linking jobs */ andre@0: PRBool on_ioq; /* job on ioq */ andre@0: PRBool on_timerq; /* job on timerq */ andre@0: PRJobFn job_func; andre@0: void *job_arg; andre@0: PRCondVar *join_cv; andre@0: PRBool join_wait; /* == PR_TRUE, when waiting to join */ andre@0: PRCondVar *cancel_cv; /* for cancelling IO jobs */ andre@0: PRBool cancel_io; /* for cancelling IO jobs */ andre@0: PRThreadPool *tpool; /* back pointer to thread pool */ andre@0: PRJobIoDesc *iod; andre@0: io_op_type io_op; andre@0: PRInt16 io_poll_flags; andre@0: PRNetAddr *netaddr; andre@0: PRIntervalTime timeout; /* relative value */ andre@0: PRIntervalTime absolute; andre@0: #ifdef OPT_WINNT andre@0: NT_notifier nt_notifier; andre@0: #endif andre@0: }; andre@0: andre@0: #define JOB_LINKS_PTR(_qp) \ andre@0: ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) andre@0: andre@0: #define WTHREAD_LINKS_PTR(_qp) \ andre@0: ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) andre@0: andre@0: #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) andre@0: andre@0: #define JOIN_NOTIFY(_jobp) \ andre@0: PR_BEGIN_MACRO \ andre@0: PR_Lock(_jobp->tpool->join_lock); \ andre@0: _jobp->join_wait = PR_FALSE; \ andre@0: PR_NotifyCondVar(_jobp->join_cv); \ andre@0: PR_Unlock(_jobp->tpool->join_lock); \ andre@0: PR_END_MACRO andre@0: andre@0: #define CANCEL_IO_JOB(jobp) \ andre@0: PR_BEGIN_MACRO \ andre@0: jobp->cancel_io = PR_FALSE; \ andre@0: jobp->on_ioq = PR_FALSE; \ andre@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); \ andre@0: tp->ioq.cnt--; \ andre@0: PR_NotifyCondVar(jobp->cancel_cv); \ andre@0: PR_END_MACRO andre@0: andre@0: static void delete_job(PRJob *jobp); andre@0: static PRThreadPool * alloc_threadpool(void); andre@0: static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); andre@0: static void notify_ioq(PRThreadPool *tp); andre@0: static void notify_timerq(PRThreadPool *tp); andre@0: andre@0: /* andre@0: * locks are acquired in the following order andre@0: * andre@0: * tp->ioq.lock,tp->timerq.lock andre@0: * | andre@0: * V andre@0: * tp->jobq->lock andre@0: */ andre@0: andre@0: /* andre@0: * worker thread function andre@0: */ andre@0: static void wstart(void *arg) andre@0: { andre@0: PRThreadPool *tp = (PRThreadPool *) arg; andre@0: PRCList *head; andre@0: andre@0: /* andre@0: * execute jobs until shutdown andre@0: */ andre@0: while (!tp->shutdown) { andre@0: PRJob *jobp; andre@0: #ifdef OPT_WINNT andre@0: BOOL rv; andre@0: DWORD unused, shutdown; andre@0: LPOVERLAPPED olp; andre@0: andre@0: PR_Lock(tp->jobq.lock); andre@0: tp->idle_threads++; andre@0: PR_Unlock(tp->jobq.lock); andre@0: rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, andre@0: &unused, &shutdown, &olp, INFINITE); andre@0: andre@0: PR_ASSERT(rv); andre@0: if (shutdown) andre@0: break; andre@0: jobp = ((NT_notifier *) olp)->jobp; andre@0: PR_Lock(tp->jobq.lock); andre@0: tp->idle_threads--; andre@0: tp->jobq.cnt--; andre@0: PR_Unlock(tp->jobq.lock); andre@0: #else andre@0: andre@0: PR_Lock(tp->jobq.lock); andre@0: while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { andre@0: tp->idle_threads++; andre@0: PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); andre@0: tp->idle_threads--; andre@0: } andre@0: if (tp->shutdown) { andre@0: PR_Unlock(tp->jobq.lock); andre@0: break; andre@0: } andre@0: head = PR_LIST_HEAD(&tp->jobq.list); andre@0: /* andre@0: * remove job from queue andre@0: */ andre@0: PR_REMOVE_AND_INIT_LINK(head); andre@0: tp->jobq.cnt--; andre@0: jobp = JOB_LINKS_PTR(head); andre@0: PR_Unlock(tp->jobq.lock); andre@0: #endif andre@0: andre@0: jobp->job_func(jobp->job_arg); andre@0: if (!JOINABLE_JOB(jobp)) { andre@0: delete_job(jobp); andre@0: } else { andre@0: JOIN_NOTIFY(jobp); andre@0: } andre@0: } andre@0: PR_Lock(tp->jobq.lock); andre@0: tp->current_threads--; andre@0: PR_Unlock(tp->jobq.lock); andre@0: } andre@0: andre@0: /* andre@0: * add a job to the work queue andre@0: */ andre@0: static void andre@0: add_to_jobq(PRThreadPool *tp, PRJob *jobp) andre@0: { andre@0: /* andre@0: * add to jobq andre@0: */ andre@0: #ifdef OPT_WINNT andre@0: PR_Lock(tp->jobq.lock); andre@0: tp->jobq.cnt++; andre@0: PR_Unlock(tp->jobq.lock); andre@0: /* andre@0: * notify worker thread(s) andre@0: */ andre@0: PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, andre@0: FALSE, &jobp->nt_notifier.overlapped); andre@0: #else andre@0: PR_Lock(tp->jobq.lock); andre@0: PR_APPEND_LINK(&jobp->links,&tp->jobq.list); andre@0: tp->jobq.cnt++; andre@0: if ((tp->idle_threads < tp->jobq.cnt) && andre@0: (tp->current_threads < tp->max_threads)) { andre@0: wthread *wthrp; andre@0: /* andre@0: * increment thread count and unlock the jobq lock andre@0: */ andre@0: tp->current_threads++; andre@0: PR_Unlock(tp->jobq.lock); andre@0: /* create new worker thread */ andre@0: wthrp = PR_NEWZAP(wthread); andre@0: if (wthrp) { andre@0: wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, andre@0: tp, PR_PRIORITY_NORMAL, andre@0: PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); andre@0: if (NULL == wthrp->thread) { andre@0: PR_DELETE(wthrp); /* this sets wthrp to NULL */ andre@0: } andre@0: } andre@0: PR_Lock(tp->jobq.lock); andre@0: if (NULL == wthrp) { andre@0: tp->current_threads--; andre@0: } else { andre@0: PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); andre@0: } andre@0: } andre@0: /* andre@0: * wakeup a worker thread andre@0: */ andre@0: PR_NotifyCondVar(tp->jobq.cv); andre@0: PR_Unlock(tp->jobq.lock); andre@0: #endif andre@0: } andre@0: andre@0: /* andre@0: * io worker thread function andre@0: */ andre@0: static void io_wstart(void *arg) andre@0: { andre@0: PRThreadPool *tp = (PRThreadPool *) arg; andre@0: int pollfd_cnt, pollfds_used; andre@0: int rv; andre@0: PRCList *qp, *nextqp; andre@0: PRPollDesc *pollfds; andre@0: PRJob **polljobs; andre@0: int poll_timeout; andre@0: PRIntervalTime now; andre@0: andre@0: /* andre@0: * scan io_jobq andre@0: * construct poll list andre@0: * call PR_Poll andre@0: * for all fds, for which poll returns true, move the job to andre@0: * jobq and wakeup worker thread. andre@0: */ andre@0: while (!tp->shutdown) { andre@0: PRJob *jobp; andre@0: andre@0: pollfd_cnt = tp->ioq.cnt + 10; andre@0: if (pollfd_cnt > tp->ioq.npollfds) { andre@0: andre@0: /* andre@0: * re-allocate pollfd array if the current one is not large andre@0: * enough andre@0: */ andre@0: if (NULL != tp->ioq.pollfds) andre@0: PR_Free(tp->ioq.pollfds); andre@0: tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * andre@0: (sizeof(PRPollDesc) + sizeof(PRJob *))); andre@0: PR_ASSERT(NULL != tp->ioq.pollfds); andre@0: /* andre@0: * array of pollfds andre@0: */ andre@0: pollfds = tp->ioq.pollfds; andre@0: tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); andre@0: /* andre@0: * parallel array of jobs andre@0: */ andre@0: polljobs = tp->ioq.polljobs; andre@0: tp->ioq.npollfds = pollfd_cnt; andre@0: } andre@0: andre@0: pollfds_used = 0; andre@0: /* andre@0: * add the notify fd; used for unblocking io thread(s) andre@0: */ andre@0: pollfds[pollfds_used].fd = tp->ioq.notify_fd; andre@0: pollfds[pollfds_used].in_flags = PR_POLL_READ; andre@0: pollfds[pollfds_used].out_flags = 0; andre@0: polljobs[pollfds_used] = NULL; andre@0: pollfds_used++; andre@0: /* andre@0: * fill in the pollfd array andre@0: */ andre@0: PR_Lock(tp->ioq.lock); andre@0: for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { andre@0: nextqp = qp->next; andre@0: jobp = JOB_LINKS_PTR(qp); andre@0: if (jobp->cancel_io) { andre@0: CANCEL_IO_JOB(jobp); andre@0: continue; andre@0: } andre@0: if (pollfds_used == (pollfd_cnt)) andre@0: break; andre@0: pollfds[pollfds_used].fd = jobp->iod->socket; andre@0: pollfds[pollfds_used].in_flags = jobp->io_poll_flags; andre@0: pollfds[pollfds_used].out_flags = 0; andre@0: polljobs[pollfds_used] = jobp; andre@0: andre@0: pollfds_used++; andre@0: } andre@0: if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { andre@0: qp = tp->ioq.list.next; andre@0: jobp = JOB_LINKS_PTR(qp); andre@0: if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) andre@0: poll_timeout = PR_INTERVAL_NO_TIMEOUT; andre@0: else if (PR_INTERVAL_NO_WAIT == jobp->timeout) andre@0: poll_timeout = PR_INTERVAL_NO_WAIT; andre@0: else { andre@0: poll_timeout = jobp->absolute - PR_IntervalNow(); andre@0: if (poll_timeout <= 0) /* already timed out */ andre@0: poll_timeout = PR_INTERVAL_NO_WAIT; andre@0: } andre@0: } else { andre@0: poll_timeout = PR_INTERVAL_NO_TIMEOUT; andre@0: } andre@0: PR_Unlock(tp->ioq.lock); andre@0: andre@0: /* andre@0: * XXXX andre@0: * should retry if more jobs have been added to the queue? andre@0: * andre@0: */ andre@0: PR_ASSERT(pollfds_used <= pollfd_cnt); andre@0: rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); andre@0: andre@0: if (tp->shutdown) { andre@0: break; andre@0: } andre@0: andre@0: if (rv > 0) { andre@0: /* andre@0: * at least one io event is set andre@0: */ andre@0: PRStatus rval_status; andre@0: PRInt32 index; andre@0: andre@0: PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); andre@0: /* andre@0: * reset the pollable event, if notified andre@0: */ andre@0: if (pollfds[0].out_flags & PR_POLL_READ) { andre@0: rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); andre@0: PR_ASSERT(PR_SUCCESS == rval_status); andre@0: } andre@0: andre@0: for(index = 1; index < (pollfds_used); index++) { andre@0: PRInt16 events = pollfds[index].in_flags; andre@0: PRInt16 revents = pollfds[index].out_flags; andre@0: jobp = polljobs[index]; andre@0: andre@0: if ((revents & PR_POLL_NVAL) || /* busted in all cases */ andre@0: (revents & PR_POLL_ERR) || andre@0: ((events & PR_POLL_WRITE) && andre@0: (revents & PR_POLL_HUP))) { /* write op & hup */ andre@0: PR_Lock(tp->ioq.lock); andre@0: if (jobp->cancel_io) { andre@0: CANCEL_IO_JOB(jobp); andre@0: PR_Unlock(tp->ioq.lock); andre@0: continue; andre@0: } andre@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); andre@0: tp->ioq.cnt--; andre@0: jobp->on_ioq = PR_FALSE; andre@0: PR_Unlock(tp->ioq.lock); andre@0: andre@0: /* set error */ andre@0: if (PR_POLL_NVAL & revents) andre@0: jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; andre@0: else if (PR_POLL_HUP & revents) andre@0: jobp->iod->error = PR_CONNECT_RESET_ERROR; andre@0: else andre@0: jobp->iod->error = PR_IO_ERROR; andre@0: andre@0: /* andre@0: * add to jobq andre@0: */ andre@0: add_to_jobq(tp, jobp); andre@0: } else if (revents) { andre@0: /* andre@0: * add to jobq andre@0: */ andre@0: PR_Lock(tp->ioq.lock); andre@0: if (jobp->cancel_io) { andre@0: CANCEL_IO_JOB(jobp); andre@0: PR_Unlock(tp->ioq.lock); andre@0: continue; andre@0: } andre@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); andre@0: tp->ioq.cnt--; andre@0: jobp->on_ioq = PR_FALSE; andre@0: PR_Unlock(tp->ioq.lock); andre@0: andre@0: if (jobp->io_op == JOB_IO_CONNECT) { andre@0: if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) andre@0: jobp->iod->error = 0; andre@0: else andre@0: jobp->iod->error = PR_GetError(); andre@0: } else andre@0: jobp->iod->error = 0; andre@0: andre@0: add_to_jobq(tp, jobp); andre@0: } andre@0: } andre@0: } andre@0: /* andre@0: * timeout processing andre@0: */ andre@0: now = PR_IntervalNow(); andre@0: PR_Lock(tp->ioq.lock); andre@0: for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { andre@0: nextqp = qp->next; andre@0: jobp = JOB_LINKS_PTR(qp); andre@0: if (jobp->cancel_io) { andre@0: CANCEL_IO_JOB(jobp); andre@0: continue; andre@0: } andre@0: if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) andre@0: break; andre@0: if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && andre@0: ((PRInt32)(jobp->absolute - now) > 0)) andre@0: break; andre@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); andre@0: tp->ioq.cnt--; andre@0: jobp->on_ioq = PR_FALSE; andre@0: jobp->iod->error = PR_IO_TIMEOUT_ERROR; andre@0: add_to_jobq(tp, jobp); andre@0: } andre@0: PR_Unlock(tp->ioq.lock); andre@0: } andre@0: } andre@0: andre@0: /* andre@0: * timer worker thread function andre@0: */ andre@0: static void timer_wstart(void *arg) andre@0: { andre@0: PRThreadPool *tp = (PRThreadPool *) arg; andre@0: PRCList *qp; andre@0: PRIntervalTime timeout; andre@0: PRIntervalTime now; andre@0: andre@0: /* andre@0: * call PR_WaitCondVar with minimum value of all timeouts andre@0: */ andre@0: while (!tp->shutdown) { andre@0: PRJob *jobp; andre@0: andre@0: PR_Lock(tp->timerq.lock); andre@0: if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { andre@0: timeout = PR_INTERVAL_NO_TIMEOUT; andre@0: } else { andre@0: PRCList *qp; andre@0: andre@0: qp = tp->timerq.list.next; andre@0: jobp = JOB_LINKS_PTR(qp); andre@0: andre@0: timeout = jobp->absolute - PR_IntervalNow(); andre@0: if (timeout <= 0) andre@0: timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ andre@0: } andre@0: if (PR_INTERVAL_NO_WAIT != timeout) andre@0: PR_WaitCondVar(tp->timerq.cv, timeout); andre@0: if (tp->shutdown) { andre@0: PR_Unlock(tp->timerq.lock); andre@0: break; andre@0: } andre@0: /* andre@0: * move expired-timer jobs to jobq andre@0: */ andre@0: now = PR_IntervalNow(); andre@0: while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { andre@0: qp = tp->timerq.list.next; andre@0: jobp = JOB_LINKS_PTR(qp); andre@0: andre@0: if ((PRInt32)(jobp->absolute - now) > 0) { andre@0: break; andre@0: } andre@0: /* andre@0: * job timed out andre@0: */ andre@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); andre@0: tp->timerq.cnt--; andre@0: jobp->on_timerq = PR_FALSE; andre@0: add_to_jobq(tp, jobp); andre@0: } andre@0: PR_Unlock(tp->timerq.lock); andre@0: } andre@0: } andre@0: andre@0: static void andre@0: delete_threadpool(PRThreadPool *tp) andre@0: { andre@0: if (NULL != tp) { andre@0: if (NULL != tp->shutdown_cv) andre@0: PR_DestroyCondVar(tp->shutdown_cv); andre@0: if (NULL != tp->jobq.cv) andre@0: PR_DestroyCondVar(tp->jobq.cv); andre@0: if (NULL != tp->jobq.lock) andre@0: PR_DestroyLock(tp->jobq.lock); andre@0: if (NULL != tp->join_lock) andre@0: PR_DestroyLock(tp->join_lock); andre@0: #ifdef OPT_WINNT andre@0: if (NULL != tp->jobq.nt_completion_port) andre@0: CloseHandle(tp->jobq.nt_completion_port); andre@0: #endif andre@0: /* Timer queue */ andre@0: if (NULL != tp->timerq.cv) andre@0: PR_DestroyCondVar(tp->timerq.cv); andre@0: if (NULL != tp->timerq.lock) andre@0: PR_DestroyLock(tp->timerq.lock); andre@0: andre@0: if (NULL != tp->ioq.lock) andre@0: PR_DestroyLock(tp->ioq.lock); andre@0: if (NULL != tp->ioq.pollfds) andre@0: PR_Free(tp->ioq.pollfds); andre@0: if (NULL != tp->ioq.notify_fd) andre@0: PR_DestroyPollableEvent(tp->ioq.notify_fd); andre@0: PR_Free(tp); andre@0: } andre@0: return; andre@0: } andre@0: andre@0: static PRThreadPool * andre@0: alloc_threadpool(void) andre@0: { andre@0: PRThreadPool *tp; andre@0: andre@0: tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); andre@0: if (NULL == tp) andre@0: goto failed; andre@0: tp->jobq.lock = PR_NewLock(); andre@0: if (NULL == tp->jobq.lock) andre@0: goto failed; andre@0: tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); andre@0: if (NULL == tp->jobq.cv) andre@0: goto failed; andre@0: tp->join_lock = PR_NewLock(); andre@0: if (NULL == tp->join_lock) andre@0: goto failed; andre@0: #ifdef OPT_WINNT andre@0: tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, andre@0: NULL, 0, 0); andre@0: if (NULL == tp->jobq.nt_completion_port) andre@0: goto failed; andre@0: #endif andre@0: andre@0: tp->ioq.lock = PR_NewLock(); andre@0: if (NULL == tp->ioq.lock) andre@0: goto failed; andre@0: andre@0: /* Timer queue */ andre@0: andre@0: tp->timerq.lock = PR_NewLock(); andre@0: if (NULL == tp->timerq.lock) andre@0: goto failed; andre@0: tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); andre@0: if (NULL == tp->timerq.cv) andre@0: goto failed; andre@0: andre@0: tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); andre@0: if (NULL == tp->shutdown_cv) andre@0: goto failed; andre@0: tp->ioq.notify_fd = PR_NewPollableEvent(); andre@0: if (NULL == tp->ioq.notify_fd) andre@0: goto failed; andre@0: return tp; andre@0: failed: andre@0: delete_threadpool(tp); andre@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); andre@0: return NULL; andre@0: } andre@0: andre@0: /* Create thread pool */ andre@0: PR_IMPLEMENT(PRThreadPool *) andre@0: PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, andre@0: PRUint32 stacksize) andre@0: { andre@0: PRThreadPool *tp; andre@0: PRThread *thr; andre@0: int i; andre@0: wthread *wthrp; andre@0: andre@0: tp = alloc_threadpool(); andre@0: if (NULL == tp) andre@0: return NULL; andre@0: andre@0: tp->init_threads = initial_threads; andre@0: tp->max_threads = max_threads; andre@0: tp->stacksize = stacksize; andre@0: PR_INIT_CLIST(&tp->jobq.list); andre@0: PR_INIT_CLIST(&tp->ioq.list); andre@0: PR_INIT_CLIST(&tp->timerq.list); andre@0: PR_INIT_CLIST(&tp->jobq.wthreads); andre@0: PR_INIT_CLIST(&tp->ioq.wthreads); andre@0: PR_INIT_CLIST(&tp->timerq.wthreads); andre@0: tp->shutdown = PR_FALSE; andre@0: andre@0: PR_Lock(tp->jobq.lock); andre@0: for(i=0; i < initial_threads; ++i) { andre@0: andre@0: thr = PR_CreateThread(PR_USER_THREAD, wstart, andre@0: tp, PR_PRIORITY_NORMAL, andre@0: PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); andre@0: PR_ASSERT(thr); andre@0: wthrp = PR_NEWZAP(wthread); andre@0: PR_ASSERT(wthrp); andre@0: wthrp->thread = thr; andre@0: PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); andre@0: } andre@0: tp->current_threads = initial_threads; andre@0: andre@0: thr = PR_CreateThread(PR_USER_THREAD, io_wstart, andre@0: tp, PR_PRIORITY_NORMAL, andre@0: PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); andre@0: PR_ASSERT(thr); andre@0: wthrp = PR_NEWZAP(wthread); andre@0: PR_ASSERT(wthrp); andre@0: wthrp->thread = thr; andre@0: PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); andre@0: andre@0: thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, andre@0: tp, PR_PRIORITY_NORMAL, andre@0: PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); andre@0: PR_ASSERT(thr); andre@0: wthrp = PR_NEWZAP(wthread); andre@0: PR_ASSERT(wthrp); andre@0: wthrp->thread = thr; andre@0: PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); andre@0: andre@0: PR_Unlock(tp->jobq.lock); andre@0: return tp; andre@0: } andre@0: andre@0: static void andre@0: delete_job(PRJob *jobp) andre@0: { andre@0: if (NULL != jobp) { andre@0: if (NULL != jobp->join_cv) { andre@0: PR_DestroyCondVar(jobp->join_cv); andre@0: jobp->join_cv = NULL; andre@0: } andre@0: if (NULL != jobp->cancel_cv) { andre@0: PR_DestroyCondVar(jobp->cancel_cv); andre@0: jobp->cancel_cv = NULL; andre@0: } andre@0: PR_DELETE(jobp); andre@0: } andre@0: } andre@0: andre@0: static PRJob * andre@0: alloc_job(PRBool joinable, PRThreadPool *tp) andre@0: { andre@0: PRJob *jobp; andre@0: andre@0: jobp = PR_NEWZAP(PRJob); andre@0: if (NULL == jobp) andre@0: goto failed; andre@0: if (joinable) { andre@0: jobp->join_cv = PR_NewCondVar(tp->join_lock); andre@0: jobp->join_wait = PR_TRUE; andre@0: if (NULL == jobp->join_cv) andre@0: goto failed; andre@0: } else { andre@0: jobp->join_cv = NULL; andre@0: } andre@0: #ifdef OPT_WINNT andre@0: jobp->nt_notifier.jobp = jobp; andre@0: #endif andre@0: return jobp; andre@0: failed: andre@0: delete_job(jobp); andre@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); andre@0: return NULL; andre@0: } andre@0: andre@0: /* queue a job */ andre@0: PR_IMPLEMENT(PRJob *) andre@0: PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) andre@0: { andre@0: PRJob *jobp; andre@0: andre@0: jobp = alloc_job(joinable, tpool); andre@0: if (NULL == jobp) andre@0: return NULL; andre@0: andre@0: jobp->job_func = fn; andre@0: jobp->job_arg = arg; andre@0: jobp->tpool = tpool; andre@0: andre@0: add_to_jobq(tpool, jobp); andre@0: return jobp; andre@0: } andre@0: andre@0: /* queue a job, when a socket is readable or writeable */ andre@0: static PRJob * andre@0: queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, andre@0: PRBool joinable, io_op_type op) andre@0: { andre@0: PRJob *jobp; andre@0: PRIntervalTime now; andre@0: andre@0: jobp = alloc_job(joinable, tpool); andre@0: if (NULL == jobp) { andre@0: return NULL; andre@0: } andre@0: andre@0: /* andre@0: * Add a new job to io_jobq andre@0: * wakeup io worker thread andre@0: */ andre@0: andre@0: jobp->job_func = fn; andre@0: jobp->job_arg = arg; andre@0: jobp->tpool = tpool; andre@0: jobp->iod = iod; andre@0: if (JOB_IO_READ == op) { andre@0: jobp->io_op = JOB_IO_READ; andre@0: jobp->io_poll_flags = PR_POLL_READ; andre@0: } else if (JOB_IO_WRITE == op) { andre@0: jobp->io_op = JOB_IO_WRITE; andre@0: jobp->io_poll_flags = PR_POLL_WRITE; andre@0: } else if (JOB_IO_ACCEPT == op) { andre@0: jobp->io_op = JOB_IO_ACCEPT; andre@0: jobp->io_poll_flags = PR_POLL_READ; andre@0: } else if (JOB_IO_CONNECT == op) { andre@0: jobp->io_op = JOB_IO_CONNECT; andre@0: jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; andre@0: } else { andre@0: delete_job(jobp); andre@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); andre@0: return NULL; andre@0: } andre@0: andre@0: jobp->timeout = iod->timeout; andre@0: if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || andre@0: (PR_INTERVAL_NO_WAIT == iod->timeout)) { andre@0: jobp->absolute = iod->timeout; andre@0: } else { andre@0: now = PR_IntervalNow(); andre@0: jobp->absolute = now + iod->timeout; andre@0: } andre@0: andre@0: andre@0: PR_Lock(tpool->ioq.lock); andre@0: andre@0: if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || andre@0: (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { andre@0: PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); andre@0: } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { andre@0: PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); andre@0: } else { andre@0: PRCList *qp; andre@0: PRJob *tmp_jobp; andre@0: /* andre@0: * insert into the timeout-sorted ioq andre@0: */ andre@0: for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; andre@0: qp = qp->prev) { andre@0: tmp_jobp = JOB_LINKS_PTR(qp); andre@0: if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { andre@0: break; andre@0: } andre@0: } andre@0: PR_INSERT_AFTER(&jobp->links,qp); andre@0: } andre@0: andre@0: jobp->on_ioq = PR_TRUE; andre@0: tpool->ioq.cnt++; andre@0: /* andre@0: * notify io worker thread(s) andre@0: */ andre@0: PR_Unlock(tpool->ioq.lock); andre@0: notify_ioq(tpool); andre@0: return jobp; andre@0: } andre@0: andre@0: /* queue a job, when a socket is readable */ andre@0: PR_IMPLEMENT(PRJob *) andre@0: PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, andre@0: PRBool joinable) andre@0: { andre@0: return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); andre@0: } andre@0: andre@0: /* queue a job, when a socket is writeable */ andre@0: PR_IMPLEMENT(PRJob *) andre@0: PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, andre@0: PRBool joinable) andre@0: { andre@0: return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); andre@0: } andre@0: andre@0: andre@0: /* queue a job, when a socket has a pending connection */ andre@0: PR_IMPLEMENT(PRJob *) andre@0: PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, andre@0: void * arg, PRBool joinable) andre@0: { andre@0: return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); andre@0: } andre@0: andre@0: /* queue a job, when a socket can be connected */ andre@0: PR_IMPLEMENT(PRJob *) andre@0: PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, andre@0: const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) andre@0: { andre@0: PRStatus rv; andre@0: PRErrorCode err; andre@0: andre@0: rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); andre@0: if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ andre@0: /* connection pending */ andre@0: return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); andre@0: } else { andre@0: /* andre@0: * connection succeeded or failed; add to jobq right away andre@0: */ andre@0: if (rv == PR_FAILURE) andre@0: iod->error = err; andre@0: else andre@0: iod->error = 0; andre@0: return(PR_QueueJob(tpool, fn, arg, joinable)); andre@0: } andre@0: } andre@0: andre@0: /* queue a job, when a timer expires */ andre@0: PR_IMPLEMENT(PRJob *) andre@0: PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, andre@0: PRJobFn fn, void * arg, PRBool joinable) andre@0: { andre@0: PRIntervalTime now; andre@0: PRJob *jobp; andre@0: andre@0: if (PR_INTERVAL_NO_TIMEOUT == timeout) { andre@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); andre@0: return NULL; andre@0: } andre@0: if (PR_INTERVAL_NO_WAIT == timeout) { andre@0: /* andre@0: * no waiting; add to jobq right away andre@0: */ andre@0: return(PR_QueueJob(tpool, fn, arg, joinable)); andre@0: } andre@0: jobp = alloc_job(joinable, tpool); andre@0: if (NULL == jobp) { andre@0: return NULL; andre@0: } andre@0: andre@0: /* andre@0: * Add a new job to timer_jobq andre@0: * wakeup timer worker thread andre@0: */ andre@0: andre@0: jobp->job_func = fn; andre@0: jobp->job_arg = arg; andre@0: jobp->tpool = tpool; andre@0: jobp->timeout = timeout; andre@0: andre@0: now = PR_IntervalNow(); andre@0: jobp->absolute = now + timeout; andre@0: andre@0: andre@0: PR_Lock(tpool->timerq.lock); andre@0: jobp->on_timerq = PR_TRUE; andre@0: if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) andre@0: PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); andre@0: else { andre@0: PRCList *qp; andre@0: PRJob *tmp_jobp; andre@0: /* andre@0: * insert into the sorted timer jobq andre@0: */ andre@0: for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; andre@0: qp = qp->prev) { andre@0: tmp_jobp = JOB_LINKS_PTR(qp); andre@0: if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { andre@0: break; andre@0: } andre@0: } andre@0: PR_INSERT_AFTER(&jobp->links,qp); andre@0: } andre@0: tpool->timerq.cnt++; andre@0: /* andre@0: * notify timer worker thread(s) andre@0: */ andre@0: notify_timerq(tpool); andre@0: PR_Unlock(tpool->timerq.lock); andre@0: return jobp; andre@0: } andre@0: andre@0: static void andre@0: notify_timerq(PRThreadPool *tp) andre@0: { andre@0: /* andre@0: * wakeup the timer thread(s) andre@0: */ andre@0: PR_NotifyCondVar(tp->timerq.cv); andre@0: } andre@0: andre@0: static void andre@0: notify_ioq(PRThreadPool *tp) andre@0: { andre@0: PRStatus rval_status; andre@0: andre@0: /* andre@0: * wakeup the io thread(s) andre@0: */ andre@0: rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); andre@0: PR_ASSERT(PR_SUCCESS == rval_status); andre@0: } andre@0: andre@0: /* andre@0: * cancel a job andre@0: * andre@0: * XXXX: is this needed? likely to be removed andre@0: */ andre@0: PR_IMPLEMENT(PRStatus) andre@0: PR_CancelJob(PRJob *jobp) { andre@0: andre@0: PRStatus rval = PR_FAILURE; andre@0: PRThreadPool *tp; andre@0: andre@0: if (jobp->on_timerq) { andre@0: /* andre@0: * now, check again while holding the timerq lock andre@0: */ andre@0: tp = jobp->tpool; andre@0: PR_Lock(tp->timerq.lock); andre@0: if (jobp->on_timerq) { andre@0: jobp->on_timerq = PR_FALSE; andre@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); andre@0: tp->timerq.cnt--; andre@0: PR_Unlock(tp->timerq.lock); andre@0: if (!JOINABLE_JOB(jobp)) { andre@0: delete_job(jobp); andre@0: } else { andre@0: JOIN_NOTIFY(jobp); andre@0: } andre@0: rval = PR_SUCCESS; andre@0: } else andre@0: PR_Unlock(tp->timerq.lock); andre@0: } else if (jobp->on_ioq) { andre@0: /* andre@0: * now, check again while holding the ioq lock andre@0: */ andre@0: tp = jobp->tpool; andre@0: PR_Lock(tp->ioq.lock); andre@0: if (jobp->on_ioq) { andre@0: jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); andre@0: if (NULL == jobp->cancel_cv) { andre@0: PR_Unlock(tp->ioq.lock); andre@0: PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); andre@0: return PR_FAILURE; andre@0: } andre@0: /* andre@0: * mark job 'cancelled' and notify io thread(s) andre@0: * XXXX: andre@0: * this assumes there is only one io thread; when there andre@0: * are multiple threads, the io thread processing this job andre@0: * must be notified. andre@0: */ andre@0: jobp->cancel_io = PR_TRUE; andre@0: PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ andre@0: notify_ioq(tp); andre@0: PR_Lock(tp->ioq.lock); andre@0: while (jobp->cancel_io) andre@0: PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); andre@0: PR_Unlock(tp->ioq.lock); andre@0: PR_ASSERT(!jobp->on_ioq); andre@0: if (!JOINABLE_JOB(jobp)) { andre@0: delete_job(jobp); andre@0: } else { andre@0: JOIN_NOTIFY(jobp); andre@0: } andre@0: rval = PR_SUCCESS; andre@0: } else andre@0: PR_Unlock(tp->ioq.lock); andre@0: } andre@0: if (PR_FAILURE == rval) andre@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); andre@0: return rval; andre@0: } andre@0: andre@0: /* join a job, wait until completion */ andre@0: PR_IMPLEMENT(PRStatus) andre@0: PR_JoinJob(PRJob *jobp) andre@0: { andre@0: if (!JOINABLE_JOB(jobp)) { andre@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); andre@0: return PR_FAILURE; andre@0: } andre@0: PR_Lock(jobp->tpool->join_lock); andre@0: while(jobp->join_wait) andre@0: PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); andre@0: PR_Unlock(jobp->tpool->join_lock); andre@0: delete_job(jobp); andre@0: return PR_SUCCESS; andre@0: } andre@0: andre@0: /* shutdown threadpool */ andre@0: PR_IMPLEMENT(PRStatus) andre@0: PR_ShutdownThreadPool(PRThreadPool *tpool) andre@0: { andre@0: PRStatus rval = PR_SUCCESS; andre@0: andre@0: PR_Lock(tpool->jobq.lock); andre@0: tpool->shutdown = PR_TRUE; andre@0: PR_NotifyAllCondVar(tpool->shutdown_cv); andre@0: PR_Unlock(tpool->jobq.lock); andre@0: andre@0: return rval; andre@0: } andre@0: andre@0: /* andre@0: * join thread pool andre@0: * wait for termination of worker threads andre@0: * reclaim threadpool resources andre@0: */ andre@0: PR_IMPLEMENT(PRStatus) andre@0: PR_JoinThreadPool(PRThreadPool *tpool) andre@0: { andre@0: PRStatus rval = PR_SUCCESS; andre@0: PRCList *head; andre@0: PRStatus rval_status; andre@0: andre@0: PR_Lock(tpool->jobq.lock); andre@0: while (!tpool->shutdown) andre@0: PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); andre@0: andre@0: /* andre@0: * wakeup worker threads andre@0: */ andre@0: #ifdef OPT_WINNT andre@0: /* andre@0: * post shutdown notification for all threads andre@0: */ andre@0: { andre@0: int i; andre@0: for(i=0; i < tpool->current_threads; i++) { andre@0: PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, andre@0: TRUE, NULL); andre@0: } andre@0: } andre@0: #else andre@0: PR_NotifyAllCondVar(tpool->jobq.cv); andre@0: #endif andre@0: andre@0: /* andre@0: * wakeup io thread(s) andre@0: */ andre@0: notify_ioq(tpool); andre@0: andre@0: /* andre@0: * wakeup timer thread(s) andre@0: */ andre@0: PR_Lock(tpool->timerq.lock); andre@0: notify_timerq(tpool); andre@0: PR_Unlock(tpool->timerq.lock); andre@0: andre@0: while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { andre@0: wthread *wthrp; andre@0: andre@0: head = PR_LIST_HEAD(&tpool->jobq.wthreads); andre@0: PR_REMOVE_AND_INIT_LINK(head); andre@0: PR_Unlock(tpool->jobq.lock); andre@0: wthrp = WTHREAD_LINKS_PTR(head); andre@0: rval_status = PR_JoinThread(wthrp->thread); andre@0: PR_ASSERT(PR_SUCCESS == rval_status); andre@0: PR_DELETE(wthrp); andre@0: PR_Lock(tpool->jobq.lock); andre@0: } andre@0: PR_Unlock(tpool->jobq.lock); andre@0: while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { andre@0: wthread *wthrp; andre@0: andre@0: head = PR_LIST_HEAD(&tpool->ioq.wthreads); andre@0: PR_REMOVE_AND_INIT_LINK(head); andre@0: wthrp = WTHREAD_LINKS_PTR(head); andre@0: rval_status = PR_JoinThread(wthrp->thread); andre@0: PR_ASSERT(PR_SUCCESS == rval_status); andre@0: PR_DELETE(wthrp); andre@0: } andre@0: andre@0: while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { andre@0: wthread *wthrp; andre@0: andre@0: head = PR_LIST_HEAD(&tpool->timerq.wthreads); andre@0: PR_REMOVE_AND_INIT_LINK(head); andre@0: wthrp = WTHREAD_LINKS_PTR(head); andre@0: rval_status = PR_JoinThread(wthrp->thread); andre@0: PR_ASSERT(PR_SUCCESS == rval_status); andre@0: PR_DELETE(wthrp); andre@0: } andre@0: andre@0: /* andre@0: * Delete queued jobs andre@0: */ andre@0: while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { andre@0: PRJob *jobp; andre@0: andre@0: head = PR_LIST_HEAD(&tpool->jobq.list); andre@0: PR_REMOVE_AND_INIT_LINK(head); andre@0: jobp = JOB_LINKS_PTR(head); andre@0: tpool->jobq.cnt--; andre@0: delete_job(jobp); andre@0: } andre@0: andre@0: /* delete io jobs */ andre@0: while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { andre@0: PRJob *jobp; andre@0: andre@0: head = PR_LIST_HEAD(&tpool->ioq.list); andre@0: PR_REMOVE_AND_INIT_LINK(head); andre@0: tpool->ioq.cnt--; andre@0: jobp = JOB_LINKS_PTR(head); andre@0: delete_job(jobp); andre@0: } andre@0: andre@0: /* delete timer jobs */ andre@0: while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { andre@0: PRJob *jobp; andre@0: andre@0: head = PR_LIST_HEAD(&tpool->timerq.list); andre@0: PR_REMOVE_AND_INIT_LINK(head); andre@0: tpool->timerq.cnt--; andre@0: jobp = JOB_LINKS_PTR(head); andre@0: delete_job(jobp); andre@0: } andre@0: andre@0: PR_ASSERT(0 == tpool->jobq.cnt); andre@0: PR_ASSERT(0 == tpool->ioq.cnt); andre@0: PR_ASSERT(0 == tpool->timerq.cnt); andre@0: andre@0: delete_threadpool(tpool); andre@0: return rval; andre@0: }