From 5b826fe110d9cde198d2aae27e144ac635ad921f Mon Sep 17 00:00:00 2001 From: rpj Date: Sun, 3 Jun 2001 16:41:27 +0000 Subject: pthreads: 2001-06-03 Ross Johnson Contributed by - Alexander Terekhov - Louis Thomas * condvar.c (pthread_cond_init): Completely revamped. (pthread_cond_destroy): Likewise. (ptw32_cond_wait_cleanup): Likewise. (ptw32_cond_timedwait): Likewise. (ptw32_cond_unblock): New general signaling routine. (pthread_cond_signal): Now calls ptw32_cond_unblock. (pthread_cond_broadcast): Likewise. * implement.h (pthread_cond_t_): Revamped. * README.CV: New; explanation of the above changes. pthreads/tests: 2001-06-3 Ross Johnson * condvar2_1.c: New test. * condvar3_1.c: New test. * condvar3_2.c: New test. --- condvar.c | 628 +++++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 378 insertions(+), 250 deletions(-) (limited to 'condvar.c') diff --git a/condvar.c b/condvar.c index afc6eea..eb1fd9a 100644 --- a/condvar.c +++ b/condvar.c @@ -27,7 +27,7 @@ #include "implement.h" static int -ptw32_cond_check_need_init(pthread_cond_t *cond) +ptw32_cond_check_need_init (pthread_cond_t *cond) { int result = 0; @@ -77,7 +77,7 @@ ptw32_cond_check_need_init(pthread_cond_t *cond) LeaveCriticalSection(&ptw32_cond_test_init_lock); - return(result); + return result; } @@ -124,7 +124,7 @@ pthread_condattr_init (pthread_condattr_t * attr) *attr = attr_result; - return (result); + return result; } /* pthread_condattr_init */ @@ -162,17 +162,16 @@ pthread_condattr_destroy (pthread_condattr_t * attr) if (attr == NULL || *attr == NULL) { result = EINVAL; - } else { - free (*attr); + (void) free (*attr); *attr = NULL; result = 0; } - return (result); + return result; } /* pthread_condattr_destroy */ @@ -220,13 +219,10 @@ pthread_condattr_getpshared (const pthread_condattr_t * attr, int *pshared) { int result; - if ((attr != NULL && *attr != NULL) && - (pshared != NULL)) + if ((attr != NULL && *attr != NULL) && (pshared != NULL)) { - *pshared = (*attr)->pshared; result = 0; - } else { @@ -234,7 +230,7 @@ pthread_condattr_getpshared (const pthread_condattr_t * attr, int *pshared) result = EINVAL; } - return (result); + return result; } /* pthread_condattr_getpshared */ @@ -284,12 +280,10 @@ pthread_condattr_setpshared (pthread_condattr_t * attr, int pshared) { int result; - if ((attr != NULL && *attr != NULL) && - ((pshared == PTHREAD_PROCESS_SHARED) || - (pshared == PTHREAD_PROCESS_PRIVATE))) + if ((attr != NULL && *attr != NULL) + && ((pshared == PTHREAD_PROCESS_SHARED) + || (pshared == PTHREAD_PROCESS_PRIVATE))) { - - if (pshared == PTHREAD_PROCESS_SHARED) { @@ -306,16 +300,15 @@ pthread_condattr_setpshared (pthread_condattr_t * attr, int pshared) { result = 0; } - (*attr)->pshared = pshared; + (*attr)->pshared = pshared; } else { result = EINVAL; - } - return (result); + return result; } /* pthread_condattr_setpshared */ @@ -349,7 +342,7 @@ pthread_cond_init (pthread_cond_t * cond, const pthread_condattr_t * attr) * ------------------------------------------------------ */ { - int result = EAGAIN; + int result; pthread_cond_t cv = NULL; if (cond == NULL) @@ -365,37 +358,35 @@ pthread_cond_init (pthread_cond_t * cond, const pthread_condattr_t * attr) * processes. */ result = ENOSYS; - - goto FAIL0; + goto DONE; } - cv = (pthread_cond_t) calloc (1, sizeof (*cv)); + cv = (pthread_cond_t) calloc(1, sizeof (*cv)); if (cv == NULL) { result = ENOMEM; - goto FAIL0; + goto DONE; } - cv->waiters = 0; - cv->wasBroadcast = FALSE; + cv->nWaitersBlocked = 0; + cv->nWaitersUnblocked = 0; + cv->nWaitersToUnblock = 0; + cv->nWaitersGone = 0; - if (sem_init (&(cv->sema), 0, 0) != 0) + if (sem_init(&(cv->semBlockLock), 0, 1) != 0) { + result = errno; goto FAIL0; } - if (pthread_mutex_init (&(cv->waitersLock), NULL) != 0) + + if (sem_init(&(cv->semBlockQueue), 0, 0) != 0) { + result = errno; goto FAIL1; } - cv->waitersDone = CreateEvent ( - 0, - (int) FALSE, /* manualReset */ - (int) FALSE, /* setSignaled */ - NULL); - - if (cv->waitersDone == NULL) + if ((result = pthread_mutex_init(&(cv->mtxUnblockLock), 0)) != 0) { goto FAIL2; } @@ -406,22 +397,23 @@ pthread_cond_init (pthread_cond_t * cond, const pthread_condattr_t * attr) /* * ------------- - * Failure Code + * Failed... * ------------- */ FAIL2: - (void) pthread_mutex_destroy (&(cv->waitersLock)); + (void) sem_destroy(&(cv->semBlockQueue)); FAIL1: - (void) sem_destroy (&(cv->sema)); - free(cv); - cv = NULL; + (void) sem_destroy(&(cv->semBlockLock)); FAIL0: + (void) free(cv); + cv = NULL; + DONE: *cond = cv; - return (result); + return result; } /* pthread_cond_init */ @@ -454,8 +446,8 @@ pthread_cond_destroy (pthread_cond_t * cond) * ------------------------------------------------------ */ { - int result = 0; pthread_cond_t cv; + int result = 0, result1 = 0, result2 = 0; /* * Assuming any race condition here is harmless. @@ -470,24 +462,56 @@ pthread_cond_destroy (pthread_cond_t * cond) { cv = *cond; - if (pthread_mutex_lock(&(cv->waitersLock)) != 0) - { - return EINVAL; - } - - if (cv->waiters > 0) - { - (void) pthread_mutex_unlock(&(cv->waitersLock)); - return EBUSY; - } + /* + * Synchronize access to waiters blocked count (LEVEL-1) + */ + if (sem_wait(&(cv->semBlockLock)) != 0) + { + return errno; + } - (void) sem_destroy (&(cv->sema)); - (void) CloseHandle (cv->waitersDone); - (void) pthread_mutex_unlock(&(cv->waitersLock)); - (void) pthread_mutex_destroy (&(cv->waitersLock)); + /* + * Synchronize access to waiters (to)unblock(ed) counts (LEVEL-2) + */ + if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0) + { + (void) sem_post(&(cv->semBlockLock)); + return result; + } - free(cv); - *cond = NULL; + /* + * Check whether cv is still busy (still has waiters) + */ + if (cv->nWaitersBlocked - cv->nWaitersGone - cv->nWaitersUnblocked > 0) + { + if (sem_post(&(cv->semBlockLock)) != 0) + { + result = errno; + } + result1 = pthread_mutex_unlock(&(cv->mtxUnblockLock)); + result2 = EBUSY; + } + else + { + /* + * Now it is safe to destroy + */ + *cond = NULL; + if (sem_destroy(&(cv->semBlockLock)) != 0) + { + result = errno; + } + if (sem_destroy(&(cv->semBlockQueue)) != 0) + { + result1 = errno; + } + if ((result2 = pthread_mutex_unlock(&(cv->mtxUnblockLock))) == 0) + { + result2 = pthread_mutex_destroy(&(cv->mtxUnblockLock)); + } + + (void) free(cv); + } } else { @@ -521,7 +545,8 @@ pthread_cond_destroy (pthread_cond_t * cond) LeaveCriticalSection(&ptw32_cond_test_init_lock); } - return (result); + return ((result != 0) ? result : ((result1 != 0) ? result1 : result2)); + } /* @@ -532,65 +557,135 @@ typedef struct { pthread_mutex_t * mutexPtr; pthread_cond_t cv; int * resultPtr; + int signaled; } ptw32_cond_wait_cleanup_args_t; static void ptw32_cond_wait_cleanup(void * args) { ptw32_cond_wait_cleanup_args_t * cleanup_args = (ptw32_cond_wait_cleanup_args_t *) args; - pthread_mutex_t * mutexPtr = cleanup_args->mutexPtr; pthread_cond_t cv = cleanup_args->cv; int * resultPtr = cleanup_args->resultPtr; - int lastWaiter = FALSE; + int nSignalsWasLeft; + int nWaitersWasGone = 0; + int result; /* - * Whether we got here legitimately or because of an error we - * indicate that we are no longer waiting. The alternative - * will result in never signaling the broadcasting thread. + * Whether we got here as a result of signal/broadcast or because of + * timeout on wait or thread cancellation we indicate that we are no + * longer waiting. The waiter is responsible for adjusting waiters + * (to)unblock(ed) counts (protected by unblock lock). + * Unblock lock/Sync.LEVEL-2 supports _timedwait and cancellation. */ - if (pthread_mutex_lock (&(cv->waitersLock)) == 0) + if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0) { - /* - * The waiter is responsible for decrementing - * its count, protected by an internal mutex. - */ - - cv->waiters--; - - lastWaiter = cv->wasBroadcast && (cv->waiters == 0); + *resultPtr = result; + goto FAIL0; + } - if (lastWaiter) + if ( 0 != (nSignalsWasLeft = cv->nWaitersToUnblock) ) + { + if ( ! cleanup_args->signaled ) + { + if ( 0 != cv->nWaitersBlocked ) + { + (cv->nWaitersBlocked)--; + } + else + { + (cv->nWaitersGone)++; + } + } + if ( 0 == --(cv->nWaitersToUnblock) ) + { + if ( 0 != cv->nWaitersBlocked ) + { + if (sem_post( &(cv->semBlockLock) ) != 0) + { + *resultPtr = errno; + goto FAIL1; + } + nSignalsWasLeft = 0; + } + else if ( 0 != (nWaitersWasGone = cv->nWaitersGone) ) + { + cv->nWaitersGone = 0; + } + } + } + else if ( INT_MAX/2 == ++(cv->nWaitersGone) ) + { + if (sem_wait( &(cv->semBlockLock) ) != 0) + { + *resultPtr = errno; + goto FAIL1; + } + cv->nWaitersBlocked -= cv->nWaitersGone; + if (sem_post( &(cv->semBlockLock) ) != 0) { - cv->wasBroadcast = FALSE; + *resultPtr = errno; + goto FAIL1; } + cv->nWaitersGone = 0; + } - (void) pthread_mutex_unlock (&(cv->waitersLock)); + /* + * No more LEVEL-2 access to waiters (to)unblock(ed) counts needed + */ + if ((result = pthread_mutex_unlock(&(cv->mtxUnblockLock))) != 0) + { + *resultPtr = result; + goto FAIL0; } /* - * If we are the last waiter on this broadcast - * let the thread doing the broadcast proceed + * If last signal... */ - if (lastWaiter && !SetEvent (cv->waitersDone)) + if ( 1 == nSignalsWasLeft ) { - *resultPtr = EINVAL; + if ( 0 != nWaitersWasGone ) + { + // sem_adjust( &(cv->semBlockQueue), -nWaitersWasGone ); + while ( nWaitersWasGone-- ) { + if (sem_wait( &(cv->semBlockQueue)) != 0 ) + { + *resultPtr = errno; + goto FAIL0; + } + } + } + /* + * ...it means that we have end of 'atomic' signal/broadcast + */ + if (sem_post(&(cv->semBlockLock)) != 0) + { + *resultPtr = errno; + goto FAIL0; + } } + goto DONE; + + FAIL1: + if ((result = pthread_mutex_unlock(&(cv->mtxUnblockLock))) != 0) + { + *resultPtr = result; + } + + FAIL0: + return; + + DONE: /* - * We must always regain the external mutex, even when - * errors occur, because that's the guarantee that we give - * to our callers. - * - * Note that the broadcasting thread may already own the lock. - * The standard actually requires that the signaling thread hold - * the lock at the time that it signals if the developer wants - * predictable scheduling behaviour. It's up to the developer. - * In that case all waiting threads will block here until - * the broadcasting thread releases the lock, having been - * notified by the last waiting thread (SetEvent call above). + * XSH: Upon successful return, the mutex has been locked and is owned + * by the calling thread */ - (void) pthread_mutex_lock (mutexPtr); -} + if ((result = pthread_mutex_lock(cleanup_args->mutexPtr)) != 0) + { + *resultPtr = result; + } + +} /* ptw32_cond_wait_cleanup */ static int ptw32_cond_timedwait (pthread_cond_t * cond, @@ -625,72 +720,211 @@ ptw32_cond_timedwait (pthread_cond_t * cond, cv = *cond; /* - * It's not OK to increment cond->waiters while the caller locked 'mutex', - * there may be other threads just waking up (with 'mutex' unlocked) - * and cv->... data is not protected. + * Synchronize access to waiters blocked count (LEVEL-1) */ - if (pthread_mutex_lock(&(cv->waitersLock)) != 0) + if (sem_wait(&(cv->semBlockLock)) != 0) { - return EINVAL; + return errno; } - cv->waiters++; + cv->nWaitersBlocked++; - if (pthread_mutex_unlock(&(cv->waitersLock)) != 0) + /* + * Thats it. Counted means waiting, no more access needed + */ + if (sem_post(&(cv->semBlockLock)) != 0) { - return EINVAL; + return errno; } /* - * We keep the lock held just long enough to increment the count of - * waiters by one (above). - * Note that we can't keep it held across the - * call to sem_wait since that will deadlock other calls - * to pthread_cond_signal + * Setup this waiter cleanup handler */ cleanup_args.mutexPtr = mutex; cleanup_args.cv = cv; cleanup_args.resultPtr = &result; + /* + * If we're canceled, or the cancelable wait fails for any reason, + * including a timeout, then tell the cleanup routine that we + * have not been signaled. + */ + cleanup_args.signaled = 0; - pthread_cleanup_push (ptw32_cond_wait_cleanup, (void *) &cleanup_args); + pthread_cleanup_push(ptw32_cond_wait_cleanup, (void *) &cleanup_args); - if ((result = pthread_mutex_unlock (mutex)) == 0) + /* + * Now we can release 'mutex' and... + */ + if ((result = pthread_mutex_unlock(mutex)) == 0) { + /* - * Wait to be awakened by + * ...wait to be awakened by * pthread_cond_signal, or * pthread_cond_broadcast, or - * a timeout + * timeout, or + * thread cancellation * * Note: - * ptw32_sem_timedwait is a cancelation point, - * hence providing the - * mechanism for making pthread_cond_wait a cancelation - * point. We use the cleanup mechanism to ensure we - * re-lock the mutex and decrement the waiters count - * if we are canceled. + * + * ptw32_sem_timedwait is a cancellation point, + * hence providing the mechanism for making + * pthread_cond_wait a cancellation point. + * We use the cleanup mechanism to ensure we + * re-lock the mutex and adjust (to)unblock(ed) waiters + * counts if we are cancelled, timed out or signalled. */ - if (ptw32_sem_timedwait (&(cv->sema), abstime) == -1) - { - result = errno; - } + if (ptw32_sem_timedwait(&(cv->semBlockQueue), abstime) != 0) + { + result = errno; + } } - pthread_cleanup_pop (1); /* Always cleanup */ + /* + * Not executed if we're canceled. Signaled is false if we timed out. + */ + cleanup_args.signaled = (result == 0); + + /* + * Always cleanup + */ + pthread_cleanup_pop(1); /* * "result" can be modified by the cleanup handler. - * Specifically, if we are the last waiting thread and failed - * to notify the broadcast thread to proceed. */ - return (result); + return result; } /* ptw32_cond_timedwait */ +static int +ptw32_cond_unblock (pthread_cond_t * cond, + int unblockAll) + /* + * Notes. + * + * Does not use the external mutex for synchronisation, + * therefore semBlockLock is needed. + * mtxUnblockLock is for LEVEL-2 synch. LEVEL-2 is the + * state where the external mutex is not necessarily locked by + * any thread, ie. between cond_wait unlocking and re-acquiring + * the lock after having been signaled or a timeout or + * cancellation. + * + * Uses the following CV elements: + * nWaitersBlocked + * nWaitersToUnblock + * nWaitersGone + * mtxUnblockLock + * semBlockLock + * semBlockQueue + */ +{ + int result; + int result2; + pthread_cond_t cv; + int nSignalsToIssue = 1; + + if (cond == NULL || *cond == NULL) + { + return EINVAL; + } + + cv = *cond; + + /* + * No-op if the CV is static and hasn't been initialised yet. + * Assuming that any race condition is harmless. + */ + if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT) + { + return 0; + } + + /* + * Synchronize access to waiters (to)unblock(ed) counts (LEVEL-2) + * This sync.level supports _timedwait and cancellation + */ + if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0) + { + return result; + } + + if ( 0 != cv->nWaitersToUnblock ) + { + if ( 0 == cv->nWaitersBlocked ) + { + goto FAIL1; + } + if (unblockAll) + { + cv->nWaitersToUnblock += (nSignalsToIssue = cv->nWaitersBlocked); + cv->nWaitersBlocked = 0; + } + else + { + cv->nWaitersToUnblock++; + cv->nWaitersBlocked--; + } + } + else + { + if (sem_wait( &(cv->semBlockLock) ) != 0) + { + result = errno; + goto FAIL1; + } + if ( cv->nWaitersBlocked > cv->nWaitersGone ) + { + if ( 0 != cv->nWaitersGone ) + { + cv->nWaitersBlocked -= cv->nWaitersGone; + cv->nWaitersGone = 0; + } + if (unblockAll) + { + nSignalsToIssue = cv->nWaitersToUnblock = cv->nWaitersBlocked; + cv->nWaitersBlocked = 0; + } + else + { + nSignalsToIssue = cv->nWaitersToUnblock = 1; + cv->nWaitersBlocked--; + } + } + else + { + if (sem_post( &(cv->semBlockLock) ) != 0) + { + result = errno; + goto FAIL1; + } + } + } + + FAIL1: + if ((result2 = pthread_mutex_unlock( &(cv->mtxUnblockLock) )) != 0) + { + result = result2; + } + while (0 != nSignalsToIssue--) + { + if (sem_post( &(cv->semBlockQueue) ) != 0) + { + result = errno; + goto FAIL0; + } + } + + FAIL0: + return result; + +} /* ptw32_cond_unblock */ + int pthread_cond_wait (pthread_cond_t * cond, - pthread_mutex_t * mutex) + pthread_mutex_t * mutex) /* * ------------------------------------------------------ * DOCPUBLIC @@ -715,8 +949,9 @@ pthread_cond_wait (pthread_cond_t * cond, * awakened by a signal or broadcast. * * NOTES: + * * 1) The function must be called with 'mutex' LOCKED - * by the calling thread, or undefined behaviour + * by the calling thread, or undefined behaviour * will result. * * 2) This routine atomically releases 'mutex' and causes @@ -738,8 +973,11 @@ pthread_cond_wait (pthread_cond_t * cond, * ------------------------------------------------------ */ { - /* The NULL abstime arg means INFINITE waiting. */ - return(ptw32_cond_timedwait(cond, mutex, NULL)); + /* + * The NULL abstime arg means INFINITE waiting. + */ + return (ptw32_cond_timedwait(cond, mutex, NULL)); + } /* pthread_cond_wait */ @@ -772,7 +1010,7 @@ pthread_cond_timedwait (pthread_cond_t * cond, * * NOTES: * 1) The function must be called with 'mutex' LOCKED - * by the calling thread, or undefined behaviour + * by the calling thread, or undefined behaviour * will result. * * 2) This routine atomically releases 'mutex' and causes @@ -792,18 +1030,13 @@ pthread_cond_timedwait (pthread_cond_t * cond, * ------------------------------------------------------ */ { - int result = 0; - if (abstime == NULL) { - result = EINVAL; - } - else - { - result = ptw32_cond_timedwait(cond, mutex, abstime); + return EINVAL; } - return(result); + return (ptw32_cond_timedwait(cond, mutex, abstime)); + } /* pthread_cond_timedwait */ @@ -831,17 +1064,10 @@ pthread_cond_signal (pthread_cond_t * cond) * an unspecified waiter is awakened. * * NOTES: + * * 1) Use when any waiter can respond and only one need * respond (all waiters being equal). * - * 2) This function MUST be called under the protection - * of the SAME mutex that is used with the condition - * variable being signaled; OTHERWISE, the condition - * variable may be signaled between the test of the - * associated condition and the blocking - * pthread_cond_signal. - * This can cause an infinite wait. - * * RESULTS * 0 successfully signaled condition, * EINVAL 'cond' is invalid, @@ -849,35 +1075,10 @@ pthread_cond_signal (pthread_cond_t * cond) * ------------------------------------------------------ */ { - int result = 0; - pthread_cond_t cv; - - if (cond == NULL || *cond == NULL) - { - return EINVAL; - } - - cv = *cond; - /* - * No-op if the CV is static and hasn't been initialised yet. - * Assuming that race conditions are harmless. + * The '0'(FALSE) unblockAll arg means unblock ONE waiter. */ - if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT) - { - return 0; - } - - /* - * If there aren't any waiters, then this is a no-op. - * Assuming that race conditions are harmless. - */ - if (cv->waiters > 0) - { - result = sem_post (&(cv->sema)); - } - - return (result); + return (ptw32_cond_unblock(cond, 0)); } /* pthread_cond_signal */ @@ -899,14 +1100,8 @@ pthread_cond_broadcast (pthread_cond_t * cond) * all waiting threads. * * NOTES: - * 1) This function MUST be called under the protection - * of the SAME mutex that is used with the condition - * variable being signaled; OTHERWISE, the condition - * variable may be signaled between the test of the - * associated condition and the blocking pthread_cond_wait. - * This can cause an infinite wait. - * - * 2) Use when more than one waiter may respond to + * + * 1) Use when more than one waiter may respond to * predicate change or if any waiting thread may * not be able to respond * @@ -919,76 +1114,9 @@ pthread_cond_broadcast (pthread_cond_t * cond) * ------------------------------------------------------ */ { - int result = 0; - int wereWaiters = FALSE; - pthread_cond_t cv; - - if (cond == NULL || *cond == NULL) - { - return EINVAL; - } - - cv = *cond; - /* - * No-op if the CV is static and hasn't been initialised yet. - * Assuming that any race condition is harmless. + * The '1'(TRUE) unblockAll arg means unblock ALL waiters. */ - if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT) - { - return 0; - } + return (ptw32_cond_unblock(cond, 1)); - if (pthread_mutex_lock(&(cv->waitersLock)) == EINVAL) - { - return EINVAL; - } - - cv->wasBroadcast = TRUE; - wereWaiters = (cv->waiters > 0); - - if (wereWaiters) - { - /* - * Wake up all waiters - */ - -#ifdef NEED_SEM - - result = (ptw32_increase_semaphore( &cv->sema, cv->waiters ) - ? 0 - : EINVAL); - -#else /* NEED_SEM */ - - result = (ReleaseSemaphore( cv->sema, cv->waiters, NULL ) - ? 0 - : EINVAL); - -#endif /* NEED_SEM */ - - } - - (void) pthread_mutex_unlock(&(cv->waitersLock)); - - if (wereWaiters && result == 0) - { - /* - * Wait for all the awakened threads to acquire their part of - * the counting semaphore - */ - if (WaitForSingleObject (cv->waitersDone, INFINITE) - == WAIT_OBJECT_0) - { - result = 0; - } - else - { - result = EINVAL; - } - - } - - return (result); - -} +} /* pthread_cond_broadcast */ -- cgit v1.2.3