comparison nspr/pr/src/io/prmwait.c @ 0:1e5118fa0cb1

This is NSS with a Cmake Buildsyste To compile a static NSS library for Windows we've used the Chromium-NSS fork and added a Cmake buildsystem to compile it statically for Windows. See README.chromium for chromium changes and README.trustbridge for our modifications.
author Andre Heinecke <andre.heinecke@intevation.de>
date Mon, 28 Jul 2014 10:47:06 +0200
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:1e5118fa0cb1
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6 #include "primpl.h"
7 #include "pprmwait.h"
8
9 #define _MW_REHASH_MAX 11
10
11 static PRLock *mw_lock = NULL;
12 static _PRGlobalState *mw_state = NULL;
13
14 static PRIntervalTime max_polling_interval;
15
16 #ifdef WINNT
17
18 typedef struct TimerEvent {
19 PRIntervalTime absolute;
20 void (*func)(void *);
21 void *arg;
22 LONG ref_count;
23 PRCList links;
24 } TimerEvent;
25
26 #define TIMER_EVENT_PTR(_qp) \
27 ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
28
29 struct {
30 PRLock *ml;
31 PRCondVar *new_timer;
32 PRCondVar *cancel_timer;
33 PRThread *manager_thread;
34 PRCList timer_queue;
35 } tm_vars;
36
37 static PRStatus TimerInit(void);
38 static void TimerManager(void *arg);
39 static TimerEvent *CreateTimer(PRIntervalTime timeout,
40 void (*func)(void *), void *arg);
41 static PRBool CancelTimer(TimerEvent *timer);
42
43 static void TimerManager(void *arg)
44 {
45 PRIntervalTime now;
46 PRIntervalTime timeout;
47 PRCList *head;
48 TimerEvent *timer;
49
50 PR_Lock(tm_vars.ml);
51 while (1)
52 {
53 if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
54 {
55 PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
56 }
57 else
58 {
59 now = PR_IntervalNow();
60 head = PR_LIST_HEAD(&tm_vars.timer_queue);
61 timer = TIMER_EVENT_PTR(head);
62 if ((PRInt32) (now - timer->absolute) >= 0)
63 {
64 PR_REMOVE_LINK(head);
65 /*
66 * make its prev and next point to itself so that
67 * it's obvious that it's not on the timer_queue.
68 */
69 PR_INIT_CLIST(head);
70 PR_ASSERT(2 == timer->ref_count);
71 PR_Unlock(tm_vars.ml);
72 timer->func(timer->arg);
73 PR_Lock(tm_vars.ml);
74 timer->ref_count -= 1;
75 if (0 == timer->ref_count)
76 {
77 PR_NotifyAllCondVar(tm_vars.cancel_timer);
78 }
79 }
80 else
81 {
82 timeout = (PRIntervalTime)(timer->absolute - now);
83 PR_WaitCondVar(tm_vars.new_timer, timeout);
84 }
85 }
86 }
87 PR_Unlock(tm_vars.ml);
88 }
89
90 static TimerEvent *CreateTimer(
91 PRIntervalTime timeout,
92 void (*func)(void *),
93 void *arg)
94 {
95 TimerEvent *timer;
96 PRCList *links, *tail;
97 TimerEvent *elem;
98
99 timer = PR_NEW(TimerEvent);
100 if (NULL == timer)
101 {
102 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
103 return timer;
104 }
105 timer->absolute = PR_IntervalNow() + timeout;
106 timer->func = func;
107 timer->arg = arg;
108 timer->ref_count = 2;
109 PR_Lock(tm_vars.ml);
110 tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
111 while (links->prev != tail)
112 {
113 elem = TIMER_EVENT_PTR(links);
114 if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
115 {
116 break;
117 }
118 links = links->prev;
119 }
120 PR_INSERT_AFTER(&timer->links, links);
121 PR_NotifyCondVar(tm_vars.new_timer);
122 PR_Unlock(tm_vars.ml);
123 return timer;
124 }
125
126 static PRBool CancelTimer(TimerEvent *timer)
127 {
128 PRBool canceled = PR_FALSE;
129
130 PR_Lock(tm_vars.ml);
131 timer->ref_count -= 1;
132 if (timer->links.prev == &timer->links)
133 {
134 while (timer->ref_count == 1)
135 {
136 PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
137 }
138 }
139 else
140 {
141 PR_REMOVE_LINK(&timer->links);
142 canceled = PR_TRUE;
143 }
144 PR_Unlock(tm_vars.ml);
145 PR_DELETE(timer);
146 return canceled;
147 }
148
149 static PRStatus TimerInit(void)
150 {
151 tm_vars.ml = PR_NewLock();
152 if (NULL == tm_vars.ml)
153 {
154 goto failed;
155 }
156 tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
157 if (NULL == tm_vars.new_timer)
158 {
159 goto failed;
160 }
161 tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
162 if (NULL == tm_vars.cancel_timer)
163 {
164 goto failed;
165 }
166 PR_INIT_CLIST(&tm_vars.timer_queue);
167 tm_vars.manager_thread = PR_CreateThread(
168 PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
169 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
170 if (NULL == tm_vars.manager_thread)
171 {
172 goto failed;
173 }
174 return PR_SUCCESS;
175
176 failed:
177 if (NULL != tm_vars.cancel_timer)
178 {
179 PR_DestroyCondVar(tm_vars.cancel_timer);
180 }
181 if (NULL != tm_vars.new_timer)
182 {
183 PR_DestroyCondVar(tm_vars.new_timer);
184 }
185 if (NULL != tm_vars.ml)
186 {
187 PR_DestroyLock(tm_vars.ml);
188 }
189 return PR_FAILURE;
190 }
191
192 #endif /* WINNT */
193
194 /******************************************************************/
195 /******************************************************************/
196 /************************ The private portion *********************/
197 /******************************************************************/
198 /******************************************************************/
199 void _PR_InitMW(void)
200 {
201 #ifdef WINNT
202 /*
203 * We use NT 4's InterlockedCompareExchange() to operate
204 * on PRMWStatus variables.
205 */
206 PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
207 TimerInit();
208 #endif
209 mw_lock = PR_NewLock();
210 PR_ASSERT(NULL != mw_lock);
211 mw_state = PR_NEWZAP(_PRGlobalState);
212 PR_ASSERT(NULL != mw_state);
213 PR_INIT_CLIST(&mw_state->group_list);
214 max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
215 } /* _PR_InitMW */
216
217 void _PR_CleanupMW(void)
218 {
219 PR_DestroyLock(mw_lock);
220 mw_lock = NULL;
221 if (mw_state->group) {
222 PR_DestroyWaitGroup(mw_state->group);
223 /* mw_state->group is set to NULL as a side effect. */
224 }
225 PR_DELETE(mw_state);
226 } /* _PR_CleanupMW */
227
228 static PRWaitGroup *MW_Init2(void)
229 {
230 PRWaitGroup *group = mw_state->group; /* it's the null group */
231 if (NULL == group) /* there is this special case */
232 {
233 group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
234 if (NULL == group) goto failed_alloc;
235 PR_Lock(mw_lock);
236 if (NULL == mw_state->group)
237 {
238 mw_state->group = group;
239 group = NULL;
240 }
241 PR_Unlock(mw_lock);
242 if (group != NULL) (void)PR_DestroyWaitGroup(group);
243 group = mw_state->group; /* somebody beat us to it */
244 }
245 failed_alloc:
246 return group; /* whatever */
247 } /* MW_Init2 */
248
249 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
250 {
251 /*
252 ** The entries are put in the table using the fd (PRFileDesc*) of
253 ** the receive descriptor as the key. This allows us to locate
254 ** the appropriate entry aqain when the poll operation finishes.
255 **
256 ** The pointer to the file descriptor object is first divided by
257 ** the natural alignment of a pointer in the belief that object
258 ** will have at least that many zeros in the low order bits.
259 ** This may not be a good assuption.
260 **
261 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
262 ** that we declare defeat and force the table to be reconstructed.
263 ** Since some fds might be added more than once, won't that cause
264 ** collisions even in an empty table?
265 */
266 PRIntn rehash = _MW_REHASH_MAX;
267 PRRecvWait **waiter;
268 PRUintn hidx = _MW_HASH(desc->fd, hash->length);
269 PRUintn hoffset = 0;
270
271 while (rehash-- > 0)
272 {
273 waiter = &hash->recv_wait;
274 if (NULL == waiter[hidx])
275 {
276 waiter[hidx] = desc;
277 hash->count += 1;
278 #if 0
279 printf("Adding 0x%x->0x%x ", desc, desc->fd);
280 printf(
281 "table[%u:%u:*%u]: 0x%x->0x%x\n",
282 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
283 #endif
284 return _prmw_success;
285 }
286 if (desc == waiter[hidx])
287 {
288 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
289 return _prmw_error;
290 }
291 #if 0
292 printf("Failing 0x%x->0x%x ", desc, desc->fd);
293 printf(
294 "table[*%u:%u:%u]: 0x%x->0x%x\n",
295 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
296 #endif
297 if (0 == hoffset)
298 {
299 hoffset = _MW_HASH2(desc->fd, hash->length);
300 PR_ASSERT(0 != hoffset);
301 }
302 hidx = (hidx + hoffset) % (hash->length);
303 }
304 return _prmw_rehash;
305 } /* MW_AddHashInternal */
306
307 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
308 {
309 PRRecvWait **desc;
310 PRUint32 pidx, length;
311 _PRWaiterHash *newHash, *oldHash = group->waiter;
312 PRBool retry;
313 _PR_HashStory hrv;
314
315 static const PRInt32 prime_number[] = {
316 _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
317 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
318 PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
319
320 /* look up the next size we'd like to use for the hash table */
321 for (pidx = 0; pidx < primes; ++pidx)
322 {
323 if (prime_number[pidx] == oldHash->length)
324 {
325 break;
326 }
327 }
328 /* table size must be one of the prime numbers */
329 PR_ASSERT(pidx < primes);
330
331 /* if pidx == primes - 1, we can't expand the table any more */
332 while (pidx < primes - 1)
333 {
334 /* next size */
335 ++pidx;
336 length = prime_number[pidx];
337
338 /* allocate the new hash table and fill it in with the old */
339 newHash = (_PRWaiterHash*)PR_CALLOC(
340 sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
341 if (NULL == newHash)
342 {
343 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
344 return _prmw_error;
345 }
346
347 newHash->length = length;
348 retry = PR_FALSE;
349 for (desc = &oldHash->recv_wait;
350 newHash->count < oldHash->count; ++desc)
351 {
352 PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
353 if (NULL != *desc)
354 {
355 hrv = MW_AddHashInternal(*desc, newHash);
356 PR_ASSERT(_prmw_error != hrv);
357 if (_prmw_success != hrv)
358 {
359 PR_DELETE(newHash);
360 retry = PR_TRUE;
361 break;
362 }
363 }
364 }
365 if (retry) continue;
366
367 PR_DELETE(group->waiter);
368 group->waiter = newHash;
369 group->p_timestamp += 1;
370 return _prmw_success;
371 }
372
373 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
374 return _prmw_error; /* we're hosed */
375 } /* MW_ExpandHashInternal */
376
377 #ifndef WINNT
378 static void _MW_DoneInternal(
379 PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
380 {
381 /*
382 ** Add this receive wait object to the list of finished I/O
383 ** operations for this particular group. If there are other
384 ** threads waiting on the group, notify one. If not, arrange
385 ** for this thread to return.
386 */
387
388 #if 0
389 printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
390 #endif
391 (*waiter)->outcome = outcome;
392 PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
393 PR_NotifyCondVar(group->io_complete);
394 PR_ASSERT(0 != group->waiter->count);
395 group->waiter->count -= 1;
396 *waiter = NULL;
397 } /* _MW_DoneInternal */
398 #endif /* WINNT */
399
400 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
401 {
402 /*
403 ** Find the receive wait object corresponding to the file descriptor.
404 ** Only search the wait group specified.
405 */
406 PRRecvWait **desc;
407 PRIntn rehash = _MW_REHASH_MAX;
408 _PRWaiterHash *hash = group->waiter;
409 PRUintn hidx = _MW_HASH(fd, hash->length);
410 PRUintn hoffset = 0;
411
412 while (rehash-- > 0)
413 {
414 desc = (&hash->recv_wait) + hidx;
415 if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
416 if (0 == hoffset)
417 {
418 hoffset = _MW_HASH2(fd, hash->length);
419 PR_ASSERT(0 != hoffset);
420 }
421 hidx = (hidx + hoffset) % (hash->length);
422 }
423 return NULL;
424 } /* _MW_LookupInternal */
425
426 #ifndef WINNT
427 static PRStatus _MW_PollInternal(PRWaitGroup *group)
428 {
429 PRRecvWait **waiter;
430 PRStatus rv = PR_FAILURE;
431 PRInt32 count, count_ready;
432 PRIntervalTime polling_interval;
433
434 group->poller = PR_GetCurrentThread();
435
436 while (PR_TRUE)
437 {
438 PRIntervalTime now, since_last_poll;
439 PRPollDesc *poll_list;
440
441 while (0 == group->waiter->count)
442 {
443 PRStatus st;
444 st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
445 if (_prmw_running != group->state)
446 {
447 PR_SetError(PR_INVALID_STATE_ERROR, 0);
448 goto aborted;
449 }
450 if (_MW_ABORTED(st)) goto aborted;
451 }
452
453 /*
454 ** There's something to do. See if our existing polling list
455 ** is large enough for what we have to do?
456 */
457
458 while (group->polling_count < group->waiter->count)
459 {
460 PRUint32 old_count = group->waiter->count;
461 PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
462 PRSize new_size = sizeof(PRPollDesc) * new_count;
463 PRPollDesc *old_polling_list = group->polling_list;
464
465 PR_Unlock(group->ml);
466 poll_list = (PRPollDesc*)PR_CALLOC(new_size);
467 if (NULL == poll_list)
468 {
469 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
470 PR_Lock(group->ml);
471 goto failed_alloc;
472 }
473 if (NULL != old_polling_list)
474 PR_DELETE(old_polling_list);
475 PR_Lock(group->ml);
476 if (_prmw_running != group->state)
477 {
478 PR_SetError(PR_INVALID_STATE_ERROR, 0);
479 goto aborted;
480 }
481 group->polling_list = poll_list;
482 group->polling_count = new_count;
483 }
484
485 now = PR_IntervalNow();
486 polling_interval = max_polling_interval;
487 since_last_poll = now - group->last_poll;
488
489 waiter = &group->waiter->recv_wait;
490 poll_list = group->polling_list;
491 for (count = 0; count < group->waiter->count; ++waiter)
492 {
493 PR_ASSERT(waiter < &group->waiter->recv_wait
494 + group->waiter->length);
495 if (NULL != *waiter) /* a live one! */
496 {
497 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
498 && (since_last_poll >= (*waiter)->timeout))
499 _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
500 else
501 {
502 if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
503 {
504 (*waiter)->timeout -= since_last_poll;
505 if ((*waiter)->timeout < polling_interval)
506 polling_interval = (*waiter)->timeout;
507 }
508 PR_ASSERT(poll_list < group->polling_list
509 + group->polling_count);
510 poll_list->fd = (*waiter)->fd;
511 poll_list->in_flags = PR_POLL_READ;
512 poll_list->out_flags = 0;
513 #if 0
514 printf(
515 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
516 poll_list, count, poll_list->fd, (*waiter)->timeout);
517 #endif
518 poll_list += 1;
519 count += 1;
520 }
521 }
522 }
523
524 PR_ASSERT(count == group->waiter->count);
525
526 /*
527 ** If there are no more threads waiting for completion,
528 ** we need to return.
529 */
530 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
531 && (1 == group->waiting_threads)) break;
532
533 if (0 == count) continue; /* wait for new business */
534
535 group->last_poll = now;
536
537 PR_Unlock(group->ml);
538
539 count_ready = PR_Poll(group->polling_list, count, polling_interval);
540
541 PR_Lock(group->ml);
542
543 if (_prmw_running != group->state)
544 {
545 PR_SetError(PR_INVALID_STATE_ERROR, 0);
546 goto aborted;
547 }
548 if (-1 == count_ready)
549 {
550 goto failed_poll; /* that's a shame */
551 }
552 else if (0 < count_ready)
553 {
554 for (poll_list = group->polling_list; count > 0;
555 poll_list++, count--)
556 {
557 PR_ASSERT(
558 poll_list < group->polling_list + group->polling_count);
559 if (poll_list->out_flags != 0)
560 {
561 waiter = _MW_LookupInternal(group, poll_list->fd);
562 /*
563 ** If 'waiter' is NULL, that means the wait receive
564 ** descriptor has been canceled.
565 */
566 if (NULL != waiter)
567 _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
568 }
569 }
570 }
571 /*
572 ** If there are no more threads waiting for completion,
573 ** we need to return.
574 ** This thread was "borrowed" to do the polling, but it really
575 ** belongs to the client.
576 */
577 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
578 && (1 == group->waiting_threads)) break;
579 }
580
581 rv = PR_SUCCESS;
582
583 aborted:
584 failed_poll:
585 failed_alloc:
586 group->poller = NULL; /* we were that, not we ain't */
587 if ((_prmw_running == group->state) && (group->waiting_threads > 1))
588 {
589 /* Wake up one thread to become the new poller. */
590 PR_NotifyCondVar(group->io_complete);
591 }
592 return rv; /* we return with the lock held */
593 } /* _MW_PollInternal */
594 #endif /* !WINNT */
595
596 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
597 {
598 PRMWGroupState rv = group->state;
599 /*
600 ** Looking at the group's fields is safe because
601 ** once the group's state is no longer running, it
602 ** cannot revert and there is a safe check on entry
603 ** to make sure no more threads are made to wait.
604 */
605 if ((_prmw_stopping == rv)
606 && (0 == group->waiting_threads))
607 {
608 rv = group->state = _prmw_stopped;
609 PR_NotifyCondVar(group->mw_manage);
610 }
611 return rv;
612 } /* MW_TestForShutdownInternal */
613
614 #ifndef WINNT
615 static void _MW_InitialRecv(PRCList *io_ready)
616 {
617 PRRecvWait *desc = (PRRecvWait*)io_ready;
618 if ((NULL == desc->buffer.start)
619 || (0 == desc->buffer.length))
620 desc->bytesRecv = 0;
621 else
622 {
623 desc->bytesRecv = (desc->fd->methods->recv)(
624 desc->fd, desc->buffer.start,
625 desc->buffer.length, 0, desc->timeout);
626 if (desc->bytesRecv < 0) /* SetError should already be there */
627 desc->outcome = PR_MW_FAILURE;
628 }
629 } /* _MW_InitialRecv */
630 #endif
631
632 #ifdef WINNT
633 static void NT_TimeProc(void *arg)
634 {
635 _MDOverlapped *overlapped = (_MDOverlapped *)arg;
636 PRRecvWait *desc = overlapped->data.mw.desc;
637 PRFileDesc *bottom;
638
639 if (InterlockedCompareExchange((LONG *)&desc->outcome,
640 (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
641 {
642 /* This wait recv descriptor has already completed. */
643 return;
644 }
645
646 /* close the osfd to abort the outstanding async io request */
647 /* $$$$
648 ** Little late to be checking if NSPR's on the bottom of stack,
649 ** but if we don't check, we can't assert that the private data
650 ** is what we think it is.
651 ** $$$$
652 */
653 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
654 PR_ASSERT(NULL != bottom);
655 if (NULL != bottom) /* now what!?!?! */
656 {
657 bottom->secret->state = _PR_FILEDESC_CLOSED;
658 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
659 {
660 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
661 PR_ASSERT(!"What shall I do?");
662 }
663 }
664 return;
665 } /* NT_TimeProc */
666
667 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
668 {
669 PRRecvWait **waiter;
670
671 _PR_MD_LOCK(&group->mdlock);
672 waiter = _MW_LookupInternal(group, fd);
673 if (NULL != waiter)
674 {
675 group->waiter->count -= 1;
676 *waiter = NULL;
677 }
678 _PR_MD_UNLOCK(&group->mdlock);
679 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
680 }
681
682 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
683 {
684 PRRecvWait **waiter;
685
686 waiter = _MW_LookupInternal(group, fd);
687 if (NULL != waiter)
688 {
689 group->waiter->count -= 1;
690 *waiter = NULL;
691 }
692 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
693 }
694 #endif /* WINNT */
695
696 /******************************************************************/
697 /******************************************************************/
698 /********************** The public API portion ********************/
699 /******************************************************************/
700 /******************************************************************/
701 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
702 PRWaitGroup *group, PRRecvWait *desc)
703 {
704 _PR_HashStory hrv;
705 PRStatus rv = PR_FAILURE;
706 #ifdef WINNT
707 _MDOverlapped *overlapped;
708 HANDLE hFile;
709 BOOL bResult;
710 DWORD dwError;
711 PRFileDesc *bottom;
712 #endif
713
714 if (!_pr_initialized) _PR_ImplicitInitialization();
715 if ((NULL == group) && (NULL == (group = MW_Init2())))
716 {
717 return rv;
718 }
719
720 PR_ASSERT(NULL != desc->fd);
721
722 desc->outcome = PR_MW_PENDING; /* nice, well known value */
723 desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
724
725 PR_Lock(group->ml);
726
727 if (_prmw_running != group->state)
728 {
729 /* Not allowed to add after cancelling the group */
730 desc->outcome = PR_MW_INTERRUPT;
731 PR_SetError(PR_INVALID_STATE_ERROR, 0);
732 PR_Unlock(group->ml);
733 return rv;
734 }
735
736 #ifdef WINNT
737 _PR_MD_LOCK(&group->mdlock);
738 #endif
739
740 /*
741 ** If the waiter count is zero at this point, there's no telling
742 ** how long we've been idle. Therefore, initialize the beginning
743 ** of the timing interval. As long as the list doesn't go empty,
744 ** it will maintain itself.
745 */
746 if (0 == group->waiter->count)
747 group->last_poll = PR_IntervalNow();
748
749 do
750 {
751 hrv = MW_AddHashInternal(desc, group->waiter);
752 if (_prmw_rehash != hrv) break;
753 hrv = MW_ExpandHashInternal(group); /* gruesome */
754 if (_prmw_success != hrv) break;
755 } while (PR_TRUE);
756
757 #ifdef WINNT
758 _PR_MD_UNLOCK(&group->mdlock);
759 #endif
760
761 PR_NotifyCondVar(group->new_business); /* tell the world */
762 rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
763 PR_Unlock(group->ml);
764
765 #ifdef WINNT
766 overlapped = PR_NEWZAP(_MDOverlapped);
767 if (NULL == overlapped)
768 {
769 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
770 NT_HashRemove(group, desc->fd);
771 return rv;
772 }
773 overlapped->ioModel = _MD_MultiWaitIO;
774 overlapped->data.mw.desc = desc;
775 overlapped->data.mw.group = group;
776 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
777 {
778 overlapped->data.mw.timer = CreateTimer(
779 desc->timeout,
780 NT_TimeProc,
781 overlapped);
782 if (0 == overlapped->data.mw.timer)
783 {
784 NT_HashRemove(group, desc->fd);
785 PR_DELETE(overlapped);
786 /*
787 * XXX It appears that a maximum of 16 timer events can
788 * be outstanding. GetLastError() returns 0 when I try it.
789 */
790 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
791 return PR_FAILURE;
792 }
793 }
794
795 /* Reach to the bottom layer to get the OS fd */
796 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
797 PR_ASSERT(NULL != bottom);
798 if (NULL == bottom)
799 {
800 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
801 return PR_FAILURE;
802 }
803 hFile = (HANDLE)bottom->secret->md.osfd;
804 if (!bottom->secret->md.io_model_committed)
805 {
806 PRInt32 st;
807 st = _md_Associate(hFile);
808 PR_ASSERT(0 != st);
809 bottom->secret->md.io_model_committed = PR_TRUE;
810 }
811 bResult = ReadFile(hFile,
812 desc->buffer.start,
813 (DWORD)desc->buffer.length,
814 NULL,
815 &overlapped->overlapped);
816 if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
817 {
818 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
819 {
820 if (InterlockedCompareExchange((LONG *)&desc->outcome,
821 (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
822 == (LONG)PR_MW_PENDING)
823 {
824 CancelTimer(overlapped->data.mw.timer);
825 }
826 NT_HashRemove(group, desc->fd);
827 PR_DELETE(overlapped);
828 }
829 _PR_MD_MAP_READ_ERROR(dwError);
830 rv = PR_FAILURE;
831 }
832 #endif
833
834 return rv;
835 } /* PR_AddWaitFileDesc */
836
837 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
838 {
839 PRCList *io_ready = NULL;
840 #ifdef WINNT
841 PRThread *me = _PR_MD_CURRENT_THREAD();
842 _MDOverlapped *overlapped;
843 #endif
844
845 if (!_pr_initialized) _PR_ImplicitInitialization();
846 if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
847
848 PR_Lock(group->ml);
849
850 if (_prmw_running != group->state)
851 {
852 PR_SetError(PR_INVALID_STATE_ERROR, 0);
853 goto invalid_state;
854 }
855
856 group->waiting_threads += 1; /* the polling thread is counted */
857
858 #ifdef WINNT
859 _PR_MD_LOCK(&group->mdlock);
860 while (PR_CLIST_IS_EMPTY(&group->io_ready))
861 {
862 _PR_THREAD_LOCK(me);
863 me->state = _PR_IO_WAIT;
864 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
865 if (!_PR_IS_NATIVE_THREAD(me))
866 {
867 _PR_SLEEPQ_LOCK(me->cpu);
868 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
869 _PR_SLEEPQ_UNLOCK(me->cpu);
870 }
871 _PR_THREAD_UNLOCK(me);
872 _PR_MD_UNLOCK(&group->mdlock);
873 PR_Unlock(group->ml);
874 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
875 me->state = _PR_RUNNING;
876 PR_Lock(group->ml);
877 _PR_MD_LOCK(&group->mdlock);
878 if (_PR_PENDING_INTERRUPT(me)) {
879 PR_REMOVE_LINK(&me->waitQLinks);
880 _PR_MD_UNLOCK(&group->mdlock);
881 me->flags &= ~_PR_INTERRUPT;
882 me->io_suspended = PR_FALSE;
883 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
884 goto aborted;
885 }
886 }
887 io_ready = PR_LIST_HEAD(&group->io_ready);
888 PR_ASSERT(io_ready != NULL);
889 PR_REMOVE_LINK(io_ready);
890 _PR_MD_UNLOCK(&group->mdlock);
891 overlapped = (_MDOverlapped *)
892 ((char *)io_ready - offsetof(_MDOverlapped, data));
893 io_ready = &overlapped->data.mw.desc->internal;
894 #else
895 do
896 {
897 /*
898 ** If the I/O ready list isn't empty, have this thread
899 ** return with the first receive wait object that's available.
900 */
901 if (PR_CLIST_IS_EMPTY(&group->io_ready))
902 {
903 /*
904 ** Is there a polling thread yet? If not, grab this thread
905 ** and use it.
906 */
907 if (NULL == group->poller)
908 {
909 /*
910 ** This thread will stay do polling until it becomes the only one
911 ** left to service a completion. Then it will return and there will
912 ** be none left to actually poll or to run completions.
913 **
914 ** The polling function should only return w/ failure or
915 ** with some I/O ready.
916 */
917 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
918 }
919 else
920 {
921 /*
922 ** There are four reasons a thread can be awakened from
923 ** a wait on the io_complete condition variable.
924 ** 1. Some I/O has completed, i.e., the io_ready list
925 ** is nonempty.
926 ** 2. The wait group is canceled.
927 ** 3. The thread is interrupted.
928 ** 4. The current polling thread has to leave and needs
929 ** a replacement.
930 ** The logic to find a new polling thread is made more
931 ** complicated by all the other possible events.
932 ** I tried my best to write the logic clearly, but
933 ** it is still full of if's with continue and goto.
934 */
935 PRStatus st;
936 do
937 {
938 st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
939 if (_prmw_running != group->state)
940 {
941 PR_SetError(PR_INVALID_STATE_ERROR, 0);
942 goto aborted;
943 }
944 if (_MW_ABORTED(st) || (NULL == group->poller)) break;
945 } while (PR_CLIST_IS_EMPTY(&group->io_ready));
946
947 /*
948 ** The thread is interrupted and has to leave. It might
949 ** have also been awakened to process ready i/o or be the
950 ** new poller. To be safe, if either condition is true,
951 ** we awaken another thread to take its place.
952 */
953 if (_MW_ABORTED(st))
954 {
955 if ((NULL == group->poller
956 || !PR_CLIST_IS_EMPTY(&group->io_ready))
957 && group->waiting_threads > 1)
958 PR_NotifyCondVar(group->io_complete);
959 goto aborted;
960 }
961
962 /*
963 ** A new poller is needed, but can I be the new poller?
964 ** If there is no i/o ready, sure. But if there is any
965 ** i/o ready, it has a higher priority. I want to
966 ** process the ready i/o first and wake up another
967 ** thread to be the new poller.
968 */
969 if (NULL == group->poller)
970 {
971 if (PR_CLIST_IS_EMPTY(&group->io_ready))
972 continue;
973 if (group->waiting_threads > 1)
974 PR_NotifyCondVar(group->io_complete);
975 }
976 }
977 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
978 }
979 io_ready = PR_LIST_HEAD(&group->io_ready);
980 PR_NotifyCondVar(group->io_taken);
981 PR_ASSERT(io_ready != NULL);
982 PR_REMOVE_LINK(io_ready);
983 } while (NULL == io_ready);
984
985 failed_poll:
986
987 #endif
988
989 aborted:
990
991 group->waiting_threads -= 1;
992 invalid_state:
993 (void)MW_TestForShutdownInternal(group);
994 PR_Unlock(group->ml);
995
996 failed_init:
997 if (NULL != io_ready)
998 {
999 /* If the operation failed, record the reason why */
1000 switch (((PRRecvWait*)io_ready)->outcome)
1001 {
1002 case PR_MW_PENDING:
1003 PR_ASSERT(0);
1004 break;
1005 case PR_MW_SUCCESS:
1006 #ifndef WINNT
1007 _MW_InitialRecv(io_ready);
1008 #endif
1009 break;
1010 #ifdef WINNT
1011 case PR_MW_FAILURE:
1012 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
1013 break;
1014 #endif
1015 case PR_MW_TIMEOUT:
1016 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
1017 break;
1018 case PR_MW_INTERRUPT:
1019 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
1020 break;
1021 default: break;
1022 }
1023 #ifdef WINNT
1024 if (NULL != overlapped->data.mw.timer)
1025 {
1026 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1027 != overlapped->data.mw.desc->timeout);
1028 CancelTimer(overlapped->data.mw.timer);
1029 }
1030 else
1031 {
1032 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1033 == overlapped->data.mw.desc->timeout);
1034 }
1035 PR_DELETE(overlapped);
1036 #endif
1037 }
1038 return (PRRecvWait*)io_ready;
1039 } /* PR_WaitRecvReady */
1040
1041 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
1042 {
1043 #if !defined(WINNT)
1044 PRRecvWait **recv_wait;
1045 #endif
1046 PRStatus rv = PR_SUCCESS;
1047 if (NULL == group) group = mw_state->group;
1048 PR_ASSERT(NULL != group);
1049 if (NULL == group)
1050 {
1051 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1052 return PR_FAILURE;
1053 }
1054
1055 PR_Lock(group->ml);
1056
1057 if (_prmw_running != group->state)
1058 {
1059 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1060 rv = PR_FAILURE;
1061 goto unlock;
1062 }
1063
1064 #ifdef WINNT
1065 if (InterlockedCompareExchange((LONG *)&desc->outcome,
1066 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
1067 {
1068 PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
1069 PR_ASSERT(NULL != bottom);
1070 if (NULL == bottom)
1071 {
1072 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1073 goto unlock;
1074 }
1075 bottom->secret->state = _PR_FILEDESC_CLOSED;
1076 #if 0
1077 fprintf(stderr, "cancel wait recv: closing socket\n");
1078 #endif
1079 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1080 {
1081 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1082 exit(1);
1083 }
1084 }
1085 #else
1086 if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
1087 {
1088 /* it was in the wait table */
1089 _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
1090 goto unlock;
1091 }
1092 if (!PR_CLIST_IS_EMPTY(&group->io_ready))
1093 {
1094 /* is it already complete? */
1095 PRCList *head = PR_LIST_HEAD(&group->io_ready);
1096 do
1097 {
1098 PRRecvWait *done = (PRRecvWait*)head;
1099 if (done == desc) goto unlock;
1100 head = PR_NEXT_LINK(head);
1101 } while (head != &group->io_ready);
1102 }
1103 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1104 rv = PR_FAILURE;
1105
1106 #endif
1107 unlock:
1108 PR_Unlock(group->ml);
1109 return rv;
1110 } /* PR_CancelWaitFileDesc */
1111
1112 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
1113 {
1114 PRRecvWait **desc;
1115 PRRecvWait *recv_wait = NULL;
1116 #ifdef WINNT
1117 _MDOverlapped *overlapped;
1118 PRRecvWait **end;
1119 PRThread *me = _PR_MD_CURRENT_THREAD();
1120 #endif
1121
1122 if (NULL == group) group = mw_state->group;
1123 PR_ASSERT(NULL != group);
1124 if (NULL == group)
1125 {
1126 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1127 return NULL;
1128 }
1129
1130 PR_Lock(group->ml);
1131 if (_prmw_stopped != group->state)
1132 {
1133 if (_prmw_running == group->state)
1134 group->state = _prmw_stopping; /* so nothing new comes in */
1135 if (0 == group->waiting_threads) /* is there anybody else? */
1136 group->state = _prmw_stopped; /* we can stop right now */
1137 else
1138 {
1139 PR_NotifyAllCondVar(group->new_business);
1140 PR_NotifyAllCondVar(group->io_complete);
1141 }
1142 while (_prmw_stopped != group->state)
1143 (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
1144 }
1145
1146 #ifdef WINNT
1147 _PR_MD_LOCK(&group->mdlock);
1148 #endif
1149 /* make all the existing descriptors look done/interrupted */
1150 #ifdef WINNT
1151 end = &group->waiter->recv_wait + group->waiter->length;
1152 for (desc = &group->waiter->recv_wait; desc < end; ++desc)
1153 {
1154 if (NULL != *desc)
1155 {
1156 if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
1157 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
1158 == (LONG)PR_MW_PENDING)
1159 {
1160 PRFileDesc *bottom = PR_GetIdentitiesLayer(
1161 (*desc)->fd, PR_NSPR_IO_LAYER);
1162 PR_ASSERT(NULL != bottom);
1163 if (NULL == bottom)
1164 {
1165 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1166 goto invalid_arg;
1167 }
1168 bottom->secret->state = _PR_FILEDESC_CLOSED;
1169 #if 0
1170 fprintf(stderr, "cancel wait group: closing socket\n");
1171 #endif
1172 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1173 {
1174 fprintf(stderr, "closesocket failed: %d\n",
1175 WSAGetLastError());
1176 exit(1);
1177 }
1178 }
1179 }
1180 }
1181 while (group->waiter->count > 0)
1182 {
1183 _PR_THREAD_LOCK(me);
1184 me->state = _PR_IO_WAIT;
1185 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
1186 if (!_PR_IS_NATIVE_THREAD(me))
1187 {
1188 _PR_SLEEPQ_LOCK(me->cpu);
1189 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
1190 _PR_SLEEPQ_UNLOCK(me->cpu);
1191 }
1192 _PR_THREAD_UNLOCK(me);
1193 _PR_MD_UNLOCK(&group->mdlock);
1194 PR_Unlock(group->ml);
1195 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
1196 me->state = _PR_RUNNING;
1197 PR_Lock(group->ml);
1198 _PR_MD_LOCK(&group->mdlock);
1199 }
1200 #else
1201 for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
1202 {
1203 PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
1204 if (NULL != *desc)
1205 _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
1206 }
1207 #endif
1208
1209 /* take first element of finished list and return it or NULL */
1210 if (PR_CLIST_IS_EMPTY(&group->io_ready))
1211 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1212 else
1213 {
1214 PRCList *head = PR_LIST_HEAD(&group->io_ready);
1215 PR_REMOVE_AND_INIT_LINK(head);
1216 #ifdef WINNT
1217 overlapped = (_MDOverlapped *)
1218 ((char *)head - offsetof(_MDOverlapped, data));
1219 head = &overlapped->data.mw.desc->internal;
1220 if (NULL != overlapped->data.mw.timer)
1221 {
1222 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1223 != overlapped->data.mw.desc->timeout);
1224 CancelTimer(overlapped->data.mw.timer);
1225 }
1226 else
1227 {
1228 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1229 == overlapped->data.mw.desc->timeout);
1230 }
1231 PR_DELETE(overlapped);
1232 #endif
1233 recv_wait = (PRRecvWait*)head;
1234 }
1235 #ifdef WINNT
1236 invalid_arg:
1237 _PR_MD_UNLOCK(&group->mdlock);
1238 #endif
1239 PR_Unlock(group->ml);
1240
1241 return recv_wait;
1242 } /* PR_CancelWaitGroup */
1243
1244 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
1245 {
1246 PRWaitGroup *wg;
1247
1248 if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
1249 {
1250 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1251 goto failed;
1252 }
1253 /* the wait group itself */
1254 wg->ml = PR_NewLock();
1255 if (NULL == wg->ml) goto failed_lock;
1256 wg->io_taken = PR_NewCondVar(wg->ml);
1257 if (NULL == wg->io_taken) goto failed_cvar0;
1258 wg->io_complete = PR_NewCondVar(wg->ml);
1259 if (NULL == wg->io_complete) goto failed_cvar1;
1260 wg->new_business = PR_NewCondVar(wg->ml);
1261 if (NULL == wg->new_business) goto failed_cvar2;
1262 wg->mw_manage = PR_NewCondVar(wg->ml);
1263 if (NULL == wg->mw_manage) goto failed_cvar3;
1264
1265 PR_INIT_CLIST(&wg->group_link);
1266 PR_INIT_CLIST(&wg->io_ready);
1267
1268 /* the waiters sequence */
1269 wg->waiter = (_PRWaiterHash*)PR_CALLOC(
1270 sizeof(_PRWaiterHash) +
1271 (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
1272 if (NULL == wg->waiter)
1273 {
1274 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1275 goto failed_waiter;
1276 }
1277 wg->waiter->count = 0;
1278 wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
1279
1280 #ifdef WINNT
1281 _PR_MD_NEW_LOCK(&wg->mdlock);
1282 PR_INIT_CLIST(&wg->wait_list);
1283 #endif /* WINNT */
1284
1285 PR_Lock(mw_lock);
1286 PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
1287 PR_Unlock(mw_lock);
1288 return wg;
1289
1290 failed_waiter:
1291 PR_DestroyCondVar(wg->mw_manage);
1292 failed_cvar3:
1293 PR_DestroyCondVar(wg->new_business);
1294 failed_cvar2:
1295 PR_DestroyCondVar(wg->io_complete);
1296 failed_cvar1:
1297 PR_DestroyCondVar(wg->io_taken);
1298 failed_cvar0:
1299 PR_DestroyLock(wg->ml);
1300 failed_lock:
1301 PR_DELETE(wg);
1302 wg = NULL;
1303
1304 failed:
1305 return wg;
1306 } /* MW_CreateWaitGroup */
1307
1308 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
1309 {
1310 PRStatus rv = PR_SUCCESS;
1311 if (NULL == group) group = mw_state->group;
1312 PR_ASSERT(NULL != group);
1313 if (NULL != group)
1314 {
1315 PR_Lock(group->ml);
1316 if ((group->waiting_threads == 0)
1317 && (group->waiter->count == 0)
1318 && PR_CLIST_IS_EMPTY(&group->io_ready))
1319 {
1320 group->state = _prmw_stopped;
1321 }
1322 else
1323 {
1324 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1325 rv = PR_FAILURE;
1326 }
1327 PR_Unlock(group->ml);
1328 if (PR_FAILURE == rv) return rv;
1329
1330 PR_Lock(mw_lock);
1331 PR_REMOVE_LINK(&group->group_link);
1332 PR_Unlock(mw_lock);
1333
1334 #ifdef WINNT
1335 /*
1336 * XXX make sure wait_list is empty and waiter is empty.
1337 * These must be checked while holding mdlock.
1338 */
1339 _PR_MD_FREE_LOCK(&group->mdlock);
1340 #endif
1341
1342 PR_DELETE(group->waiter);
1343 PR_DELETE(group->polling_list);
1344 PR_DestroyCondVar(group->mw_manage);
1345 PR_DestroyCondVar(group->new_business);
1346 PR_DestroyCondVar(group->io_complete);
1347 PR_DestroyCondVar(group->io_taken);
1348 PR_DestroyLock(group->ml);
1349 if (group == mw_state->group) mw_state->group = NULL;
1350 PR_DELETE(group);
1351 }
1352 else
1353 {
1354 /* The default wait group is not created yet. */
1355 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1356 rv = PR_FAILURE;
1357 }
1358 return rv;
1359 } /* PR_DestroyWaitGroup */
1360
1361 /**********************************************************************
1362 ***********************************************************************
1363 ******************** Wait group enumerations **************************
1364 ***********************************************************************
1365 **********************************************************************/
1366
1367 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
1368 {
1369 PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
1370 if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1371 else
1372 {
1373 enumerator->group = group;
1374 enumerator->seal = _PR_ENUM_SEALED;
1375 }
1376 return enumerator;
1377 } /* PR_CreateMWaitEnumerator */
1378
1379 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
1380 {
1381 PR_ASSERT(NULL != enumerator);
1382 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1383 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
1384 {
1385 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1386 return PR_FAILURE;
1387 }
1388 enumerator->seal = _PR_ENUM_UNSEALED;
1389 PR_Free(enumerator);
1390 return PR_SUCCESS;
1391 } /* PR_DestroyMWaitEnumerator */
1392
1393 PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
1394 PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
1395 {
1396 PRRecvWait *result = NULL;
1397
1398 /* entry point sanity checking */
1399 PR_ASSERT(NULL != enumerator);
1400 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1401 if ((NULL == enumerator)
1402 || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
1403
1404 /* beginning of enumeration */
1405 if (NULL == previous)
1406 {
1407 if (NULL == enumerator->group)
1408 {
1409 enumerator->group = mw_state->group;
1410 if (NULL == enumerator->group)
1411 {
1412 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1413 return NULL;
1414 }
1415 }
1416 enumerator->waiter = &enumerator->group->waiter->recv_wait;
1417 enumerator->p_timestamp = enumerator->group->p_timestamp;
1418 enumerator->thread = PR_GetCurrentThread();
1419 enumerator->index = 0;
1420 }
1421 /* continuing an enumeration */
1422 else
1423 {
1424 PRThread *me = PR_GetCurrentThread();
1425 PR_ASSERT(me == enumerator->thread);
1426 if (me != enumerator->thread) goto bad_argument;
1427
1428 /* need to restart the enumeration */
1429 if (enumerator->p_timestamp != enumerator->group->p_timestamp)
1430 return PR_EnumerateWaitGroup(enumerator, NULL);
1431 }
1432
1433 /* actually progress the enumeration */
1434 #if defined(WINNT)
1435 _PR_MD_LOCK(&enumerator->group->mdlock);
1436 #else
1437 PR_Lock(enumerator->group->ml);
1438 #endif
1439 while (enumerator->index++ < enumerator->group->waiter->length)
1440 {
1441 if (NULL != (result = *(enumerator->waiter)++)) break;
1442 }
1443 #if defined(WINNT)
1444 _PR_MD_UNLOCK(&enumerator->group->mdlock);
1445 #else
1446 PR_Unlock(enumerator->group->ml);
1447 #endif
1448
1449 return result; /* what we live for */
1450
1451 bad_argument:
1452 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1453 return NULL; /* probably ambiguous */
1454 } /* PR_EnumerateWaitGroup */
1455
1456 /* prmwait.c */
This site is hosted by Intevation GmbH (Datenschutzerklärung und Impressum | Privacy Policy and Imprint)