comparison nspr/pr/src/misc/prtpool.c @ 0:1e5118fa0cb1

This is NSS with a CMake build system. To compile a static NSS library for Windows, we've used the Chromium-NSS fork and added a CMake build system to compile it statically for Windows. See README.chromium for the Chromium changes and README.trustbridge for our modifications.
author Andre Heinecke <andre.heinecke@intevation.de>
date Mon, 28 Jul 2014 10:47:06 +0200
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:1e5118fa0cb1
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 #include "nspr.h"
7
8 /*
9 * Thread pools
10 * Thread pools create and manage threads to provide support for
11 * scheduling jobs onto one or more threads.
12 *
13 */
14 #ifdef OPT_WINNT
15 #include <windows.h>
16 #endif
17
18 /*
19 * worker thread
20 */
21 typedef struct wthread {
22 PRCList links;
23 PRThread *thread;
24 } wthread;
25
26 /*
27 * queue of timer jobs
28 */
29 typedef struct timer_jobq {
30 PRCList list;
31 PRLock *lock;
32 PRCondVar *cv;
33 PRInt32 cnt;
34 PRCList wthreads;
35 } timer_jobq;
36
37 /*
38 * queue of jobs
39 */
40 typedef struct tp_jobq {
41 PRCList list;
42 PRInt32 cnt;
43 PRLock *lock;
44 PRCondVar *cv;
45 PRCList wthreads;
46 #ifdef OPT_WINNT
47 HANDLE nt_completion_port;
48 #endif
49 } tp_jobq;
50
51 /*
52 * queue of IO jobs
53 */
54 typedef struct io_jobq {
55 PRCList list;
56 PRPollDesc *pollfds;
57 PRInt32 npollfds;
58 PRJob **polljobs;
59 PRLock *lock;
60 PRInt32 cnt;
61 PRFileDesc *notify_fd;
62 PRCList wthreads;
63 } io_jobq;
64
65 /*
66 * Threadpool
67 */
68 struct PRThreadPool {
69 PRInt32 init_threads;
70 PRInt32 max_threads;
71 PRInt32 current_threads;
72 PRInt32 idle_threads;
73 PRUint32 stacksize;
74 tp_jobq jobq;
75 io_jobq ioq;
76 timer_jobq timerq;
77 PRLock *join_lock; /* used with jobp->join_cv */
78 PRCondVar *shutdown_cv;
79 PRBool shutdown;
80 };
81
/* Kind of socket operation an I/O job is waiting for. */
typedef enum io_op_type {
    JOB_IO_READ,
    JOB_IO_WRITE,
    JOB_IO_CONNECT,
    JOB_IO_ACCEPT
} io_op_type;
84
#ifdef OPT_WINNT
/*
 * Packet posted to the completion port.  A worker casts the OVERLAPPED
 * pointer it dequeues back to an NT_notifier to find the job, which is
 * why the OVERLAPPED member has to come first.
 */
typedef struct NT_notifier {
    OVERLAPPED overlapped;      /* must be first */
    PRJob *jobp;                /* job this packet stands for */
} NT_notifier;
#endif
91
92 struct PRJob {
93 PRCList links; /* for linking jobs */
94 PRBool on_ioq; /* job on ioq */
95 PRBool on_timerq; /* job on timerq */
96 PRJobFn job_func;
97 void *job_arg;
98 PRCondVar *join_cv;
99 PRBool join_wait; /* == PR_TRUE, when waiting to join */
100 PRCondVar *cancel_cv; /* for cancelling IO jobs */
101 PRBool cancel_io; /* for cancelling IO jobs */
102 PRThreadPool *tpool; /* back pointer to thread pool */
103 PRJobIoDesc *iod;
104 io_op_type io_op;
105 PRInt16 io_poll_flags;
106 PRNetAddr *netaddr;
107 PRIntervalTime timeout; /* relative value */
108 PRIntervalTime absolute;
109 #ifdef OPT_WINNT
110 NT_notifier nt_notifier;
111 #endif
112 };
113
/* Map a pointer to the links member back to the enclosing PRJob. */
#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

/* Map a pointer to the links member back to the enclosing wthread. */
#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

/* A job is joinable exactly when it carries a join condition variable. */
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

/*
 * Mark a joinable job as finished and wake its joiner.  The macro
 * argument is parenthesized at every use so that any pointer expression
 * (e.g. *pp) can be passed safely.
 */
#define JOIN_NOTIFY(_jobp) \
    PR_BEGIN_MACRO \
        PR_Lock((_jobp)->tpool->join_lock); \
        (_jobp)->join_wait = PR_FALSE; \
        PR_NotifyCondVar((_jobp)->join_cv); \
        PR_Unlock((_jobp)->tpool->join_lock); \
    PR_END_MACRO

