diff options
Diffstat (limited to 'condvar.c')
-rw-r--r-- | condvar.c | 436 |
1 files changed, 241 insertions, 195 deletions
@@ -319,7 +319,6 @@ pthread_condattr_setpshared (pthread_condattr_t * attr, int pshared) } /* pthread_condattr_setpshared */ - int pthread_cond_init (pthread_cond_t * cond, const pthread_condattr_t * attr) /* @@ -377,43 +376,40 @@ pthread_cond_init (pthread_cond_t * cond, const pthread_condattr_t * attr) goto FAIL0; } - cv->waiters = 0; - cv->wasBroadcast = FALSE; + cv->nWaitersBlocked = 0; + cv->nWaitersUnblocked = 0; + cv->nWaitersToUnblock = 0; - if (sem_init (&(cv->sema), 0, 0) != 0) + if (sem_init(&(cv->semBlockLock), 0, 1) != 0) { goto FAIL0; } - if (pthread_mutex_init (&(cv->waitersLock), NULL) != 0) + + if (sem_init(&(cv->semBlockQueue), 0, 0) != 0) { goto FAIL1; } - cv->waitersDone = CreateEvent ( - 0, - (int) FALSE, /* manualReset */ - (int) FALSE, /* setSignaled */ - NULL); - - if (cv->waitersDone == NULL) + if (pthread_mutex_init(&(cv->mtxUnblockLock), 0) != 0) { goto FAIL2; } + result = 0; goto DONE; /* * ------------- - * Failure Code + * Failed... * ------------- */ FAIL2: - (void) pthread_mutex_destroy (&(cv->waitersLock)); + (void) sem_destroy(&(cv->semBlockQueue)); FAIL1: - (void) sem_destroy (&(cv->sema)); + (void) sem_destroy(&(cv->semBlockLock)); FAIL0: DONE: @@ -423,7 +419,6 @@ DONE: } /* pthread_cond_init */ - int pthread_cond_destroy (pthread_cond_t * cond) /* @@ -458,7 +453,7 @@ pthread_cond_destroy (pthread_cond_t * cond) /* * Assuming any race condition here is harmless. */ - if (cond == NULL + if (cond == NULL || *cond == NULL) { return EINVAL; @@ -468,24 +463,42 @@ pthread_cond_destroy (pthread_cond_t * cond) { cv = *cond; - if (pthread_mutex_lock(&(cv->waitersLock)) != 0) - { - return EINVAL; - } + /* + * Synchronize access to waiters blocked count (LEVEL-1) + */ + if (sem_wait(&(cv->semBlockLock)) != 0) + { + return errno; + } - if (cv->waiters > 0) - { - (void) pthread_mutex_unlock(&(cv->waitersLock)); - return EBUSY; - } + /* + * Synchronize access to waiters (to)unblock(ed) counts (LEVEL-2) + */ + if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0) + { + (void) sem_post(&(cv->semBlockLock)); + return result; + } - (void) sem_destroy (&(cv->sema)); - (void) CloseHandle (cv->waitersDone); - (void) pthread_mutex_unlock(&(cv->waitersLock)); - (void) pthread_mutex_destroy (&(cv->waitersLock)); + /* + * Check whether cv is still busy (still has waiters blocked) + */ + if (cv->nWaitersBlocked - cv->nWaitersUnblocked > 0) + { + (void) sem_post(&(cv->semBlockLock)); + (void) pthread_mutex_unlock(&(cv->mtxUnblockLock)); + return EBUSY; + } + /* + * Now it is safe to destroy + */ + *cond = NULL; /* Invalidate it before anything else */ + (void) sem_destroy(&(cv->semBlockLock)); + (void) sem_destroy(&(cv->semBlockQueue)); + (void) pthread_mutex_unlock(&(cv->mtxUnblockLock)); + (void) pthread_mutex_destroy(&(cv->mtxUnblockLock)); free(cv); - *cond = NULL; } else { @@ -535,59 +548,76 @@ typedef struct { static void ptw32_cond_wait_cleanup(void * args) { - ptw32_cond_wait_cleanup_args_t * cleanup_args = (ptw32_cond_wait_cleanup_args_t *) args; - pthread_mutex_t * mutexPtr = cleanup_args->mutexPtr; + ptw32_cond_wait_cleanup_args_t * cleanup_args = + (ptw32_cond_wait_cleanup_args_t *) args; pthread_cond_t cv = cleanup_args->cv; int * resultPtr = cleanup_args->resultPtr; - int lastWaiter = FALSE; + int eLastSignal; /* enum: 1=yes 0=no -1=cancelled/timedout w/o signal(s) */ + int result; /* - * Whether we got here legitimately or because of an error we - * indicate that we are no longer waiting. The alternative - * will result in never signaling the broadcasting thread. + * Whether we got here as a result of signal/broadcast or because of + * timeout on wait or thread cancellation we indicate that we are no + * longer waiting. The waiter is responsible for adjusting waiters + * (to)unblock(ed) counts (protected by unblock lock). + * Unblock lock/Sync.LEVEL-2 supports _timedwait and cancellation. */ - if (pthread_mutex_lock (&(cv->waitersLock)) == 0) + if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0) { - /* - * The waiter is responsible for decrementing - * its count, protected by an internal mutex. - */ + *resultPtr = result; + return; + } + + cv->nWaitersUnblocked++; - cv->waiters--; + eLastSignal = (cv->nWaitersToUnblock == 0) ? -1 : (--cv->nWaitersToUnblock == 0); - lastWaiter = cv->wasBroadcast && (cv->waiters == 0); + /* + * No more LEVEL-2 access to waiters (to)unblock(ed) counts needed + */ + if ((result = pthread_mutex_unlock(&(cv->mtxUnblockLock))) != 0) + { + *resultPtr = result; + return; + } - if (lastWaiter) + /* + * If last signal... + */ + if (eLastSignal == 1) + { + /* + * ...it means that we have end of 'atomic' signal/broadcast + */ + if (sem_post(&(cv->semBlockLock)) != 0) { - cv->wasBroadcast = FALSE; + *resultPtr = errno; + return; } - - (void) pthread_mutex_unlock (&(cv->waitersLock)); } - /* - * If we are the last waiter on this broadcast - * let the thread doing the broadcast proceed + * If not last signal and not timed out/cancelled wait w/o signal... */ - if (lastWaiter && !SetEvent (cv->waitersDone)) + else if (eLastSignal == 0) { - *resultPtr = EINVAL; + /* + * ...it means that next waiter can go through semaphore + */ + if (sem_post(&(cv->semBlockQueue)) != 0) + { + *resultPtr = errno; + return; + } } /* - * We must always regain the external mutex, even when - * errors occur, because that's the guarantee that we give - * to our callers. - * - * Note that the broadcasting thread may already own the lock. - * The standard actually requires that the signaling thread hold - * the lock at the time that it signals if the developer wants - * predictable scheduling behaviour. It's up to the developer. - * In that case all waiting threads will block here until - * the broadcasting thread releases the lock, having been - * notified by the last waiting thread (SetEvent call above). + * XSH: Upon successful return, the mutex has been locked and is owned + * by the calling thread */ - (void) pthread_mutex_lock (mutexPtr); + if ((result = pthread_mutex_lock(cleanup_args->mutexPtr)) != 0) + { + *resultPtr = result; + } } static int @@ -623,63 +653,68 @@ ptw32_cond_timedwait (pthread_cond_t * cond, cv = *cond; /* - * It's not OK to increment cond->waiters while the caller locked 'mutex', - * there may be other threads just waking up (with 'mutex' unlocked) - * and cv->... data is not protected. + * Synchronize access to waiters blocked count (LEVEL-1) */ - if (pthread_mutex_lock(&(cv->waitersLock)) != 0) + if (sem_wait(&(cv->semBlockLock)) != 0) { - return EINVAL; + return errno; } - cv->waiters++; + cv->nWaitersBlocked++; - if (pthread_mutex_unlock(&(cv->waitersLock)) != 0) + /* + * Thats it. Counted means waiting, no more access needed + */ + if (sem_post(&(cv->semBlockLock)) != 0) { - return EINVAL; + return errno; } /* - * We keep the lock held just long enough to increment the count of - * waiters by one (above). - * Note that we can't keep it held across the - * call to sem_wait since that will deadlock other calls - * to pthread_cond_signal + * Setup this waiter cleanup handler */ cleanup_args.mutexPtr = mutex; cleanup_args.cv = cv; cleanup_args.resultPtr = &result; - pthread_cleanup_push (ptw32_cond_wait_cleanup, (void *) &cleanup_args); + pthread_cleanup_push(ptw32_cond_wait_cleanup, (void *) &cleanup_args); - if ((result = pthread_mutex_unlock (mutex)) == 0) + /* + * Now we can release 'mutex' and... + */ + if ((result = pthread_mutex_unlock(mutex)) == 0) { + /* - * Wait to be awakened by + * ...wait to be awakened by * pthread_cond_signal, or * pthread_cond_broadcast, or - * a timeout + * timeout, or + * thread cancellation + * + * Note: * - * Note: - * ptw32_sem_timedwait is a cancelation point, - * hence providing the - * mechanism for making pthread_cond_wait a cancelation - * point. We use the cleanup mechanism to ensure we - * re-lock the mutex and decrement the waiters count - * if we are canceled. + * ptw32_sem_timedwait is a cancellation point, + * hence providing the mechanism for making + * pthread_cond_wait a cancellation point. + * We use the cleanup mechanism to ensure we + * re-lock the mutex and adjust (to)unblock(ed) waiters + * counts if we are cancelled, timed out or signalled. */ - if (ptw32_sem_timedwait (&(cv->sema), abstime) == -1) - { - result = errno; - } + if (ptw32_sem_timedwait(&(cv->semBlockQueue), abstime) != 0) + { + result = errno; + } } - pthread_cleanup_pop (1); /* Always cleanup */ + /* + * Always cleanup + */ + pthread_cleanup_pop(1); + /* * "result" can be modified by the cleanup handler. - * Specifically, if we are the last waiting thread and failed - * to notify the broadcast thread to proceed. */ return (result); @@ -736,8 +771,11 @@ pthread_cond_wait (pthread_cond_t * cond, * ------------------------------------------------------ */ { - /* The NULL abstime arg means INFINITE waiting. */ + /* + * The NULL abstime arg means INFINITE waiting. + */ return(ptw32_cond_timedwait(cond, mutex, NULL)); + } /* pthread_cond_wait */ @@ -805,6 +843,105 @@ pthread_cond_timedwait (pthread_cond_t * cond, } /* pthread_cond_timedwait */ +static int +ptw32_cond_unblock (pthread_cond_t * cond, + int unblockAll) +{ + int result; + pthread_cond_t cv; + + if (cond == NULL || *cond == NULL) + { + return EINVAL; + } + + cv = *cond; + + /* + * No-op if the CV is static and hasn't been initialised yet. + * Assuming that any race condition is harmless. + */ + if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT) + { + return 0; + } + + /* + * Synchronize access to waiters blocked count (LEVEL-1) + */ + if (sem_wait(&(cv->semBlockLock)) != 0) + { + return errno; + } + + /* + * Synchronize access to waiters (to)unblock(ed) counts (LEVEL-2) + * This sync.level supports _timedwait and cancellation + */ + if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0) + { + return result; + } + + /* + * Adjust waiters blocked and unblocked counts (collect garbage) + */ + if (cv->nWaitersUnblocked != 0) + { + cv->nWaitersBlocked -= cv->nWaitersUnblocked; + cv->nWaitersUnblocked = 0; + } + + /* + * If (after adjustment) there are still some waiters blocked counted... + */ + if ( cv->nWaitersBlocked > 0) + { + /* + * We will unblock first waiter and leave semBlockLock/LEVEL-1 locked + * LEVEL-1 access is left disabled until last signal/unblock completes + */ + cv->nWaitersToUnblock = (unblockAll) ? cv->nWaitersBlocked : 1; + + /* + * No more LEVEL-2 access to waiters (to)unblock(ed) counts needed + * This sync.level supports _timedwait and cancellation + */ + if ((result = pthread_mutex_unlock(&(cv->mtxUnblockLock))) != 0) + { + return result; + } + + + /* + * Now, with LEVEL-2 lock released let first waiter go through semaphore + */ + if (sem_post(&(cv->semBlockQueue)) != 0) + { + return errno; + } + } + /* + * No waiter blocked - no more LEVEL-1 access to blocked count needed... + */ + else if (sem_post(&(cv->semBlockLock)) != 0) + { + return errno; + } + /* + * ...and no more LEVEL-2 access to waiters (to)unblock(ed) counts needed too + * This sync.level supports _timedwait and cancellation + */ + else + { + result = pthread_mutex_unlock(&(cv->mtxUnblockLock)); + } + + return(result); + +} /* ptw32_cond_unblock */ + + int pthread_cond_signal (pthread_cond_t * cond) /* @@ -847,35 +984,10 @@ pthread_cond_signal (pthread_cond_t * cond) * ------------------------------------------------------ */ { - int result = 0; - pthread_cond_t cv; - - if (cond == NULL || *cond == NULL) - { - return EINVAL; - } - - cv = *cond; - - /* - * No-op if the CV is static and hasn't been initialised yet. - * Assuming that race conditions are harmless. - */ - if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT) - { - return 0; - } - - /* - * If there aren't any waiters, then this is a no-op. - * Assuming that race conditions are harmless. + /* + * The '0'(FALSE) unblockAll arg means unblock ONE waiter. */ - if (cv->waiters > 0) - { - result = sem_post (&(cv->sema)); - } - - return (result); + return(ptw32_cond_unblock(cond, 0)); } /* pthread_cond_signal */ @@ -917,76 +1029,10 @@ pthread_cond_broadcast (pthread_cond_t * cond) * ------------------------------------------------------ */ { - int result = 0; - int wereWaiters = FALSE; - pthread_cond_t cv; - - if (cond == NULL || *cond == NULL) - { - return EINVAL; - } - - cv = *cond; - - /* - * No-op if the CV is static and hasn't been initialised yet. - * Assuming that any race condition is harmless. + /* + * The '1'(TRUE) unblockAll arg means unblock ALL waiters. */ - if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT) - { - return 0; - } - - if (pthread_mutex_lock(&(cv->waitersLock)) == EINVAL) - { - return EINVAL; - } - - cv->wasBroadcast = TRUE; - wereWaiters = (cv->waiters > 0); - - if (wereWaiters) - { - /* - * Wake up all waiters - */ - -#ifdef NEED_SEM - - result = (ptw32_increase_semaphore( &cv->sema, cv->waiters ) - ? 0 - : EINVAL); - -#else /* NEED_SEM */ - - result = (ReleaseSemaphore( cv->sema, cv->waiters, NULL ) - ? 0 - : EINVAL); - -#endif /* NEED_SEM */ - - } - - (void) pthread_mutex_unlock(&(cv->waitersLock)); - - if (wereWaiters && result == 0) - { - /* - * Wait for all the awakened threads to acquire their part of - * the counting semaphore - */ - if (WaitForSingleObject (cv->waitersDone, INFINITE) - == WAIT_OBJECT_0) - { - result = 0; - } - else - { - result = EINVAL; - } - - } - - return (result); + return(ptw32_cond_unblock(cond, 1)); } + |