Mercurial > trustbridge > nss-cmake-static
comparison nspr/pr/src/misc/prtpool.c @ 0:1e5118fa0cb1
This is NSS with a Cmake Buildsyste
To compile a static NSS library for Windows we've used the
Chromium-NSS fork and added a Cmake buildsystem to compile
it statically for Windows. See README.chromium for chromium
changes and README.trustbridge for our modifications.
author | Andre Heinecke <andre.heinecke@intevation.de> |
---|---|
date | Mon, 28 Jul 2014 10:47:06 +0200 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:1e5118fa0cb1 |
---|---|
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ | |
2 /* This Source Code Form is subject to the terms of the Mozilla Public | |
3 * License, v. 2.0. If a copy of the MPL was not distributed with this | |
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ | |
5 | |
6 #include "nspr.h" | |
7 | |
8 /* | |
9 * Thread pools | |
10 * Thread pools create and manage threads to provide support for | |
11 * scheduling jobs onto one or more threads. | |
12 * | |
13 */ | |
14 #ifdef OPT_WINNT | |
15 #include <windows.h> | |
16 #endif | |
17 | |
18 /* | |
19 * worker thread | |
20 */ | |
21 typedef struct wthread { | |
22 PRCList links; | |
23 PRThread *thread; | |
24 } wthread; | |
25 | |
26 /* | |
27 * queue of timer jobs | |
28 */ | |
29 typedef struct timer_jobq { | |
30 PRCList list; | |
31 PRLock *lock; | |
32 PRCondVar *cv; | |
33 PRInt32 cnt; | |
34 PRCList wthreads; | |
35 } timer_jobq; | |
36 | |
37 /* | |
38 * queue of jobs | |
39 */ | |
40 typedef struct tp_jobq { | |
41 PRCList list; | |
42 PRInt32 cnt; | |
43 PRLock *lock; | |
44 PRCondVar *cv; | |
45 PRCList wthreads; | |
46 #ifdef OPT_WINNT | |
47 HANDLE nt_completion_port; | |
48 #endif | |
49 } tp_jobq; | |
50 | |
51 /* | |
52 * queue of IO jobs | |
53 */ | |
54 typedef struct io_jobq { | |
55 PRCList list; | |
56 PRPollDesc *pollfds; | |
57 PRInt32 npollfds; | |
58 PRJob **polljobs; | |
59 PRLock *lock; | |
60 PRInt32 cnt; | |
61 PRFileDesc *notify_fd; | |
62 PRCList wthreads; | |
63 } io_jobq; | |
64 | |
65 /* | |
66 * Threadpool | |
67 */ | |
68 struct PRThreadPool { | |
69 PRInt32 init_threads; | |
70 PRInt32 max_threads; | |
71 PRInt32 current_threads; | |
72 PRInt32 idle_threads; | |
73 PRUint32 stacksize; | |
74 tp_jobq jobq; | |
75 io_jobq ioq; | |
76 timer_jobq timerq; | |
77 PRLock *join_lock; /* used with jobp->join_cv */ | |
78 PRCondVar *shutdown_cv; | |
79 PRBool shutdown; | |
80 }; | |
81 | |
82 typedef enum io_op_type | |
83 { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; | |
84 | |
85 #ifdef OPT_WINNT | |
86 typedef struct NT_notifier { | |
87 OVERLAPPED overlapped; /* must be first */ | |
88 PRJob *jobp; | |
89 } NT_notifier; | |
90 #endif | |
91 | |
92 struct PRJob { | |
93 PRCList links; /* for linking jobs */ | |
94 PRBool on_ioq; /* job on ioq */ | |
95 PRBool on_timerq; /* job on timerq */ | |
96 PRJobFn job_func; | |
97 void *job_arg; | |
98 PRCondVar *join_cv; | |
99 PRBool join_wait; /* == PR_TRUE, when waiting to join */ | |
100 PRCondVar *cancel_cv; /* for cancelling IO jobs */ | |
101 PRBool cancel_io; /* for cancelling IO jobs */ | |
102 PRThreadPool *tpool; /* back pointer to thread pool */ | |
103 PRJobIoDesc *iod; | |
104 io_op_type io_op; | |
105 PRInt16 io_poll_flags; | |
106 PRNetAddr *netaddr; | |
107 PRIntervalTime timeout; /* relative value */ | |
108 PRIntervalTime absolute; | |
109 #ifdef OPT_WINNT | |
110 NT_notifier nt_notifier; | |
111 #endif | |
112 }; | |
113 | |
114 #define JOB_LINKS_PTR(_qp) \ | |
115 ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) | |
116 | |
117 #define WTHREAD_LINKS_PTR(_qp) \ | |
118 ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) | |
119 | |
120 #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) | |
121 | |
122 #define JOIN_NOTIFY(_jobp) \ | |
123 PR_BEGIN_MACRO \ | |
124 PR_Lock(_jobp->tpool->join_lock); \ | |
125 _jobp->join_wait = PR_FALSE; \ | |
126 PR_NotifyCondVar(_jobp->join_cv); \ | |
127 PR_Unlock(_jobp->tpool->join_lock); \ | |
128 PR_END_MACRO | |
129 | |
130 #define CANCEL_IO_JOB(jobp) \ | |
131 PR_BEGIN_MACRO \ | |
132 jobp->cancel_io = PR_FALSE; \ | |
133 jobp->on_ioq = PR_FALSE; \ | |
134 PR_REMOVE_AND_INIT_LINK(&jobp->links); \ | |
135 tp->ioq.cnt--; \ | |
136 PR_NotifyCondVar(jobp->cancel_cv); \ | |
137 PR_END_MACRO | |
138 | |
139 static void delete_job(PRJob *jobp); | |
140 static PRThreadPool * alloc_threadpool(void); | |
141 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); | |
142 static void notify_ioq(PRThreadPool *tp); | |
143 static void notify_timerq(PRThreadPool *tp); | |
144 | |
145 /* | |
146 * locks are acquired in the following order | |
147 * | |
148 * tp->ioq.lock,tp->timerq.lock | |
149 * | | |
150 * V | |
151 * tp->jobq->lock | |
152 */ | |
153 | |
154 /* | |
155 * worker thread function | |
156 */ | |
157 static void wstart(void *arg) | |
158 { | |
159 PRThreadPool *tp = (PRThreadPool *) arg; | |
160 PRCList *head; | |
161 | |
162 /* | |
163 * execute jobs until shutdown | |
164 */ | |
165 while (!tp->shutdown) { | |
166 PRJob *jobp; | |
167 #ifdef OPT_WINNT | |
168 BOOL rv; | |
169 DWORD unused, shutdown; | |
170 LPOVERLAPPED olp; | |
171 | |
172 PR_Lock(tp->jobq.lock); | |
173 tp->idle_threads++; | |
174 PR_Unlock(tp->jobq.lock); | |
175 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, | |
176 &unused, &shutdown, &olp, INFINITE); | |
177 | |
178 PR_ASSERT(rv); | |
179 if (shutdown) | |
180 break; | |
181 jobp = ((NT_notifier *) olp)->jobp; | |
182 PR_Lock(tp->jobq.lock); | |
183 tp->idle_threads--; | |
184 tp->jobq.cnt--; | |
185 PR_Unlock(tp->jobq.lock); | |
186 #else | |
187 | |
188 PR_Lock(tp->jobq.lock); | |
189 while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { | |
190 tp->idle_threads++; | |
191 PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); | |
192 tp->idle_threads--; | |
193 } | |
194 if (tp->shutdown) { | |
195 PR_Unlock(tp->jobq.lock); | |
196 break; | |
197 } | |
198 head = PR_LIST_HEAD(&tp->jobq.list); | |
199 /* | |
200 * remove job from queue | |
201 */ | |
202 PR_REMOVE_AND_INIT_LINK(head); | |
203 tp->jobq.cnt--; | |
204 jobp = JOB_LINKS_PTR(head); | |
205 PR_Unlock(tp->jobq.lock); | |
206 #endif | |
207 | |
208 jobp->job_func(jobp->job_arg); | |
209 if (!JOINABLE_JOB(jobp)) { | |
210 delete_job(jobp); | |
211 } else { | |
212 JOIN_NOTIFY(jobp); | |
213 } | |
214 } | |
215 PR_Lock(tp->jobq.lock); | |
216 tp->current_threads--; | |
217 PR_Unlock(tp->jobq.lock); | |
218 } | |
219 | |
220 /* | |
221 * add a job to the work queue | |
222 */ | |
223 static void | |
224 add_to_jobq(PRThreadPool *tp, PRJob *jobp) | |
225 { | |
226 /* | |
227 * add to jobq | |
228 */ | |
229 #ifdef OPT_WINNT | |
230 PR_Lock(tp->jobq.lock); | |
231 tp->jobq.cnt++; | |
232 PR_Unlock(tp->jobq.lock); | |
233 /* | |
234 * notify worker thread(s) | |
235 */ | |
236 PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, | |
237 FALSE, &jobp->nt_notifier.overlapped); | |
238 #else | |
239 PR_Lock(tp->jobq.lock); | |
240 PR_APPEND_LINK(&jobp->links,&tp->jobq.list); | |
241 tp->jobq.cnt++; | |
242 if ((tp->idle_threads < tp->jobq.cnt) && | |
243 (tp->current_threads < tp->max_threads)) { | |
244 wthread *wthrp; | |
245 /* | |
246 * increment thread count and unlock the jobq lock | |
247 */ | |
248 tp->current_threads++; | |
249 PR_Unlock(tp->jobq.lock); | |
250 /* create new worker thread */ | |
251 wthrp = PR_NEWZAP(wthread); | |
252 if (wthrp) { | |
253 wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, | |
254 tp, PR_PRIORITY_NORMAL, | |
255 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); | |
256 if (NULL == wthrp->thread) { | |
257 PR_DELETE(wthrp); /* this sets wthrp to NULL */ | |
258 } | |
259 } | |
260 PR_Lock(tp->jobq.lock); | |
261 if (NULL == wthrp) { | |
262 tp->current_threads--; | |
263 } else { | |
264 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); | |
265 } | |
266 } | |
267 /* | |
268 * wakeup a worker thread | |
269 */ | |
270 PR_NotifyCondVar(tp->jobq.cv); | |
271 PR_Unlock(tp->jobq.lock); | |
272 #endif | |
273 } | |
274 | |
275 /* | |
276 * io worker thread function | |
277 */ | |
278 static void io_wstart(void *arg) | |
279 { | |
280 PRThreadPool *tp = (PRThreadPool *) arg; | |
281 int pollfd_cnt, pollfds_used; | |
282 int rv; | |
283 PRCList *qp, *nextqp; | |
284 PRPollDesc *pollfds; | |
285 PRJob **polljobs; | |
286 int poll_timeout; | |
287 PRIntervalTime now; | |
288 | |
289 /* | |
290 * scan io_jobq | |
291 * construct poll list | |
292 * call PR_Poll | |
293 * for all fds, for which poll returns true, move the job to | |
294 * jobq and wakeup worker thread. | |
295 */ | |
296 while (!tp->shutdown) { | |
297 PRJob *jobp; | |
298 | |
299 pollfd_cnt = tp->ioq.cnt + 10; | |
300 if (pollfd_cnt > tp->ioq.npollfds) { | |
301 | |
302 /* | |
303 * re-allocate pollfd array if the current one is not large | |
304 * enough | |
305 */ | |
306 if (NULL != tp->ioq.pollfds) | |
307 PR_Free(tp->ioq.pollfds); | |
308 tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * | |
309 (sizeof(PRPollDesc) + sizeof(PRJob *))); | |
310 PR_ASSERT(NULL != tp->ioq.pollfds); | |
311 /* | |
312 * array of pollfds | |
313 */ | |
314 pollfds = tp->ioq.pollfds; | |
315 tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); | |
316 /* | |
317 * parallel array of jobs | |
318 */ | |
319 polljobs = tp->ioq.polljobs; | |
320 tp->ioq.npollfds = pollfd_cnt; | |
321 } | |
322 | |
323 pollfds_used = 0; | |
324 /* | |
325 * add the notify fd; used for unblocking io thread(s) | |
326 */ | |
327 pollfds[pollfds_used].fd = tp->ioq.notify_fd; | |
328 pollfds[pollfds_used].in_flags = PR_POLL_READ; | |
329 pollfds[pollfds_used].out_flags = 0; | |
330 polljobs[pollfds_used] = NULL; | |
331 pollfds_used++; | |
332 /* | |
333 * fill in the pollfd array | |
334 */ | |
335 PR_Lock(tp->ioq.lock); | |
336 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { | |
337 nextqp = qp->next; | |
338 jobp = JOB_LINKS_PTR(qp); | |
339 if (jobp->cancel_io) { | |
340 CANCEL_IO_JOB(jobp); | |
341 continue; | |
342 } | |
343 if (pollfds_used == (pollfd_cnt)) | |
344 break; | |
345 pollfds[pollfds_used].fd = jobp->iod->socket; | |
346 pollfds[pollfds_used].in_flags = jobp->io_poll_flags; | |
347 pollfds[pollfds_used].out_flags = 0; | |
348 polljobs[pollfds_used] = jobp; | |
349 | |
350 pollfds_used++; | |
351 } | |
352 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { | |
353 qp = tp->ioq.list.next; | |
354 jobp = JOB_LINKS_PTR(qp); | |
355 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) | |
356 poll_timeout = PR_INTERVAL_NO_TIMEOUT; | |
357 else if (PR_INTERVAL_NO_WAIT == jobp->timeout) | |
358 poll_timeout = PR_INTERVAL_NO_WAIT; | |
359 else { | |
360 poll_timeout = jobp->absolute - PR_IntervalNow(); | |
361 if (poll_timeout <= 0) /* already timed out */ | |
362 poll_timeout = PR_INTERVAL_NO_WAIT; | |
363 } | |
364 } else { | |
365 poll_timeout = PR_INTERVAL_NO_TIMEOUT; | |
366 } | |
367 PR_Unlock(tp->ioq.lock); | |
368 | |
369 /* | |
370 * XXXX | |
371 * should retry if more jobs have been added to the queue? | |
372 * | |
373 */ | |
374 PR_ASSERT(pollfds_used <= pollfd_cnt); | |
375 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); | |
376 | |
377 if (tp->shutdown) { | |
378 break; | |
379 } | |
380 | |
381 if (rv > 0) { | |
382 /* | |
383 * at least one io event is set | |
384 */ | |
385 PRStatus rval_status; | |
386 PRInt32 index; | |
387 | |
388 PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); | |
389 /* | |
390 * reset the pollable event, if notified | |
391 */ | |
392 if (pollfds[0].out_flags & PR_POLL_READ) { | |
393 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); | |
394 PR_ASSERT(PR_SUCCESS == rval_status); | |
395 } | |
396 | |
397 for(index = 1; index < (pollfds_used); index++) { | |
398 PRInt16 events = pollfds[index].in_flags; | |
399 PRInt16 revents = pollfds[index].out_flags; | |
400 jobp = polljobs[index]; | |
401 | |
402 if ((revents & PR_POLL_NVAL) || /* busted in all cases */ | |
403 (revents & PR_POLL_ERR) || | |
404 ((events & PR_POLL_WRITE) && | |
405 (revents & PR_POLL_HUP))) { /* write op & hup */ | |
406 PR_Lock(tp->ioq.lock); | |
407 if (jobp->cancel_io) { | |
408 CANCEL_IO_JOB(jobp); | |
409 PR_Unlock(tp->ioq.lock); | |
410 continue; | |
411 } | |
412 PR_REMOVE_AND_INIT_LINK(&jobp->links); | |
413 tp->ioq.cnt--; | |
414 jobp->on_ioq = PR_FALSE; | |
415 PR_Unlock(tp->ioq.lock); | |
416 | |
417 /* set error */ | |
418 if (PR_POLL_NVAL & revents) | |
419 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; | |
420 else if (PR_POLL_HUP & revents) | |
421 jobp->iod->error = PR_CONNECT_RESET_ERROR; | |
422 else | |
423 jobp->iod->error = PR_IO_ERROR; | |
424 | |
425 /* | |
426 * add to jobq | |
427 */ | |
428 add_to_jobq(tp, jobp); | |
429 } else if (revents) { | |
430 /* | |
431 * add to jobq | |
432 */ | |
433 PR_Lock(tp->ioq.lock); | |
434 if (jobp->cancel_io) { | |
435 CANCEL_IO_JOB(jobp); | |
436 PR_Unlock(tp->ioq.lock); | |
437 continue; | |
438 } | |
439 PR_REMOVE_AND_INIT_LINK(&jobp->links); | |
440 tp->ioq.cnt--; | |
441 jobp->on_ioq = PR_FALSE; | |
442 PR_Unlock(tp->ioq.lock); | |
443 | |
444 if (jobp->io_op == JOB_IO_CONNECT) { | |
445 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) | |
446 jobp->iod->error = 0; | |
447 else | |
448 jobp->iod->error = PR_GetError(); | |
449 } else | |
450 jobp->iod->error = 0; | |
451 | |
452 add_to_jobq(tp, jobp); | |
453 } | |
454 } | |
455 } | |
456 /* | |
457 * timeout processing | |
458 */ | |
459 now = PR_IntervalNow(); | |
460 PR_Lock(tp->ioq.lock); | |
461 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { | |
462 nextqp = qp->next; | |
463 jobp = JOB_LINKS_PTR(qp); | |
464 if (jobp->cancel_io) { | |
465 CANCEL_IO_JOB(jobp); | |
466 continue; | |
467 } | |
468 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) | |
469 break; | |
470 if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && | |
471 ((PRInt32)(jobp->absolute - now) > 0)) | |
472 break; | |
473 PR_REMOVE_AND_INIT_LINK(&jobp->links); | |
474 tp->ioq.cnt--; | |
475 jobp->on_ioq = PR_FALSE; | |
476 jobp->iod->error = PR_IO_TIMEOUT_ERROR; | |
477 add_to_jobq(tp, jobp); | |
478 } | |
479 PR_Unlock(tp->ioq.lock); | |
480 } | |
481 } | |
482 | |
483 /* | |
484 * timer worker thread function | |
485 */ | |
486 static void timer_wstart(void *arg) | |
487 { | |
488 PRThreadPool *tp = (PRThreadPool *) arg; | |
489 PRCList *qp; | |
490 PRIntervalTime timeout; | |
491 PRIntervalTime now; | |
492 | |
493 /* | |
494 * call PR_WaitCondVar with minimum value of all timeouts | |
495 */ | |
496 while (!tp->shutdown) { | |
497 PRJob *jobp; | |
498 | |
499 PR_Lock(tp->timerq.lock); | |
500 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { | |
501 timeout = PR_INTERVAL_NO_TIMEOUT; | |
502 } else { | |
503 PRCList *qp; | |
504 | |
505 qp = tp->timerq.list.next; | |
506 jobp = JOB_LINKS_PTR(qp); | |
507 | |
508 timeout = jobp->absolute - PR_IntervalNow(); | |
509 if (timeout <= 0) | |
510 timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ | |
511 } | |
512 if (PR_INTERVAL_NO_WAIT != timeout) | |
513 PR_WaitCondVar(tp->timerq.cv, timeout); | |
514 if (tp->shutdown) { | |
515 PR_Unlock(tp->timerq.lock); | |
516 break; | |
517 } | |
518 /* | |
519 * move expired-timer jobs to jobq | |
520 */ | |
521 now = PR_IntervalNow(); | |
522 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { | |
523 qp = tp->timerq.list.next; | |
524 jobp = JOB_LINKS_PTR(qp); | |
525 | |
526 if ((PRInt32)(jobp->absolute - now) > 0) { | |
527 break; | |
528 } | |
529 /* | |
530 * job timed out | |
531 */ | |
532 PR_REMOVE_AND_INIT_LINK(&jobp->links); | |
533 tp->timerq.cnt--; | |
534 jobp->on_timerq = PR_FALSE; | |
535 add_to_jobq(tp, jobp); | |
536 } | |
537 PR_Unlock(tp->timerq.lock); | |
538 } | |
539 } | |
540 | |
541 static void | |
542 delete_threadpool(PRThreadPool *tp) | |
543 { | |
544 if (NULL != tp) { | |
545 if (NULL != tp->shutdown_cv) | |
546 PR_DestroyCondVar(tp->shutdown_cv); | |
547 if (NULL != tp->jobq.cv) | |
548 PR_DestroyCondVar(tp->jobq.cv); | |
549 if (NULL != tp->jobq.lock) | |
550 PR_DestroyLock(tp->jobq.lock); | |
551 if (NULL != tp->join_lock) | |
552 PR_DestroyLock(tp->join_lock); | |
553 #ifdef OPT_WINNT | |
554 if (NULL != tp->jobq.nt_completion_port) | |
555 CloseHandle(tp->jobq.nt_completion_port); | |
556 #endif | |
557 /* Timer queue */ | |
558 if (NULL != tp->timerq.cv) | |
559 PR_DestroyCondVar(tp->timerq.cv); | |
560 if (NULL != tp->timerq.lock) | |
561 PR_DestroyLock(tp->timerq.lock); | |
562 | |
563 if (NULL != tp->ioq.lock) | |
564 PR_DestroyLock(tp->ioq.lock); | |
565 if (NULL != tp->ioq.pollfds) | |
566 PR_Free(tp->ioq.pollfds); | |
567 if (NULL != tp->ioq.notify_fd) | |
568 PR_DestroyPollableEvent(tp->ioq.notify_fd); | |
569 PR_Free(tp); | |
570 } | |
571 return; | |
572 } | |
573 | |
574 static PRThreadPool * | |
575 alloc_threadpool(void) | |
576 { | |
577 PRThreadPool *tp; | |
578 | |
579 tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); | |
580 if (NULL == tp) | |
581 goto failed; | |
582 tp->jobq.lock = PR_NewLock(); | |
583 if (NULL == tp->jobq.lock) | |
584 goto failed; | |
585 tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); | |
586 if (NULL == tp->jobq.cv) | |
587 goto failed; | |
588 tp->join_lock = PR_NewLock(); | |
589 if (NULL == tp->join_lock) | |
590 goto failed; | |
591 #ifdef OPT_WINNT | |
592 tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, | |
593 NULL, 0, 0); | |
594 if (NULL == tp->jobq.nt_completion_port) | |
595 goto failed; | |
596 #endif | |
597 | |
598 tp->ioq.lock = PR_NewLock(); | |
599 if (NULL == tp->ioq.lock) | |
600 goto failed; | |
601 | |
602 /* Timer queue */ | |
603 | |
604 tp->timerq.lock = PR_NewLock(); | |
605 if (NULL == tp->timerq.lock) | |
606 goto failed; | |
607 tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); | |
608 if (NULL == tp->timerq.cv) | |
609 goto failed; | |
610 | |
611 tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); | |
612 if (NULL == tp->shutdown_cv) | |
613 goto failed; | |
614 tp->ioq.notify_fd = PR_NewPollableEvent(); | |
615 if (NULL == tp->ioq.notify_fd) | |
616 goto failed; | |
617 return tp; | |
618 failed: | |
619 delete_threadpool(tp); | |
620 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
621 return NULL; | |
622 } | |
623 | |
624 /* Create thread pool */ | |
625 PR_IMPLEMENT(PRThreadPool *) | |
626 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, | |
627 PRUint32 stacksize) | |
628 { | |
629 PRThreadPool *tp; | |
630 PRThread *thr; | |
631 int i; | |
632 wthread *wthrp; | |
633 | |
634 tp = alloc_threadpool(); | |
635 if (NULL == tp) | |
636 return NULL; | |
637 | |
638 tp->init_threads = initial_threads; | |
639 tp->max_threads = max_threads; | |
640 tp->stacksize = stacksize; | |
641 PR_INIT_CLIST(&tp->jobq.list); | |
642 PR_INIT_CLIST(&tp->ioq.list); | |
643 PR_INIT_CLIST(&tp->timerq.list); | |
644 PR_INIT_CLIST(&tp->jobq.wthreads); | |
645 PR_INIT_CLIST(&tp->ioq.wthreads); | |
646 PR_INIT_CLIST(&tp->timerq.wthreads); | |
647 tp->shutdown = PR_FALSE; | |
648 | |
649 PR_Lock(tp->jobq.lock); | |
650 for(i=0; i < initial_threads; ++i) { | |
651 | |
652 thr = PR_CreateThread(PR_USER_THREAD, wstart, | |
653 tp, PR_PRIORITY_NORMAL, | |
654 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); | |
655 PR_ASSERT(thr); | |
656 wthrp = PR_NEWZAP(wthread); | |
657 PR_ASSERT(wthrp); | |
658 wthrp->thread = thr; | |
659 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); | |
660 } | |
661 tp->current_threads = initial_threads; | |
662 | |
663 thr = PR_CreateThread(PR_USER_THREAD, io_wstart, | |
664 tp, PR_PRIORITY_NORMAL, | |
665 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); | |
666 PR_ASSERT(thr); | |
667 wthrp = PR_NEWZAP(wthread); | |
668 PR_ASSERT(wthrp); | |
669 wthrp->thread = thr; | |
670 PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); | |
671 | |
672 thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, | |
673 tp, PR_PRIORITY_NORMAL, | |
674 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); | |
675 PR_ASSERT(thr); | |
676 wthrp = PR_NEWZAP(wthread); | |
677 PR_ASSERT(wthrp); | |
678 wthrp->thread = thr; | |
679 PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); | |
680 | |
681 PR_Unlock(tp->jobq.lock); | |
682 return tp; | |
683 } | |
684 | |
685 static void | |
686 delete_job(PRJob *jobp) | |
687 { | |
688 if (NULL != jobp) { | |
689 if (NULL != jobp->join_cv) { | |
690 PR_DestroyCondVar(jobp->join_cv); | |
691 jobp->join_cv = NULL; | |
692 } | |
693 if (NULL != jobp->cancel_cv) { | |
694 PR_DestroyCondVar(jobp->cancel_cv); | |
695 jobp->cancel_cv = NULL; | |
696 } | |
697 PR_DELETE(jobp); | |
698 } | |
699 } | |
700 | |
701 static PRJob * | |
702 alloc_job(PRBool joinable, PRThreadPool *tp) | |
703 { | |
704 PRJob *jobp; | |
705 | |
706 jobp = PR_NEWZAP(PRJob); | |
707 if (NULL == jobp) | |
708 goto failed; | |
709 if (joinable) { | |
710 jobp->join_cv = PR_NewCondVar(tp->join_lock); | |
711 jobp->join_wait = PR_TRUE; | |
712 if (NULL == jobp->join_cv) | |
713 goto failed; | |
714 } else { | |
715 jobp->join_cv = NULL; | |
716 } | |
717 #ifdef OPT_WINNT | |
718 jobp->nt_notifier.jobp = jobp; | |
719 #endif | |
720 return jobp; | |
721 failed: | |
722 delete_job(jobp); | |
723 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); | |
724 return NULL; | |
725 } | |
726 | |
727 /* queue a job */ | |
728 PR_IMPLEMENT(PRJob *) | |
729 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) | |
730 { | |
731 PRJob *jobp; | |
732 | |
733 jobp = alloc_job(joinable, tpool); | |
734 if (NULL == jobp) | |
735 return NULL; | |
736 | |
737 jobp->job_func = fn; | |
738 jobp->job_arg = arg; | |
739 jobp->tpool = tpool; | |
740 | |
741 add_to_jobq(tpool, jobp); | |
742 return jobp; | |
743 } | |
744 | |
745 /* queue a job, when a socket is readable or writeable */ | |
746 static PRJob * | |
747 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, | |
748 PRBool joinable, io_op_type op) | |
749 { | |
750 PRJob *jobp; | |
751 PRIntervalTime now; | |
752 | |
753 jobp = alloc_job(joinable, tpool); | |
754 if (NULL == jobp) { | |
755 return NULL; | |
756 } | |
757 | |
758 /* | |
759 * Add a new job to io_jobq | |
760 * wakeup io worker thread | |
761 */ | |
762 | |
763 jobp->job_func = fn; | |
764 jobp->job_arg = arg; | |
765 jobp->tpool = tpool; | |
766 jobp->iod = iod; | |
767 if (JOB_IO_READ == op) { | |
768 jobp->io_op = JOB_IO_READ; | |
769 jobp->io_poll_flags = PR_POLL_READ; | |
770 } else if (JOB_IO_WRITE == op) { | |
771 jobp->io_op = JOB_IO_WRITE; | |
772 jobp->io_poll_flags = PR_POLL_WRITE; | |
773 } else if (JOB_IO_ACCEPT == op) { | |
774 jobp->io_op = JOB_IO_ACCEPT; | |
775 jobp->io_poll_flags = PR_POLL_READ; | |
776 } else if (JOB_IO_CONNECT == op) { | |
777 jobp->io_op = JOB_IO_CONNECT; | |
778 jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; | |
779 } else { | |
780 delete_job(jobp); | |
781 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
782 return NULL; | |
783 } | |
784 | |
785 jobp->timeout = iod->timeout; | |
786 if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || | |
787 (PR_INTERVAL_NO_WAIT == iod->timeout)) { | |
788 jobp->absolute = iod->timeout; | |
789 } else { | |
790 now = PR_IntervalNow(); | |
791 jobp->absolute = now + iod->timeout; | |
792 } | |
793 | |
794 | |
795 PR_Lock(tpool->ioq.lock); | |
796 | |
797 if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || | |
798 (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { | |
799 PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); | |
800 } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { | |
801 PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); | |
802 } else { | |
803 PRCList *qp; | |
804 PRJob *tmp_jobp; | |
805 /* | |
806 * insert into the timeout-sorted ioq | |
807 */ | |
808 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; | |
809 qp = qp->prev) { | |
810 tmp_jobp = JOB_LINKS_PTR(qp); | |
811 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { | |
812 break; | |
813 } | |
814 } | |
815 PR_INSERT_AFTER(&jobp->links,qp); | |
816 } | |
817 | |
818 jobp->on_ioq = PR_TRUE; | |
819 tpool->ioq.cnt++; | |
820 /* | |
821 * notify io worker thread(s) | |
822 */ | |
823 PR_Unlock(tpool->ioq.lock); | |
824 notify_ioq(tpool); | |
825 return jobp; | |
826 } | |
827 | |
828 /* queue a job, when a socket is readable */ | |
829 PR_IMPLEMENT(PRJob *) | |
830 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, | |
831 PRBool joinable) | |
832 { | |
833 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); | |
834 } | |
835 | |
836 /* queue a job, when a socket is writeable */ | |
837 PR_IMPLEMENT(PRJob *) | |
838 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, | |
839 PRBool joinable) | |
840 { | |
841 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); | |
842 } | |
843 | |
844 | |
845 /* queue a job, when a socket has a pending connection */ | |
846 PR_IMPLEMENT(PRJob *) | |
847 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, | |
848 void * arg, PRBool joinable) | |
849 { | |
850 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); | |
851 } | |
852 | |
853 /* queue a job, when a socket can be connected */ | |
854 PR_IMPLEMENT(PRJob *) | |
855 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, | |
856 const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) | |
857 { | |
858 PRStatus rv; | |
859 PRErrorCode err; | |
860 | |
861 rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); | |
862 if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ | |
863 /* connection pending */ | |
864 return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); | |
865 } else { | |
866 /* | |
867 * connection succeeded or failed; add to jobq right away | |
868 */ | |
869 if (rv == PR_FAILURE) | |
870 iod->error = err; | |
871 else | |
872 iod->error = 0; | |
873 return(PR_QueueJob(tpool, fn, arg, joinable)); | |
874 } | |
875 } | |
876 | |
877 /* queue a job, when a timer expires */ | |
878 PR_IMPLEMENT(PRJob *) | |
879 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, | |
880 PRJobFn fn, void * arg, PRBool joinable) | |
881 { | |
882 PRIntervalTime now; | |
883 PRJob *jobp; | |
884 | |
885 if (PR_INTERVAL_NO_TIMEOUT == timeout) { | |
886 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
887 return NULL; | |
888 } | |
889 if (PR_INTERVAL_NO_WAIT == timeout) { | |
890 /* | |
891 * no waiting; add to jobq right away | |
892 */ | |
893 return(PR_QueueJob(tpool, fn, arg, joinable)); | |
894 } | |
895 jobp = alloc_job(joinable, tpool); | |
896 if (NULL == jobp) { | |
897 return NULL; | |
898 } | |
899 | |
900 /* | |
901 * Add a new job to timer_jobq | |
902 * wakeup timer worker thread | |
903 */ | |
904 | |
905 jobp->job_func = fn; | |
906 jobp->job_arg = arg; | |
907 jobp->tpool = tpool; | |
908 jobp->timeout = timeout; | |
909 | |
910 now = PR_IntervalNow(); | |
911 jobp->absolute = now + timeout; | |
912 | |
913 | |
914 PR_Lock(tpool->timerq.lock); | |
915 jobp->on_timerq = PR_TRUE; | |
916 if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) | |
917 PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); | |
918 else { | |
919 PRCList *qp; | |
920 PRJob *tmp_jobp; | |
921 /* | |
922 * insert into the sorted timer jobq | |
923 */ | |
924 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; | |
925 qp = qp->prev) { | |
926 tmp_jobp = JOB_LINKS_PTR(qp); | |
927 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { | |
928 break; | |
929 } | |
930 } | |
931 PR_INSERT_AFTER(&jobp->links,qp); | |
932 } | |
933 tpool->timerq.cnt++; | |
934 /* | |
935 * notify timer worker thread(s) | |
936 */ | |
937 notify_timerq(tpool); | |
938 PR_Unlock(tpool->timerq.lock); | |
939 return jobp; | |
940 } | |
941 | |
942 static void | |
943 notify_timerq(PRThreadPool *tp) | |
944 { | |
945 /* | |
946 * wakeup the timer thread(s) | |
947 */ | |
948 PR_NotifyCondVar(tp->timerq.cv); | |
949 } | |
950 | |
951 static void | |
952 notify_ioq(PRThreadPool *tp) | |
953 { | |
954 PRStatus rval_status; | |
955 | |
956 /* | |
957 * wakeup the io thread(s) | |
958 */ | |
959 rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); | |
960 PR_ASSERT(PR_SUCCESS == rval_status); | |
961 } | |
962 | |
963 /* | |
964 * cancel a job | |
965 * | |
966 * XXXX: is this needed? likely to be removed | |
967 */ | |
968 PR_IMPLEMENT(PRStatus) | |
969 PR_CancelJob(PRJob *jobp) { | |
970 | |
971 PRStatus rval = PR_FAILURE; | |
972 PRThreadPool *tp; | |
973 | |
974 if (jobp->on_timerq) { | |
975 /* | |
976 * now, check again while holding the timerq lock | |
977 */ | |
978 tp = jobp->tpool; | |
979 PR_Lock(tp->timerq.lock); | |
980 if (jobp->on_timerq) { | |
981 jobp->on_timerq = PR_FALSE; | |
982 PR_REMOVE_AND_INIT_LINK(&jobp->links); | |
983 tp->timerq.cnt--; | |
984 PR_Unlock(tp->timerq.lock); | |
985 if (!JOINABLE_JOB(jobp)) { | |
986 delete_job(jobp); | |
987 } else { | |
988 JOIN_NOTIFY(jobp); | |
989 } | |
990 rval = PR_SUCCESS; | |
991 } else | |
992 PR_Unlock(tp->timerq.lock); | |
993 } else if (jobp->on_ioq) { | |
994 /* | |
995 * now, check again while holding the ioq lock | |
996 */ | |
997 tp = jobp->tpool; | |
998 PR_Lock(tp->ioq.lock); | |
999 if (jobp->on_ioq) { | |
1000 jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); | |
1001 if (NULL == jobp->cancel_cv) { | |
1002 PR_Unlock(tp->ioq.lock); | |
1003 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); | |
1004 return PR_FAILURE; | |
1005 } | |
1006 /* | |
1007 * mark job 'cancelled' and notify io thread(s) | |
1008 * XXXX: | |
1009 * this assumes there is only one io thread; when there | |
1010 * are multiple threads, the io thread processing this job | |
1011 * must be notified. | |
1012 */ | |
1013 jobp->cancel_io = PR_TRUE; | |
1014 PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ | |
1015 notify_ioq(tp); | |
1016 PR_Lock(tp->ioq.lock); | |
1017 while (jobp->cancel_io) | |
1018 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); | |
1019 PR_Unlock(tp->ioq.lock); | |
1020 PR_ASSERT(!jobp->on_ioq); | |
1021 if (!JOINABLE_JOB(jobp)) { | |
1022 delete_job(jobp); | |
1023 } else { | |
1024 JOIN_NOTIFY(jobp); | |
1025 } | |
1026 rval = PR_SUCCESS; | |
1027 } else | |
1028 PR_Unlock(tp->ioq.lock); | |
1029 } | |
1030 if (PR_FAILURE == rval) | |
1031 PR_SetError(PR_INVALID_STATE_ERROR, 0); | |
1032 return rval; | |
1033 } | |
1034 | |
1035 /* join a job, wait until completion */ | |
1036 PR_IMPLEMENT(PRStatus) | |
1037 PR_JoinJob(PRJob *jobp) | |
1038 { | |
1039 if (!JOINABLE_JOB(jobp)) { | |
1040 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); | |
1041 return PR_FAILURE; | |
1042 } | |
1043 PR_Lock(jobp->tpool->join_lock); | |
1044 while(jobp->join_wait) | |
1045 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); | |
1046 PR_Unlock(jobp->tpool->join_lock); | |
1047 delete_job(jobp); | |
1048 return PR_SUCCESS; | |
1049 } | |
1050 | |
1051 /* shutdown threadpool */ | |
1052 PR_IMPLEMENT(PRStatus) | |
1053 PR_ShutdownThreadPool(PRThreadPool *tpool) | |
1054 { | |
1055 PRStatus rval = PR_SUCCESS; | |
1056 | |
1057 PR_Lock(tpool->jobq.lock); | |
1058 tpool->shutdown = PR_TRUE; | |
1059 PR_NotifyAllCondVar(tpool->shutdown_cv); | |
1060 PR_Unlock(tpool->jobq.lock); | |
1061 | |
1062 return rval; | |
1063 } | |
1064 | |
1065 /* | |
1066 * join thread pool | |
1067 * wait for termination of worker threads | |
1068 * reclaim threadpool resources | |
1069 */ | |
1070 PR_IMPLEMENT(PRStatus) | |
1071 PR_JoinThreadPool(PRThreadPool *tpool) | |
1072 { | |
1073 PRStatus rval = PR_SUCCESS; | |
1074 PRCList *head; | |
1075 PRStatus rval_status; | |
1076 | |
1077 PR_Lock(tpool->jobq.lock); | |
1078 while (!tpool->shutdown) | |
1079 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); | |
1080 | |
1081 /* | |
1082 * wakeup worker threads | |
1083 */ | |
1084 #ifdef OPT_WINNT | |
1085 /* | |
1086 * post shutdown notification for all threads | |
1087 */ | |
1088 { | |
1089 int i; | |
1090 for(i=0; i < tpool->current_threads; i++) { | |
1091 PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, | |
1092 TRUE, NULL); | |
1093 } | |
1094 } | |
1095 #else | |
1096 PR_NotifyAllCondVar(tpool->jobq.cv); | |
1097 #endif | |
1098 | |
1099 /* | |
1100 * wakeup io thread(s) | |
1101 */ | |
1102 notify_ioq(tpool); | |
1103 | |
1104 /* | |
1105 * wakeup timer thread(s) | |
1106 */ | |
1107 PR_Lock(tpool->timerq.lock); | |
1108 notify_timerq(tpool); | |
1109 PR_Unlock(tpool->timerq.lock); | |
1110 | |
1111 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { | |
1112 wthread *wthrp; | |
1113 | |
1114 head = PR_LIST_HEAD(&tpool->jobq.wthreads); | |
1115 PR_REMOVE_AND_INIT_LINK(head); | |
1116 PR_Unlock(tpool->jobq.lock); | |
1117 wthrp = WTHREAD_LINKS_PTR(head); | |
1118 rval_status = PR_JoinThread(wthrp->thread); | |
1119 PR_ASSERT(PR_SUCCESS == rval_status); | |
1120 PR_DELETE(wthrp); | |
1121 PR_Lock(tpool->jobq.lock); | |
1122 } | |
1123 PR_Unlock(tpool->jobq.lock); | |
1124 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { | |
1125 wthread *wthrp; | |
1126 | |
1127 head = PR_LIST_HEAD(&tpool->ioq.wthreads); | |
1128 PR_REMOVE_AND_INIT_LINK(head); | |
1129 wthrp = WTHREAD_LINKS_PTR(head); | |
1130 rval_status = PR_JoinThread(wthrp->thread); | |
1131 PR_ASSERT(PR_SUCCESS == rval_status); | |
1132 PR_DELETE(wthrp); | |
1133 } | |
1134 | |
1135 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { | |
1136 wthread *wthrp; | |
1137 | |
1138 head = PR_LIST_HEAD(&tpool->timerq.wthreads); | |
1139 PR_REMOVE_AND_INIT_LINK(head); | |
1140 wthrp = WTHREAD_LINKS_PTR(head); | |
1141 rval_status = PR_JoinThread(wthrp->thread); | |
1142 PR_ASSERT(PR_SUCCESS == rval_status); | |
1143 PR_DELETE(wthrp); | |
1144 } | |
1145 | |
1146 /* | |
1147 * Delete queued jobs | |
1148 */ | |
1149 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { | |
1150 PRJob *jobp; | |
1151 | |
1152 head = PR_LIST_HEAD(&tpool->jobq.list); | |
1153 PR_REMOVE_AND_INIT_LINK(head); | |
1154 jobp = JOB_LINKS_PTR(head); | |
1155 tpool->jobq.cnt--; | |
1156 delete_job(jobp); | |
1157 } | |
1158 | |
1159 /* delete io jobs */ | |
1160 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { | |
1161 PRJob *jobp; | |
1162 | |
1163 head = PR_LIST_HEAD(&tpool->ioq.list); | |
1164 PR_REMOVE_AND_INIT_LINK(head); | |
1165 tpool->ioq.cnt--; | |
1166 jobp = JOB_LINKS_PTR(head); | |
1167 delete_job(jobp); | |
1168 } | |
1169 | |
1170 /* delete timer jobs */ | |
1171 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { | |
1172 PRJob *jobp; | |
1173 | |
1174 head = PR_LIST_HEAD(&tpool->timerq.list); | |
1175 PR_REMOVE_AND_INIT_LINK(head); | |
1176 tpool->timerq.cnt--; | |
1177 jobp = JOB_LINKS_PTR(head); | |
1178 delete_job(jobp); | |
1179 } | |
1180 | |
1181 PR_ASSERT(0 == tpool->jobq.cnt); | |
1182 PR_ASSERT(0 == tpool->ioq.cnt); | |
1183 PR_ASSERT(0 == tpool->timerq.cnt); | |
1184 | |
1185 delete_threadpool(tpool); | |
1186 return rval; | |
1187 } |