/*
 * Complete the cancellation of an IO job: take it off the ioq and wake
 * the canceller.  Expects a PRThreadPool pointer named tp in the
 * expanding scope; every call site in this file holds tp->ioq.lock.
 */
#define CANCEL_IO_JOB(jobp) \
    PR_BEGIN_MACRO \
        (jobp)->cancel_io = PR_FALSE; \
        (jobp)->on_ioq = PR_FALSE; \
        PR_REMOVE_AND_INIT_LINK(&(jobp)->links); \
        tp->ioq.cnt--; \
        PR_NotifyCondVar((jobp)->cancel_cv); \
    PR_END_MACRO
138
139 static void delete_job(PRJob *jobp);
140 static PRThreadPool * alloc_threadpool(void);
141 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
142 static void notify_ioq(PRThreadPool *tp);
143 static void notify_timerq(PRThreadPool *tp);
144
145 /*
146 * locks are acquired in the following order
147 *
148 * tp->ioq.lock,tp->timerq.lock
149 * |
150 * V
151 * tp->jobq->lock
152 */
153
154 /*
155 * worker thread function
156 */
157 static void wstart(void *arg)
158 {
159 PRThreadPool *tp = (PRThreadPool *) arg;
160 PRCList *head;
161
162 /*
163 * execute jobs until shutdown
164 */
165 while (!tp->shutdown) {
166 PRJob *jobp;
167 #ifdef OPT_WINNT
168 BOOL rv;
169 DWORD unused, shutdown;
170 LPOVERLAPPED olp;
171
172 PR_Lock(tp->jobq.lock);
173 tp->idle_threads++;
174 PR_Unlock(tp->jobq.lock);
175 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
176 &unused, &shutdown, &olp, INFINITE);
177
178 PR_ASSERT(rv);
179 if (shutdown)
180 break;
181 jobp = ((NT_notifier *) olp)->jobp;
182 PR_Lock(tp->jobq.lock);
183 tp->idle_threads--;
184 tp->jobq.cnt--;
185 PR_Unlock(tp->jobq.lock);
186 #else
187
188 PR_Lock(tp->jobq.lock);
189 while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
190 tp->idle_threads++;
191 PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
192 tp->idle_threads--;
193 }
194 if (tp->shutdown) {
195 PR_Unlock(tp->jobq.lock);
196 break;
197 }
198 head = PR_LIST_HEAD(&tp->jobq.list);
199 /*
200 * remove job from queue
201 */
202 PR_REMOVE_AND_INIT_LINK(head);
203 tp->jobq.cnt--;
204 jobp = JOB_LINKS_PTR(head);
205 PR_Unlock(tp->jobq.lock);
206 #endif
207
208 jobp->job_func(jobp->job_arg);
209 if (!JOINABLE_JOB(jobp)) {
210 delete_job(jobp);
211 } else {
212 JOIN_NOTIFY(jobp);
213 }
214 }
215 PR_Lock(tp->jobq.lock);
216 tp->current_threads--;
217 PR_Unlock(tp->jobq.lock);
218 }
219
220 /*
221 * add a job to the work queue
222 */
223 static void
224 add_to_jobq(PRThreadPool *tp, PRJob *jobp)
225 {
226 /*
227 * add to jobq
228 */
229 #ifdef OPT_WINNT
230 PR_Lock(tp->jobq.lock);
231 tp->jobq.cnt++;
232 PR_Unlock(tp->jobq.lock);
233 /*
234 * notify worker thread(s)
235 */
236 PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
237 FALSE, &jobp->nt_notifier.overlapped);
238 #else
239 PR_Lock(tp->jobq.lock);
240 PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
241 tp->jobq.cnt++;
242 if ((tp->idle_threads < tp->jobq.cnt) &&
243 (tp->current_threads < tp->max_threads)) {
244 wthread *wthrp;
245 /*
246 * increment thread count and unlock the jobq lock
247 */
248 tp->current_threads++;
249 PR_Unlock(tp->jobq.lock);
250 /* create new worker thread */
251 wthrp = PR_NEWZAP(wthread);
252 if (wthrp) {
253 wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
254 tp, PR_PRIORITY_NORMAL,
255 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
256 if (NULL == wthrp->thread) {
257 PR_DELETE(wthrp); /* this sets wthrp to NULL */
258 }
259 }
260 PR_Lock(tp->jobq.lock);
261 if (NULL == wthrp) {
262 tp->current_threads--;
263 } else {
264 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
265 }
266 }
267 /*
268 * wakeup a worker thread
269 */
270 PR_NotifyCondVar(tp->jobq.cv);
271 PR_Unlock(tp->jobq.lock);
272 #endif
273 }
274
275 /*
276 * io worker thread function
277 */
278 static void io_wstart(void *arg)
279 {
280 PRThreadPool *tp = (PRThreadPool *) arg;
281 int pollfd_cnt, pollfds_used;
282 int rv;
283 PRCList *qp, *nextqp;
284 PRPollDesc *pollfds;
285 PRJob **polljobs;
286 int poll_timeout;
287 PRIntervalTime now;
288
289 /*
290 * scan io_jobq
291 * construct poll list
292 * call PR_Poll
293 * for all fds, for which poll returns true, move the job to
294 * jobq and wakeup worker thread.
295 */
296 while (!tp->shutdown) {
297 PRJob *jobp;
298
299 pollfd_cnt = tp->ioq.cnt + 10;
300 if (pollfd_cnt > tp->ioq.npollfds) {
301
302 /*
303 * re-allocate pollfd array if the current one is not large
304 * enough
305 */
306 if (NULL != tp->ioq.pollfds)
307 PR_Free(tp->ioq.pollfds);
308 tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
309 (sizeof(PRPollDesc) + sizeof(PRJob *)));
310 PR_ASSERT(NULL != tp->ioq.pollfds);
311 /*
312 * array of pollfds
313 */
314 pollfds = tp->ioq.pollfds;
315 tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
316 /*
317 * parallel array of jobs
318 */
319 polljobs = tp->ioq.polljobs;
320 tp->ioq.npollfds = pollfd_cnt;
321 }
322
323 pollfds_used = 0;
324 /*
325 * add the notify fd; used for unblocking io thread(s)
326 */
327 pollfds[pollfds_used].fd = tp->ioq.notify_fd;
328 pollfds[pollfds_used].in_flags = PR_POLL_READ;
329 pollfds[pollfds_used].out_flags = 0;
330 polljobs[pollfds_used] = NULL;
331 pollfds_used++;
332 /*
333 * fill in the pollfd array
334 */
335 PR_Lock(tp->ioq.lock);
336 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
337 nextqp = qp->next;
338 jobp = JOB_LINKS_PTR(qp);
339 if (jobp->cancel_io) {
340 CANCEL_IO_JOB(jobp);
341 continue;
342 }
343 if (pollfds_used == (pollfd_cnt))
344 break;
345 pollfds[pollfds_used].fd = jobp->iod->socket;
346 pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
347 pollfds[pollfds_used].out_flags = 0;
348 polljobs[pollfds_used] = jobp;
349
350 pollfds_used++;
351 }
352 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
353 qp = tp->ioq.list.next;
354 jobp = JOB_LINKS_PTR(qp);
355 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
356 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
357 else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
358 poll_timeout = PR_INTERVAL_NO_WAIT;
359 else {
360 poll_timeout = jobp->absolute - PR_IntervalNow();
361 if (poll_timeout <= 0) /* already timed out */
362 poll_timeout = PR_INTERVAL_NO_WAIT;
363 }
364 } else {
365 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
366 }
367 PR_Unlock(tp->ioq.lock);
368
369 /*
370 * XXXX
371 * should retry if more jobs have been added to the queue?
372 *
373 */
374 PR_ASSERT(pollfds_used <= pollfd_cnt);
375 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
376
377 if (tp->shutdown) {
378 break;
379 }
380
381 if (rv > 0) {
382 /*
383 * at least one io event is set
384 */
385 PRStatus rval_status;
386 PRInt32 index;
387
388 PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
389 /*
390 * reset the pollable event, if notified
391 */
392 if (pollfds[0].out_flags & PR_POLL_READ) {
393 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
394 PR_ASSERT(PR_SUCCESS == rval_status);
395 }
396
397 for(index = 1; index < (pollfds_used); index++) {
398 PRInt16 events = pollfds[index].in_flags;
399 PRInt16 revents = pollfds[index].out_flags;
400 jobp = polljobs[index];
401
402 if ((revents & PR_POLL_NVAL) || /* busted in all cases */
403 (revents & PR_POLL_ERR) ||
404 ((events & PR_POLL_WRITE) &&
405 (revents & PR_POLL_HUP))) { /* write op & hup */
406 PR_Lock(tp->ioq.lock);
407 if (jobp->cancel_io) {
408 CANCEL_IO_JOB(jobp);
409 PR_Unlock(tp->ioq.lock);
410 continue;
411 }
412 PR_REMOVE_AND_INIT_LINK(&jobp->links);
413 tp->ioq.cnt--;
414 jobp->on_ioq = PR_FALSE;
415 PR_Unlock(tp->ioq.lock);
416
417 /* set error */
418 if (PR_POLL_NVAL & revents)
419 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
420 else if (PR_POLL_HUP & revents)
421 jobp->iod->error = PR_CONNECT_RESET_ERROR;
422 else
423 jobp->iod->error = PR_IO_ERROR;
424
425 /*
426 * add to jobq
427 */
428 add_to_jobq(tp, jobp);
429 } else if (revents) {
430 /*
431 * add to jobq
432 */
433 PR_Lock(tp->ioq.lock);
434 if (jobp->cancel_io) {
435 CANCEL_IO_JOB(jobp);
436 PR_Unlock(tp->ioq.lock);
437 continue;
438 }
439 PR_REMOVE_AND_INIT_LINK(&jobp->links);
440 tp->ioq.cnt--;
441 jobp->on_ioq = PR_FALSE;
442 PR_Unlock(tp->ioq.lock);
443
444 if (jobp->io_op == JOB_IO_CONNECT) {
445 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
446 jobp->iod->error = 0;
447 else
448 jobp->iod->error = PR_GetError();
449 } else
450 jobp->iod->error = 0;
451
452 add_to_jobq(tp, jobp);
453 }
454 }
455 }
456 /*
457 * timeout processing
458 */
459 now = PR_IntervalNow();
460 PR_Lock(tp->ioq.lock);
461 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
462 nextqp = qp->next;
463 jobp = JOB_LINKS_PTR(qp);
464 if (jobp->cancel_io) {
465 CANCEL_IO_JOB(jobp);
466 continue;
467 }
468 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
469 break;
470 if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
471 ((PRInt32)(jobp->absolute - now) > 0))
472 break;
473 PR_REMOVE_AND_INIT_LINK(&jobp->links);
474 tp->ioq.cnt--;
475 jobp->on_ioq = PR_FALSE;
476 jobp->iod->error = PR_IO_TIMEOUT_ERROR;
477 add_to_jobq(tp, jobp);
478 }
479 PR_Unlock(tp->ioq.lock);
480 }
481 }
482
483 /*
484 * timer worker thread function
485 */
486 static void timer_wstart(void *arg)
487 {
488 PRThreadPool *tp = (PRThreadPool *) arg;
489 PRCList *qp;
490 PRIntervalTime timeout;
491 PRIntervalTime now;
492
493 /*
494 * call PR_WaitCondVar with minimum value of all timeouts
495 */
496 while (!tp->shutdown) {
497 PRJob *jobp;
498
499 PR_Lock(tp->timerq.lock);
500 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
501 timeout = PR_INTERVAL_NO_TIMEOUT;
502 } else {
503 PRCList *qp;
504
505 qp = tp->timerq.list.next;
506 jobp = JOB_LINKS_PTR(qp);
507
508 timeout = jobp->absolute - PR_IntervalNow();
509 if (timeout <= 0)
510 timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
511 }
512 if (PR_INTERVAL_NO_WAIT != timeout)
513 PR_WaitCondVar(tp->timerq.cv, timeout);
514 if (tp->shutdown) {
515 PR_Unlock(tp->timerq.lock);
516 break;
517 }
518 /*
519 * move expired-timer jobs to jobq
520 */
521 now = PR_IntervalNow();
522 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
523 qp = tp->timerq.list.next;
524 jobp = JOB_LINKS_PTR(qp);
525
526 if ((PRInt32)(jobp->absolute - now) > 0) {
527 break;
528 }
529 /*
530 * job timed out
531 */
532 PR_REMOVE_AND_INIT_LINK(&jobp->links);
533 tp->timerq.cnt--;
534 jobp->on_timerq = PR_FALSE;
535 add_to_jobq(tp, jobp);
536 }
537 PR_Unlock(tp->timerq.lock);
538 }
539 }
540
541 static void
542 delete_threadpool(PRThreadPool *tp)
543 {
544 if (NULL != tp) {
545 if (NULL != tp->shutdown_cv)
546 PR_DestroyCondVar(tp->shutdown_cv);
547 if (NULL != tp->jobq.cv)
548 PR_DestroyCondVar(tp->jobq.cv);
549 if (NULL != tp->jobq.lock)
550 PR_DestroyLock(tp->jobq.lock);
551 if (NULL != tp->join_lock)
552 PR_DestroyLock(tp->join_lock);
553 #ifdef OPT_WINNT
554 if (NULL != tp->jobq.nt_completion_port)
555 CloseHandle(tp->jobq.nt_completion_port);
556 #endif
557 /* Timer queue */
558 if (NULL != tp->timerq.cv)
559 PR_DestroyCondVar(tp->timerq.cv);
560 if (NULL != tp->timerq.lock)
561 PR_DestroyLock(tp->timerq.lock);
562
563 if (NULL != tp->ioq.lock)
564 PR_DestroyLock(tp->ioq.lock);
565 if (NULL != tp->ioq.pollfds)
566 PR_Free(tp->ioq.pollfds);
567 if (NULL != tp->ioq.notify_fd)
568 PR_DestroyPollableEvent(tp->ioq.notify_fd);
569 PR_Free(tp);
570 }
571 return;
572 }
573
574 static PRThreadPool *
575 alloc_threadpool(void)
576 {
577 PRThreadPool *tp;
578
579 tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
580 if (NULL == tp)
581 goto failed;
582 tp->jobq.lock = PR_NewLock();
583 if (NULL == tp->jobq.lock)
584 goto failed;
585 tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
586 if (NULL == tp->jobq.cv)
587 goto failed;
588 tp->join_lock = PR_NewLock();
589 if (NULL == tp->join_lock)
590 goto failed;
591 #ifdef OPT_WINNT
592 tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
593 NULL, 0, 0);
594 if (NULL == tp->jobq.nt_completion_port)
595 goto failed;
596 #endif
597
598 tp->ioq.lock = PR_NewLock();
599 if (NULL == tp->ioq.lock)
600 goto failed;
601
602 /* Timer queue */
603
604 tp->timerq.lock = PR_NewLock();
605 if (NULL == tp->timerq.lock)
606 goto failed;
607 tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
608 if (NULL == tp->timerq.cv)
609 goto failed;
610
611 tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
612 if (NULL == tp->shutdown_cv)
613 goto failed;
614 tp->ioq.notify_fd = PR_NewPollableEvent();
615 if (NULL == tp->ioq.notify_fd)
616 goto failed;
617 return tp;
618 failed:
619 delete_threadpool(tp);
620 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
621 return NULL;
622 }
623
624 /* Create thread pool */
625 PR_IMPLEMENT(PRThreadPool *)
626 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
627 PRUint32 stacksize)
628 {
629 PRThreadPool *tp;
630 PRThread *thr;
631 int i;
632 wthread *wthrp;
633
634 tp = alloc_threadpool();
635 if (NULL == tp)
636 return NULL;
637
638 tp->init_threads = initial_threads;
639 tp->max_threads = max_threads;
640 tp->stacksize = stacksize;
641 PR_INIT_CLIST(&tp->jobq.list);
642 PR_INIT_CLIST(&tp->ioq.list);
643 PR_INIT_CLIST(&tp->timerq.list);
644 PR_INIT_CLIST(&tp->jobq.wthreads);
645 PR_INIT_CLIST(&tp->ioq.wthreads);
646 PR_INIT_CLIST(&tp->timerq.wthreads);
647 tp->shutdown = PR_FALSE;
648
649 PR_Lock(tp->jobq.lock);
650 for(i=0; i < initial_threads; ++i) {
651
652 thr = PR_CreateThread(PR_USER_THREAD, wstart,
653 tp, PR_PRIORITY_NORMAL,
654 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
655 PR_ASSERT(thr);
656 wthrp = PR_NEWZAP(wthread);
657 PR_ASSERT(wthrp);
658 wthrp->thread = thr;
659 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
660 }
661 tp->current_threads = initial_threads;
662
663 thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
664 tp, PR_PRIORITY_NORMAL,
665 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
666 PR_ASSERT(thr);
667 wthrp = PR_NEWZAP(wthread);
668 PR_ASSERT(wthrp);
669 wthrp->thread = thr;
670 PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
671
672 thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
673 tp, PR_PRIORITY_NORMAL,
674 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
675 PR_ASSERT(thr);
676 wthrp = PR_NEWZAP(wthread);
677 PR_ASSERT(wthrp);
678 wthrp->thread = thr;
679 PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
680
681 PR_Unlock(tp->jobq.lock);
682 return tp;
683 }
684
685 static void
686 delete_job(PRJob *jobp)
687 {
688 if (NULL != jobp) {
689 if (NULL != jobp->join_cv) {
690 PR_DestroyCondVar(jobp->join_cv);
691 jobp->join_cv = NULL;
692 }
693 if (NULL != jobp->cancel_cv) {
694 PR_DestroyCondVar(jobp->cancel_cv);
695 jobp->cancel_cv = NULL;
696 }
697 PR_DELETE(jobp);
698 }
699 }
700
701 static PRJob *
702 alloc_job(PRBool joinable, PRThreadPool *tp)
703 {
704 PRJob *jobp;
705
706 jobp = PR_NEWZAP(PRJob);
707 if (NULL == jobp)
708 goto failed;
709 if (joinable) {
710 jobp->join_cv = PR_NewCondVar(tp->join_lock);
711 jobp->join_wait = PR_TRUE;
712 if (NULL == jobp->join_cv)
713 goto failed;
714 } else {
715 jobp->join_cv = NULL;
716 }
717 #ifdef OPT_WINNT
718 jobp->nt_notifier.jobp = jobp;
719 #endif
720 return jobp;
721 failed:
722 delete_job(jobp);
723 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
724 return NULL;
725 }
726
727 /* queue a job */
728 PR_IMPLEMENT(PRJob *)
729 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
730 {
731 PRJob *jobp;
732
733 jobp = alloc_job(joinable, tpool);
734 if (NULL == jobp)
735 return NULL;
736
737 jobp->job_func = fn;
738 jobp->job_arg = arg;
739 jobp->tpool = tpool;
740
741 add_to_jobq(tpool, jobp);
742 return jobp;
743 }
744
745 /* queue a job, when a socket is readable or writeable */
746 static PRJob *
747 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
748 PRBool joinable, io_op_type op)
749 {
750 PRJob *jobp;
751 PRIntervalTime now;
752
753 jobp = alloc_job(joinable, tpool);
754 if (NULL == jobp) {
755 return NULL;
756 }
757
758 /*
759 * Add a new job to io_jobq
760 * wakeup io worker thread
761 */
762
763 jobp->job_func = fn;
764 jobp->job_arg = arg;
765 jobp->tpool = tpool;
766 jobp->iod = iod;
767 if (JOB_IO_READ == op) {
768 jobp->io_op = JOB_IO_READ;
769 jobp->io_poll_flags = PR_POLL_READ;
770 } else if (JOB_IO_WRITE == op) {
771 jobp->io_op = JOB_IO_WRITE;
772 jobp->io_poll_flags = PR_POLL_WRITE;
773 } else if (JOB_IO_ACCEPT == op) {
774 jobp->io_op = JOB_IO_ACCEPT;
775 jobp->io_poll_flags = PR_POLL_READ;
776 } else if (JOB_IO_CONNECT == op) {
777 jobp->io_op = JOB_IO_CONNECT;
778 jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
779 } else {
780 delete_job(jobp);
781 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
782 return NULL;
783 }
784
785 jobp->timeout = iod->timeout;
786 if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
787 (PR_INTERVAL_NO_WAIT == iod->timeout)) {
788 jobp->absolute = iod->timeout;
789 } else {
790 now = PR_IntervalNow();
791 jobp->absolute = now + iod->timeout;
792 }
793
794
795 PR_Lock(tpool->ioq.lock);
796
797 if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
798 (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
799 PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
800 } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
801 PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
802 } else {
803 PRCList *qp;
804 PRJob *tmp_jobp;
805 /*
806 * insert into the timeout-sorted ioq
807 */
808 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
809 qp = qp->prev) {
810 tmp_jobp = JOB_LINKS_PTR(qp);
811 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
812 break;
813 }
814 }
815 PR_INSERT_AFTER(&jobp->links,qp);
816 }
817
818 jobp->on_ioq = PR_TRUE;
819 tpool->ioq.cnt++;
820 /*
821 * notify io worker thread(s)
822 */
823 PR_Unlock(tpool->ioq.lock);
824 notify_ioq(tpool);
825 return jobp;
826 }
827
828 /* queue a job, when a socket is readable */
829 PR_IMPLEMENT(PRJob *)
830 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
831 PRBool joinable)
832 {
833 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
834 }
835
836 /* queue a job, when a socket is writeable */
837 PR_IMPLEMENT(PRJob *)
838 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
839 PRBool joinable)
840 {
841 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
842 }
843
844
845 /* queue a job, when a socket has a pending connection */
846 PR_IMPLEMENT(PRJob *)
847 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
848 void * arg, PRBool joinable)
849 {
850 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
851 }
852
853 /* queue a job, when a socket can be connected */
854 PR_IMPLEMENT(PRJob *)
855 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
856 const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
857 {
858 PRStatus rv;
859 PRErrorCode err;
860
861 rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
862 if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
863 /* connection pending */
864 return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
865 } else {
866 /*
867 * connection succeeded or failed; add to jobq right away
868 */
869 if (rv == PR_FAILURE)
870 iod->error = err;
871 else
872 iod->error = 0;
873 return(PR_QueueJob(tpool, fn, arg, joinable));
874 }
875 }
876
877 /* queue a job, when a timer expires */
878 PR_IMPLEMENT(PRJob *)
879 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
880 PRJobFn fn, void * arg, PRBool joinable)
881 {
882 PRIntervalTime now;
883 PRJob *jobp;
884
885 if (PR_INTERVAL_NO_TIMEOUT == timeout) {
886 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
887 return NULL;
888 }
889 if (PR_INTERVAL_NO_WAIT == timeout) {
890 /*
891 * no waiting; add to jobq right away
892 */
893 return(PR_QueueJob(tpool, fn, arg, joinable));
894 }
895 jobp = alloc_job(joinable, tpool);
896 if (NULL == jobp) {
897 return NULL;
898 }
899
900 /*
901 * Add a new job to timer_jobq
902 * wakeup timer worker thread
903 */
904
905 jobp->job_func = fn;
906 jobp->job_arg = arg;
907 jobp->tpool = tpool;
908 jobp->timeout = timeout;
909
910 now = PR_IntervalNow();
911 jobp->absolute = now + timeout;
912
913
914 PR_Lock(tpool->timerq.lock);
915 jobp->on_timerq = PR_TRUE;
916 if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
917 PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
918 else {
919 PRCList *qp;
920 PRJob *tmp_jobp;
921 /*
922 * insert into the sorted timer jobq
923 */
924 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
925 qp = qp->prev) {
926 tmp_jobp = JOB_LINKS_PTR(qp);
927 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
928 break;
929 }
930 }
931 PR_INSERT_AFTER(&jobp->links,qp);
932 }
933 tpool->timerq.cnt++;
934 /*
935 * notify timer worker thread(s)
936 */
937 notify_timerq(tpool);
938 PR_Unlock(tpool->timerq.lock);
939 return jobp;
940 }
941
942 static void
943 notify_timerq(PRThreadPool *tp)
944 {
945 /*
946 * wakeup the timer thread(s)
947 */
948 PR_NotifyCondVar(tp->timerq.cv);
949 }
950
951 static void
952 notify_ioq(PRThreadPool *tp)
953 {
954 PRStatus rval_status;
955
956 /*
957 * wakeup the io thread(s)
958 */
959 rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
960 PR_ASSERT(PR_SUCCESS == rval_status);
961 }
962
963 /*
964 * cancel a job
965 *
966 * XXXX: is this needed? likely to be removed
967 */
968 PR_IMPLEMENT(PRStatus)
969 PR_CancelJob(PRJob *jobp) {
970
971 PRStatus rval = PR_FAILURE;
972 PRThreadPool *tp;
973
974 if (jobp->on_timerq) {
975 /*
976 * now, check again while holding the timerq lock
977 */
978 tp = jobp->tpool;
979 PR_Lock(tp->timerq.lock);
980 if (jobp->on_timerq) {
981 jobp->on_timerq = PR_FALSE;
982 PR_REMOVE_AND_INIT_LINK(&jobp->links);
983 tp->timerq.cnt--;
984 PR_Unlock(tp->timerq.lock);
985 if (!JOINABLE_JOB(jobp)) {
986 delete_job(jobp);
987 } else {
988 JOIN_NOTIFY(jobp);
989 }
990 rval = PR_SUCCESS;
991 } else
992 PR_Unlock(tp->timerq.lock);
993 } else if (jobp->on_ioq) {
994 /*
995 * now, check again while holding the ioq lock
996 */
997 tp = jobp->tpool;
998 PR_Lock(tp->ioq.lock);
999 if (jobp->on_ioq) {
1000 jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
1001 if (NULL == jobp->cancel_cv) {
1002 PR_Unlock(tp->ioq.lock);
1003 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
1004 return PR_FAILURE;
1005 }
1006 /*
1007 * mark job 'cancelled' and notify io thread(s)
1008 * XXXX:
1009 * this assumes there is only one io thread; when there
1010 * are multiple threads, the io thread processing this job
1011 * must be notified.
1012 */
1013 jobp->cancel_io = PR_TRUE;
1014 PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
1015 notify_ioq(tp);
1016 PR_Lock(tp->ioq.lock);
1017 while (jobp->cancel_io)
1018 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
1019 PR_Unlock(tp->ioq.lock);
1020 PR_ASSERT(!jobp->on_ioq);
1021 if (!JOINABLE_JOB(jobp)) {
1022 delete_job(jobp);
1023 } else {
1024 JOIN_NOTIFY(jobp);
1025 }
1026 rval = PR_SUCCESS;
1027 } else
1028 PR_Unlock(tp->ioq.lock);
1029 }
1030 if (PR_FAILURE == rval)
1031 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1032 return rval;
1033 }
1034
1035 /* join a job, wait until completion */
1036 PR_IMPLEMENT(PRStatus)
1037 PR_JoinJob(PRJob *jobp)
1038 {
1039 if (!JOINABLE_JOB(jobp)) {
1040 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1041 return PR_FAILURE;
1042 }
1043 PR_Lock(jobp->tpool->join_lock);
1044 while(jobp->join_wait)
1045 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
1046 PR_Unlock(jobp->tpool->join_lock);
1047 delete_job(jobp);
1048 return PR_SUCCESS;
1049 }
1050
1051 /* shutdown threadpool */
1052 PR_IMPLEMENT(PRStatus)
1053 PR_ShutdownThreadPool(PRThreadPool *tpool)
1054 {
1055 PRStatus rval = PR_SUCCESS;
1056
1057 PR_Lock(tpool->jobq.lock);
1058 tpool->shutdown = PR_TRUE;
1059 PR_NotifyAllCondVar(tpool->shutdown_cv);
1060 PR_Unlock(tpool->jobq.lock);
1061
1062 return rval;
1063 }
1064
1065 /*
1066 * join thread pool
1067 * wait for termination of worker threads
1068 * reclaim threadpool resources
1069 */
1070 PR_IMPLEMENT(PRStatus)
1071 PR_JoinThreadPool(PRThreadPool *tpool)
1072 {
1073 PRStatus rval = PR_SUCCESS;
1074 PRCList *head;
1075 PRStatus rval_status;
1076
1077 PR_Lock(tpool->jobq.lock);
1078 while (!tpool->shutdown)
1079 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
1080
1081 /*
1082 * wakeup worker threads
1083 */
1084 #ifdef OPT_WINNT
1085 /*
1086 * post shutdown notification for all threads
1087 */
1088 {
1089 int i;
1090 for(i=0; i < tpool->current_threads; i++) {
1091 PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
1092 TRUE, NULL);
1093 }
1094 }
1095 #else
1096 PR_NotifyAllCondVar(tpool->jobq.cv);
1097 #endif
1098
1099 /*
1100 * wakeup io thread(s)
1101 */
1102 notify_ioq(tpool);
1103
1104 /*
1105 * wakeup timer thread(s)
1106 */
1107 PR_Lock(tpool->timerq.lock);
1108 notify_timerq(tpool);
1109 PR_Unlock(tpool->timerq.lock);
1110
1111 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
1112 wthread *wthrp;
1113
1114 head = PR_LIST_HEAD(&tpool->jobq.wthreads);
1115 PR_REMOVE_AND_INIT_LINK(head);
1116 PR_Unlock(tpool->jobq.lock);
1117 wthrp = WTHREAD_LINKS_PTR(head);
1118 rval_status = PR_JoinThread(wthrp->thread);
1119 PR_ASSERT(PR_SUCCESS == rval_status);
1120 PR_DELETE(wthrp);
1121 PR_Lock(tpool->jobq.lock);
1122 }
1123 PR_Unlock(tpool->jobq.lock);
1124 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
1125 wthread *wthrp;
1126
1127 head = PR_LIST_HEAD(&tpool->ioq.wthreads);
1128 PR_REMOVE_AND_INIT_LINK(head);
1129 wthrp = WTHREAD_LINKS_PTR(head);
1130 rval_status = PR_JoinThread(wthrp->thread);
1131 PR_ASSERT(PR_SUCCESS == rval_status);
1132 PR_DELETE(wthrp);
1133 }
1134
1135 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
1136 wthread *wthrp;
1137
1138 head = PR_LIST_HEAD(&tpool->timerq.wthreads);
1139 PR_REMOVE_AND_INIT_LINK(head);
1140 wthrp = WTHREAD_LINKS_PTR(head);
1141 rval_status = PR_JoinThread(wthrp->thread);
1142 PR_ASSERT(PR_SUCCESS == rval_status);
1143 PR_DELETE(wthrp);
1144 }
1145
1146 /*
1147 * Delete queued jobs
1148 */
1149 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
1150 PRJob *jobp;
1151
1152 head = PR_LIST_HEAD(&tpool->jobq.list);
1153 PR_REMOVE_AND_INIT_LINK(head);
1154 jobp = JOB_LINKS_PTR(head);
1155 tpool->jobq.cnt--;
1156 delete_job(jobp);
1157 }
1158
1159 /* delete io jobs */
1160 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
1161 PRJob *jobp;
1162
1163 head = PR_LIST_HEAD(&tpool->ioq.list);
1164 PR_REMOVE_AND_INIT_LINK(head);
1165 tpool->ioq.cnt--;
1166 jobp = JOB_LINKS_PTR(head);
1167 delete_job(jobp);
1168 }
1169
1170 /* delete timer jobs */
1171 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
1172 PRJob *jobp;
1173
1174 head = PR_LIST_HEAD(&tpool->timerq.list);
1175 PR_REMOVE_AND_INIT_LINK(head);
1176 tpool->timerq.cnt--;
1177 jobp = JOB_LINKS_PTR(head);
1178 delete_job(jobp);
1179 }
1180
1181 PR_ASSERT(0 == tpool->jobq.cnt);
1182 PR_ASSERT(0 == tpool->ioq.cnt);
1183 PR_ASSERT(0 == tpool->timerq.cnt);
1184
1185 delete_threadpool(tpool);
1186 return rval;
1187 }
This site is hosted by Intevation GmbH (Datenschutzerklärung und Impressum | Privacy Policy and Imprint)