From 1d99828acf48bc6d5a81aadc6123e5172dfc355d Mon Sep 17 00:00:00 2001 From: rpj Date: Mon, 9 Jul 2001 18:22:07 +0000 Subject: * barrier.c: Revamped to fix the race condition. Two alternating semaphores are used instead of the PulseEvent. Also improved overall throughput by returning PTHREAD_BARRIER_SERIAL_THREAD to the first waking thread. * implement.h (pthread_barrier_t_): Revamped. --- ChangeLog | 8 +++ barrier.c | 149 ++++++++++++++++++++++++++++--------------------------- implement.h | 8 +-- pthread.h | 2 +- tests/barrier5.c | 14 ++---- 5 files changed, 95 insertions(+), 86 deletions(-) diff --git a/ChangeLog b/ChangeLog index bfb9646..97ec503 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,11 @@ +2001-07-10 Ross Johnson + + * barrier.c: Revamped to fix the race condition. Two alternating + semaphores are used instead of the PulseEvent. Also improved + overall throughput by returning PTHREAD_BARRIER_SERIAL_THREAD + to the first waking thread. + * implement.h (pthread_barrier_t_): Revamped. + 2001-07-09 Ross Johnson * barrier.c: Fix several bugs in all routines. Now passes diff --git a/barrier.c b/barrier.c index 6a18f25..e9312c9 100644 --- a/barrier.c +++ b/barrier.c @@ -27,61 +27,62 @@ #include "implement.h" +#ifdef __MINGW32__ +#define _LONG long +#define _LPLONG long* +#else +#define _LONG PVOID +#define _LPLONG PVOID* +#endif + int pthread_barrier_init(pthread_barrier_t * barrier, const pthread_barrierattr_t * attr, - int count) + unsigned int count) { - int result = 0; int pshared = PTHREAD_PROCESS_PRIVATE; pthread_barrier_t b; + pthread_mutexattr_t ma; - if (barrier == NULL) + if (barrier == NULL || count == 0) { return EINVAL; } - b = (pthread_barrier_t) calloc(1, sizeof(*b)); - - if (b == NULL) - { - result = ENOMEM; - goto FAIL0; - } - - if (attr != NULL && *attr != NULL) - { - pshared = (*attr)->pshared; - } - - b->nCurrentBarrierHeight = b->nInitialBarrierHeight = count; - - result = pthread_mutex_init(&(b->mtxExclusiveAccess), NULL); - if (0 != result) + if (NULL != (b = (pthread_barrier_t) calloc(1, sizeof(*b)))) { - goto FAIL1; - } + if (attr != NULL && *attr != NULL) + { + pshared = (*attr)->pshared; + } - b->eventBarrierBreeched = CreateEvent(NULL, /* Security attributes */ - TRUE, /* Manual reset */ - FALSE, /* Initially signaled */ - NULL); /* Name */ + b->nCurrentBarrierHeight = b->nInitialBarrierHeight = count; + b->iStep = 0; + b->nSerial = 0; - if (NULL != b->eventBarrierBreeched) - { - goto DONE; + if (0 == pthread_mutexattr_init(&ma) && + 0 == pthread_mutexattr_setpshared(&ma, pshared)) + { + if (0 == pthread_mutex_init(&(b->mtxExclusiveAccess), &ma)) + { + pthread_mutexattr_destroy(&ma); + if (0 == sem_init(&(b->semBarrierBreeched[0]), pshared, 0)) + { + if (0 == sem_init(&(b->semBarrierBreeched[1]), pshared, 0)) + { + *barrier = b; + return 0; + } + (void) sem_destroy(&(b->semBarrierBreeched[0])); + } + (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess)); + } + pthread_mutexattr_destroy(&ma); + } + (void) free(b); } - (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess)); - result = ENOMEM; - FAIL1: - (void) free(b); - b = NULL; - - FAIL0: - DONE: - *barrier = b; - return(result); + return ENOMEM; } int @@ -109,20 +110,12 @@ pthread_barrier_destroy(pthread_barrier_t *barrier) * *barrier != NULL. */ *barrier = NULL; - - result = CloseHandle(b->eventBarrierBreeched); (void) pthread_mutex_unlock(&(b->mtxExclusiveAccess)); - if (result == TRUE) - { - (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess)); - (void) free(b); - result = 0; - } - else - { - *barrier = b; - result = EINVAL; - } + + (void) sem_destroy(&(b->semBarrierBreeched[1])); + (void) sem_destroy(&(b->semBarrierBreeched[0])); + (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess)); + (void) free(b); } return(result); @@ -145,31 +138,28 @@ pthread_barrier_wait(pthread_barrier_t *barrier) if (0 == result) { + int step = b->iStep; + unsigned int serial = b->nSerial; + if (0 == --(b->nCurrentBarrierHeight)) { - b->nCurrentBarrierHeight = b->nInitialBarrierHeight; (void) pthread_mutex_unlock(&(b->mtxExclusiveAccess)); /* - * This is a work-around for the FIXME below. We - * give any threads that didn't quite get to register - * their wait another quantum. This is temporary - * - there is a better way to do this. + * All other threads in this barrier set have at least passed + * through the decrement and test above, so its safe to + * raise the barrier to its full height. */ - Sleep(0); - (void) PulseEvent(b->eventBarrierBreeched); - /* - * Would be better if the first thread to return - * from this routine got this value. On a single - * processor machine that will be the last thread - * to reach the barrier (us), most of the time. - */ - result = PTHREAD_BARRIER_SERIAL_THREAD; + b->iStep = 1 - step; + b->nCurrentBarrierHeight = b->nInitialBarrierHeight; + (void) sem_post_multiple(&(b->semBarrierBreeched[step]), b->nInitialBarrierHeight - 1); } else { pthread_t self; int oldCancelState; + (void) pthread_mutex_unlock(&(b->mtxExclusiveAccess)); + self = pthread_self(); /* @@ -181,20 +171,33 @@ pthread_barrier_wait(pthread_barrier_t *barrier) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldCancelState); } - (void) pthread_mutex_unlock(&(b->mtxExclusiveAccess)); - - /* FIXME!!! It's possible for a thread to be left behind at a - * barrier because of the time gap between the unlock - * and the registration that the thread is waiting on the - * event. + /* + * There is no race condition between the semaphore wait and post + * because we are using two alternating semas and all threads have + * entered barrier_wait and checked nCurrentBarrierHeight before this + * barrier's sema can be posted. Any threads that have not quite + * entered sem_wait below when the multiple_post above is called + * will nevertheless continue through the sema (and the barrier). + * We have fulfilled our synchronisation function. */ - result = pthreadCancelableWait(b->eventBarrierBreeched); + result = sem_wait(&(b->semBarrierBreeched[step])); if (self->cancelType == PTHREAD_CANCEL_DEFERRED) { pthread_setcancelstate(oldCancelState, NULL); } } + + /* + * The first thread to return from the wait will be the + * PTHREAD_BARRIER_SERIAL_THREAD. + */ + result = ((_LONG) serial == InterlockedCompareExchange((_LPLONG) &(b->nSerial), + (_LONG) ((serial + 1) + & 0x7FFFFFFF), + (_LONG) serial) + ? PTHREAD_BARRIER_SERIAL_THREAD + : 0); } return(result); diff --git a/implement.h b/implement.h index 5b30d1a..bc72b3f 100644 --- a/implement.h +++ b/implement.h @@ -187,10 +187,12 @@ struct pthread_spinlock_t_ { }; struct pthread_barrier_t_ { - LONG nCurrentBarrierHeight; - LONG nInitialBarrierHeight; + unsigned int nCurrentBarrierHeight; + unsigned int nInitialBarrierHeight; + unsigned int nSerial; + int iStep; pthread_mutex_t mtxExclusiveAccess; - HANDLE eventBarrierBreeched; + sem_t semBarrierBreeched[2]; }; struct pthread_barrierattr_t_ { diff --git a/pthread.h b/pthread.h index 0122b7c..754efac 100644 --- a/pthread.h +++ b/pthread.h @@ -825,7 +825,7 @@ int pthread_spin_unlock (pthread_spinlock_t * lock); */ int pthread_barrier_init (pthread_barrier_t * barrier, const pthread_barrierattr_t * attr, - int count); + unsigned int count); int pthread_barrier_destroy (pthread_barrier_t * barrier); diff --git a/tests/barrier5.c b/tests/barrier5.c index 6576b05..77f51da 100644 --- a/tests/barrier5.c +++ b/tests/barrier5.c @@ -10,13 +10,13 @@ enum { NUMTHREADS = 16, - ITERATIONS = 10000 + BARRIERS = 10000 }; pthread_barrier_t barrier = NULL; pthread_mutex_t mx = PTHREAD_MUTEX_INITIALIZER; -int barrierReleases[ITERATIONS + 1]; +int barrierReleases[BARRIERS + 1]; void * func(void * barrierHeight) @@ -24,7 +24,7 @@ func(void * barrierHeight) int i; int result; - for (i = 1; i < ITERATIONS; i++) + for (i = 1; i < BARRIERS; i++) { result = pthread_barrier_wait(&barrier); @@ -38,12 +38,8 @@ func(void * barrierHeight) */ if (result == PTHREAD_BARRIER_SERIAL_THREAD) { - assert(pthread_mutex_lock(&mx) == 0); -//printf("Releases bucket %d = %d\n", i - 1, barrierReleases[i - 1]); -//fflush(stdout); assert(barrierReleases[i - 1] == (int) barrierHeight); barrierReleases[i + 1] = 0; - assert(pthread_mutex_unlock(&mx) == 0); } else if (result != 0) { @@ -81,8 +77,8 @@ main() assert(pthread_join(t[i], NULL) == 0); } - assert(barrierReleases[ITERATIONS - 1] == j); - assert(barrierReleases[ITERATIONS] == 0); + assert(barrierReleases[BARRIERS - 1] == j); + assert(barrierReleases[BARRIERS] == 0); assert(pthread_barrier_destroy(&barrier) == 0); } -- cgit v1.2.3