summaryrefslogtreecommitdiff
path: root/condvar.c
diff options
context:
space:
mode:
authorrpj <rpj>2001-02-06 05:44:38 +0000
committerrpj <rpj>2001-02-06 05:44:38 +0000
commit25f0d94be4e63b1a3cea1844bc4be7849c452a75 (patch)
treea124e65563367a4f2d3780128e29a7bddddf2b4e /condvar.c
parent9171eeaa77aaa6c4c34a88f5305ee3ebbc63077c (diff)
Created experimental branch.
Diffstat (limited to 'condvar.c')
-rw-r--r--condvar.c436
1 files changed, 241 insertions, 195 deletions
diff --git a/condvar.c b/condvar.c
index a92fdfb..2974000 100644
--- a/condvar.c
+++ b/condvar.c
@@ -319,7 +319,6 @@ pthread_condattr_setpshared (pthread_condattr_t * attr, int pshared)
} /* pthread_condattr_setpshared */
-
int
pthread_cond_init (pthread_cond_t * cond, const pthread_condattr_t * attr)
/*
@@ -377,43 +376,40 @@ pthread_cond_init (pthread_cond_t * cond, const pthread_condattr_t * attr)
goto FAIL0;
}
- cv->waiters = 0;
- cv->wasBroadcast = FALSE;
+ cv->nWaitersBlocked = 0;
+ cv->nWaitersUnblocked = 0;
+ cv->nWaitersToUnblock = 0;
- if (sem_init (&(cv->sema), 0, 0) != 0)
+ if (sem_init(&(cv->semBlockLock), 0, 1) != 0)
{
goto FAIL0;
}
- if (pthread_mutex_init (&(cv->waitersLock), NULL) != 0)
+
+ if (sem_init(&(cv->semBlockQueue), 0, 0) != 0)
{
goto FAIL1;
}
- cv->waitersDone = CreateEvent (
- 0,
- (int) FALSE, /* manualReset */
- (int) FALSE, /* setSignaled */
- NULL);
-
- if (cv->waitersDone == NULL)
+ if (pthread_mutex_init(&(cv->mtxUnblockLock), 0) != 0)
{
goto FAIL2;
}
+
result = 0;
goto DONE;
/*
* -------------
- * Failure Code
+ * Failed...
* -------------
*/
FAIL2:
- (void) pthread_mutex_destroy (&(cv->waitersLock));
+ (void) sem_destroy(&(cv->semBlockQueue));
FAIL1:
- (void) sem_destroy (&(cv->sema));
+ (void) sem_destroy(&(cv->semBlockLock));
FAIL0:
DONE:
@@ -423,7 +419,6 @@ DONE:
} /* pthread_cond_init */
-
int
pthread_cond_destroy (pthread_cond_t * cond)
/*
@@ -458,7 +453,7 @@ pthread_cond_destroy (pthread_cond_t * cond)
/*
* Assuming any race condition here is harmless.
*/
- if (cond == NULL
+ if (cond == NULL
|| *cond == NULL)
{
return EINVAL;
@@ -468,24 +463,42 @@ pthread_cond_destroy (pthread_cond_t * cond)
{
cv = *cond;
- if (pthread_mutex_lock(&(cv->waitersLock)) != 0)
- {
- return EINVAL;
- }
+ /*
+ * Synchronize access to waiters blocked count (LEVEL-1)
+ */
+ if (sem_wait(&(cv->semBlockLock)) != 0)
+ {
+ return errno;
+ }
- if (cv->waiters > 0)
- {
- (void) pthread_mutex_unlock(&(cv->waitersLock));
- return EBUSY;
- }
+ /*
+ * Synchronize access to waiters (to)unblock(ed) counts (LEVEL-2)
+ */
+ if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0)
+ {
+ (void) sem_post(&(cv->semBlockLock));
+ return result;
+ }
- (void) sem_destroy (&(cv->sema));
- (void) CloseHandle (cv->waitersDone);
- (void) pthread_mutex_unlock(&(cv->waitersLock));
- (void) pthread_mutex_destroy (&(cv->waitersLock));
+ /*
+ * Check whether cv is still busy (still has waiters blocked)
+ */
+ if (cv->nWaitersBlocked - cv->nWaitersUnblocked > 0)
+ {
+ (void) sem_post(&(cv->semBlockLock));
+ (void) pthread_mutex_unlock(&(cv->mtxUnblockLock));
+ return EBUSY;
+ }
+ /*
+ * Now it is safe to destroy
+ */
+ *cond = NULL; /* Invalidate it before anything else */
+ (void) sem_destroy(&(cv->semBlockLock));
+ (void) sem_destroy(&(cv->semBlockQueue));
+ (void) pthread_mutex_unlock(&(cv->mtxUnblockLock));
+ (void) pthread_mutex_destroy(&(cv->mtxUnblockLock));
free(cv);
- *cond = NULL;
}
else
{
@@ -535,59 +548,76 @@ typedef struct {
static void
ptw32_cond_wait_cleanup(void * args)
{
- ptw32_cond_wait_cleanup_args_t * cleanup_args = (ptw32_cond_wait_cleanup_args_t *) args;
- pthread_mutex_t * mutexPtr = cleanup_args->mutexPtr;
+ ptw32_cond_wait_cleanup_args_t * cleanup_args =
+ (ptw32_cond_wait_cleanup_args_t *) args;
pthread_cond_t cv = cleanup_args->cv;
int * resultPtr = cleanup_args->resultPtr;
- int lastWaiter = FALSE;
+ int eLastSignal; /* enum: 1=yes 0=no -1=cancelled/timedout w/o signal(s) */
+ int result;
/*
- * Whether we got here legitimately or because of an error we
- * indicate that we are no longer waiting. The alternative
- * will result in never signaling the broadcasting thread.
+ * Whether we got here as a result of signal/broadcast or because of
+ * timeout on wait or thread cancellation we indicate that we are no
+ * longer waiting. The waiter is responsible for adjusting waiters
+ * (to)unblock(ed) counts (protected by unblock lock).
+ * Unblock lock/Sync.LEVEL-2 supports _timedwait and cancellation.
*/
- if (pthread_mutex_lock (&(cv->waitersLock)) == 0)
+ if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0)
{
- /*
- * The waiter is responsible for decrementing
- * its count, protected by an internal mutex.
- */
+ *resultPtr = result;
+ return;
+ }
+
+ cv->nWaitersUnblocked++;
- cv->waiters--;
+ eLastSignal = (cv->nWaitersToUnblock == 0) ? -1 : (--cv->nWaitersToUnblock == 0);
- lastWaiter = cv->wasBroadcast && (cv->waiters == 0);
+ /*
+ * No more LEVEL-2 access to waiters (to)unblock(ed) counts needed
+ */
+ if ((result = pthread_mutex_unlock(&(cv->mtxUnblockLock))) != 0)
+ {
+ *resultPtr = result;
+ return;
+ }
- if (lastWaiter)
+ /*
+ * If last signal...
+ */
+ if (eLastSignal == 1)
+ {
+ /*
+ * ...it means that we have end of 'atomic' signal/broadcast
+ */
+ if (sem_post(&(cv->semBlockLock)) != 0)
{
- cv->wasBroadcast = FALSE;
+ *resultPtr = errno;
+ return;
}
-
- (void) pthread_mutex_unlock (&(cv->waitersLock));
}
-
/*
- * If we are the last waiter on this broadcast
- * let the thread doing the broadcast proceed
+ * If not last signal and not timed out/cancelled wait w/o signal...
*/
- if (lastWaiter && !SetEvent (cv->waitersDone))
+ else if (eLastSignal == 0)
{
- *resultPtr = EINVAL;
+ /*
+ * ...it means that next waiter can go through semaphore
+ */
+ if (sem_post(&(cv->semBlockQueue)) != 0)
+ {
+ *resultPtr = errno;
+ return;
+ }
}
/*
- * We must always regain the external mutex, even when
- * errors occur, because that's the guarantee that we give
- * to our callers.
- *
- * Note that the broadcasting thread may already own the lock.
- * The standard actually requires that the signaling thread hold
- * the lock at the time that it signals if the developer wants
- * predictable scheduling behaviour. It's up to the developer.
- * In that case all waiting threads will block here until
- * the broadcasting thread releases the lock, having been
- * notified by the last waiting thread (SetEvent call above).
+ * XSH: Upon successful return, the mutex has been locked and is owned
+ * by the calling thread
*/
- (void) pthread_mutex_lock (mutexPtr);
+ if ((result = pthread_mutex_lock(cleanup_args->mutexPtr)) != 0)
+ {
+ *resultPtr = result;
+ }
}
static int
@@ -623,63 +653,68 @@ ptw32_cond_timedwait (pthread_cond_t * cond,
cv = *cond;
/*
- * It's not OK to increment cond->waiters while the caller locked 'mutex',
- * there may be other threads just waking up (with 'mutex' unlocked)
- * and cv->... data is not protected.
+ * Synchronize access to waiters blocked count (LEVEL-1)
*/
- if (pthread_mutex_lock(&(cv->waitersLock)) != 0)
+ if (sem_wait(&(cv->semBlockLock)) != 0)
{
- return EINVAL;
+ return errno;
}
- cv->waiters++;
+ cv->nWaitersBlocked++;
- if (pthread_mutex_unlock(&(cv->waitersLock)) != 0)
+ /*
+ * Thats it. Counted means waiting, no more access needed
+ */
+ if (sem_post(&(cv->semBlockLock)) != 0)
{
- return EINVAL;
+ return errno;
}
/*
- * We keep the lock held just long enough to increment the count of
- * waiters by one (above).
- * Note that we can't keep it held across the
- * call to sem_wait since that will deadlock other calls
- * to pthread_cond_signal
+ * Setup this waiter cleanup handler
*/
cleanup_args.mutexPtr = mutex;
cleanup_args.cv = cv;
cleanup_args.resultPtr = &result;
- pthread_cleanup_push (ptw32_cond_wait_cleanup, (void *) &cleanup_args);
+ pthread_cleanup_push(ptw32_cond_wait_cleanup, (void *) &cleanup_args);
- if ((result = pthread_mutex_unlock (mutex)) == 0)
+ /*
+ * Now we can release 'mutex' and...
+ */
+ if ((result = pthread_mutex_unlock(mutex)) == 0)
{
+
/*
- * Wait to be awakened by
+ * ...wait to be awakened by
* pthread_cond_signal, or
* pthread_cond_broadcast, or
- * a timeout
+ * timeout, or
+ * thread cancellation
+ *
+ * Note:
*
- * Note:
- * ptw32_sem_timedwait is a cancelation point,
- * hence providing the
- * mechanism for making pthread_cond_wait a cancelation
- * point. We use the cleanup mechanism to ensure we
- * re-lock the mutex and decrement the waiters count
- * if we are canceled.
+ * ptw32_sem_timedwait is a cancellation point,
+ * hence providing the mechanism for making
+ * pthread_cond_wait a cancellation point.
+ * We use the cleanup mechanism to ensure we
+ * re-lock the mutex and adjust (to)unblock(ed) waiters
+ * counts if we are cancelled, timed out or signalled.
*/
- if (ptw32_sem_timedwait (&(cv->sema), abstime) == -1)
- {
- result = errno;
- }
+ if (ptw32_sem_timedwait(&(cv->semBlockQueue), abstime) != 0)
+ {
+ result = errno;
+ }
}
- pthread_cleanup_pop (1); /* Always cleanup */
+ /*
+ * Always cleanup
+ */
+ pthread_cleanup_pop(1);
+
/*
* "result" can be modified by the cleanup handler.
- * Specifically, if we are the last waiting thread and failed
- * to notify the broadcast thread to proceed.
*/
return (result);
@@ -736,8 +771,11 @@ pthread_cond_wait (pthread_cond_t * cond,
* ------------------------------------------------------
*/
{
- /* The NULL abstime arg means INFINITE waiting. */
+ /*
+ * The NULL abstime arg means INFINITE waiting.
+ */
return(ptw32_cond_timedwait(cond, mutex, NULL));
+
} /* pthread_cond_wait */
@@ -805,6 +843,105 @@ pthread_cond_timedwait (pthread_cond_t * cond,
} /* pthread_cond_timedwait */
+static int
+ptw32_cond_unblock (pthread_cond_t * cond,
+ int unblockAll)
+{
+ int result;
+ pthread_cond_t cv;
+
+ if (cond == NULL || *cond == NULL)
+ {
+ return EINVAL;
+ }
+
+ cv = *cond;
+
+ /*
+ * No-op if the CV is static and hasn't been initialised yet.
+ * Assuming that any race condition is harmless.
+ */
+ if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT)
+ {
+ return 0;
+ }
+
+ /*
+ * Synchronize access to waiters blocked count (LEVEL-1)
+ */
+ if (sem_wait(&(cv->semBlockLock)) != 0)
+ {
+ return errno;
+ }
+
+ /*
+ * Synchronize access to waiters (to)unblock(ed) counts (LEVEL-2)
+ * This sync.level supports _timedwait and cancellation
+ */
+ if ((result = pthread_mutex_lock(&(cv->mtxUnblockLock))) != 0)
+ {
+ return result;
+ }
+
+ /*
+ * Adjust waiters blocked and unblocked counts (collect garbage)
+ */
+ if (cv->nWaitersUnblocked != 0)
+ {
+ cv->nWaitersBlocked -= cv->nWaitersUnblocked;
+ cv->nWaitersUnblocked = 0;
+ }
+
+ /*
+ * If (after adjustment) there are still some waiters blocked counted...
+ */
+ if ( cv->nWaitersBlocked > 0)
+ {
+ /*
+ * We will unblock first waiter and leave semBlockLock/LEVEL-1 locked
+ * LEVEL-1 access is left disabled until last signal/unblock completes
+ */
+ cv->nWaitersToUnblock = (unblockAll) ? cv->nWaitersBlocked : 1;
+
+ /*
+ * No more LEVEL-2 access to waiters (to)unblock(ed) counts needed
+ * This sync.level supports _timedwait and cancellation
+ */
+ if ((result = pthread_mutex_unlock(&(cv->mtxUnblockLock))) != 0)
+ {
+ return result;
+ }
+
+
+ /*
+ * Now, with LEVEL-2 lock released let first waiter go through semaphore
+ */
+ if (sem_post(&(cv->semBlockQueue)) != 0)
+ {
+ return errno;
+ }
+ }
+ /*
+ * No waiter blocked - no more LEVEL-1 access to blocked count needed...
+ */
+ else if (sem_post(&(cv->semBlockLock)) != 0)
+ {
+ return errno;
+ }
+ /*
+ * ...and no more LEVEL-2 access to waiters (to)unblock(ed) counts needed too
+ * This sync.level supports _timedwait and cancellation
+ */
+ else
+ {
+ result = pthread_mutex_unlock(&(cv->mtxUnblockLock));
+ }
+
+ return(result);
+
+} /* ptw32_cond_unblock */
+
+
int
pthread_cond_signal (pthread_cond_t * cond)
/*
@@ -847,35 +984,10 @@ pthread_cond_signal (pthread_cond_t * cond)
* ------------------------------------------------------
*/
{
- int result = 0;
- pthread_cond_t cv;
-
- if (cond == NULL || *cond == NULL)
- {
- return EINVAL;
- }
-
- cv = *cond;
-
- /*
- * No-op if the CV is static and hasn't been initialised yet.
- * Assuming that race conditions are harmless.
- */
- if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT)
- {
- return 0;
- }
-
- /*
- * If there aren't any waiters, then this is a no-op.
- * Assuming that race conditions are harmless.
+ /*
+ * The '0'(FALSE) unblockAll arg means unblock ONE waiter.
*/
- if (cv->waiters > 0)
- {
- result = sem_post (&(cv->sema));
- }
-
- return (result);
+ return(ptw32_cond_unblock(cond, 0));
} /* pthread_cond_signal */
@@ -917,76 +1029,10 @@ pthread_cond_broadcast (pthread_cond_t * cond)
* ------------------------------------------------------
*/
{
- int result = 0;
- int wereWaiters = FALSE;
- pthread_cond_t cv;
-
- if (cond == NULL || *cond == NULL)
- {
- return EINVAL;
- }
-
- cv = *cond;
-
- /*
- * No-op if the CV is static and hasn't been initialised yet.
- * Assuming that any race condition is harmless.
+ /*
+ * The '1'(TRUE) unblockAll arg means unblock ALL waiters.
*/
- if (cv == (pthread_cond_t) PTW32_OBJECT_AUTO_INIT)
- {
- return 0;
- }
-
- if (pthread_mutex_lock(&(cv->waitersLock)) == EINVAL)
- {
- return EINVAL;
- }
-
- cv->wasBroadcast = TRUE;
- wereWaiters = (cv->waiters > 0);
-
- if (wereWaiters)
- {
- /*
- * Wake up all waiters
- */
-
-#ifdef NEED_SEM
-
- result = (ptw32_increase_semaphore( &cv->sema, cv->waiters )
- ? 0
- : EINVAL);
-
-#else /* NEED_SEM */
-
- result = (ReleaseSemaphore( cv->sema, cv->waiters, NULL )
- ? 0
- : EINVAL);
-
-#endif /* NEED_SEM */
-
- }
-
- (void) pthread_mutex_unlock(&(cv->waitersLock));
-
- if (wereWaiters && result == 0)
- {
- /*
- * Wait for all the awakened threads to acquire their part of
- * the counting semaphore
- */
- if (WaitForSingleObject (cv->waitersDone, INFINITE)
- == WAIT_OBJECT_0)
- {
- result = 0;
- }
- else
- {
- result = EINVAL;
- }
-
- }
-
- return (result);
+ return(ptw32_cond_unblock(cond, 1));
}
+