summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrpj <rpj>2001-07-09 18:22:07 +0000
committerrpj <rpj>2001-07-09 18:22:07 +0000
commit1d99828acf48bc6d5a81aadc6123e5172dfc355d (patch)
tree4646ce8ba7129298e3ca6931c8e7019e9bbeb2df
parentf58aab44e671bb39b8afb29804a9ca94c238c523 (diff)
* barrier.c: Revamped to fix the race condition. Two alternating
semaphores are used instead of the PulseEvent. Also improved overall throughput by returning PTHREAD_BARRIER_SERIAL_THREAD to the first waking thread. * implement.h (pthread_barrier_t_): Revamped.
-rw-r--r--ChangeLog8
-rw-r--r--barrier.c149
-rw-r--r--implement.h8
-rw-r--r--pthread.h2
-rw-r--r--tests/barrier5.c14
5 files changed, 95 insertions, 86 deletions
diff --git a/ChangeLog b/ChangeLog
index bfb9646..97ec503 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2001-07-10 Ross Johnson <rpj@setup1.ise.canberra.edu.au>
+
+ * barrier.c: Revamped to fix the race condition. Two alternating
+ semaphores are used instead of the PulseEvent. Also improved
+ overall throughput by returning PTHREAD_BARRIER_SERIAL_THREAD
+ to the first waking thread.
+ * implement.h (pthread_barrier_t_): Revamped.
+
2001-07-09 Ross Johnson <rpj@setup1.ise.canberra.edu.au>
* barrier.c: Fix several bugs in all routines. Now passes
diff --git a/barrier.c b/barrier.c
index 6a18f25..e9312c9 100644
--- a/barrier.c
+++ b/barrier.c
@@ -27,61 +27,62 @@
#include "implement.h"
+#ifdef __MINGW32__
+#define _LONG long
+#define _LPLONG long*
+#else
+#define _LONG PVOID
+#define _LPLONG PVOID*
+#endif
+
int
pthread_barrier_init(pthread_barrier_t * barrier,
const pthread_barrierattr_t * attr,
- int count)
+ unsigned int count)
{
- int result = 0;
int pshared = PTHREAD_PROCESS_PRIVATE;
pthread_barrier_t b;
+ pthread_mutexattr_t ma;
- if (barrier == NULL)
+ if (barrier == NULL || count == 0)
{
return EINVAL;
}
- b = (pthread_barrier_t) calloc(1, sizeof(*b));
-
- if (b == NULL)
- {
- result = ENOMEM;
- goto FAIL0;
- }
-
- if (attr != NULL && *attr != NULL)
- {
- pshared = (*attr)->pshared;
- }
-
- b->nCurrentBarrierHeight = b->nInitialBarrierHeight = count;
-
- result = pthread_mutex_init(&(b->mtxExclusiveAccess), NULL);
- if (0 != result)
+ if (NULL != (b = (pthread_barrier_t) calloc(1, sizeof(*b))))
{
- goto FAIL1;
- }
+ if (attr != NULL && *attr != NULL)
+ {
+ pshared = (*attr)->pshared;
+ }
- b->eventBarrierBreeched = CreateEvent(NULL, /* Security attributes */
- TRUE, /* Manual reset */
- FALSE, /* Initially signaled */
- NULL); /* Name */
+ b->nCurrentBarrierHeight = b->nInitialBarrierHeight = count;
+ b->iStep = 0;
+ b->nSerial = 0;
- if (NULL != b->eventBarrierBreeched)
- {
- goto DONE;
+ if (0 == pthread_mutexattr_init(&ma) &&
+ 0 == pthread_mutexattr_setpshared(&ma, pshared))
+ {
+ if (0 == pthread_mutex_init(&(b->mtxExclusiveAccess), &ma))
+ {
+ pthread_mutexattr_destroy(&ma);
+ if (0 == sem_init(&(b->semBarrierBreeched[0]), pshared, 0))
+ {
+ if (0 == sem_init(&(b->semBarrierBreeched[1]), pshared, 0))
+ {
+ *barrier = b;
+ return 0;
+ }
+ (void) sem_destroy(&(b->semBarrierBreeched[0]));
+ }
+ (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess));
+ }
+ pthread_mutexattr_destroy(&ma);
+ }
+ (void) free(b);
}
- (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess));
- result = ENOMEM;
- FAIL1:
- (void) free(b);
- b = NULL;
-
- FAIL0:
- DONE:
- *barrier = b;
- return(result);
+ return ENOMEM;
}
int
@@ -109,20 +110,12 @@ pthread_barrier_destroy(pthread_barrier_t *barrier)
* *barrier != NULL.
*/
*barrier = NULL;
-
- result = CloseHandle(b->eventBarrierBreeched);
(void) pthread_mutex_unlock(&(b->mtxExclusiveAccess));
- if (result == TRUE)
- {
- (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess));
- (void) free(b);
- result = 0;
- }
- else
- {
- *barrier = b;
- result = EINVAL;
- }
+
+ (void) sem_destroy(&(b->semBarrierBreeched[1]));
+ (void) sem_destroy(&(b->semBarrierBreeched[0]));
+ (void) pthread_mutex_destroy(&(b->mtxExclusiveAccess));
+ (void) free(b);
}
return(result);
@@ -145,31 +138,28 @@ pthread_barrier_wait(pthread_barrier_t *barrier)
if (0 == result)
{
+ int step = b->iStep;
+ unsigned int serial = b->nSerial;
+
if (0 == --(b->nCurrentBarrierHeight))
{
- b->nCurrentBarrierHeight = b->nInitialBarrierHeight;
(void) pthread_mutex_unlock(&(b->mtxExclusiveAccess));
/*
- * This is a work-around for the FIXME below. We
- * give any threads that didn't quite get to register
- * their wait another quantum. This is temporary
- * - there is a better way to do this.
+ * All other threads in this barrier set have at least passed
+ * through the decrement and test above, so its safe to
+ * raise the barrier to its full height.
*/
- Sleep(0);
- (void) PulseEvent(b->eventBarrierBreeched);
- /*
- * Would be better if the first thread to return
- * from this routine got this value. On a single
- * processor machine that will be the last thread
- * to reach the barrier (us), most of the time.
- */
- result = PTHREAD_BARRIER_SERIAL_THREAD;
+ b->iStep = 1 - step;
+ b->nCurrentBarrierHeight = b->nInitialBarrierHeight;
+ (void) sem_post_multiple(&(b->semBarrierBreeched[step]), b->nInitialBarrierHeight - 1);
}
else
{
pthread_t self;
int oldCancelState;
+ (void) pthread_mutex_unlock(&(b->mtxExclusiveAccess));
+
self = pthread_self();
/*
@@ -181,20 +171,33 @@ pthread_barrier_wait(pthread_barrier_t *barrier)
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldCancelState);
}
- (void) pthread_mutex_unlock(&(b->mtxExclusiveAccess));
-
- /* FIXME!!! It's possible for a thread to be left behind at a
- * barrier because of the time gap between the unlock
- * and the registration that the thread is waiting on the
- * event.
+ /*
+ * There is no race condition between the semaphore wait and post
+ * because we are using two alternating semas and all threads have
+ * entered barrier_wait and checked nCurrentBarrierHeight before this
+ * barrier's sema can be posted. Any threads that have not quite
+ * entered sem_wait below when the multiple_post above is called
+ * will nevertheless continue through the sema (and the barrier).
+ * We have fulfilled our synchronisation function.
*/
- result = pthreadCancelableWait(b->eventBarrierBreeched);
+ result = sem_wait(&(b->semBarrierBreeched[step]));
if (self->cancelType == PTHREAD_CANCEL_DEFERRED)
{
pthread_setcancelstate(oldCancelState, NULL);
}
}
+
+ /*
+ * The first thread to return from the wait will be the
+ * PTHREAD_BARRIER_SERIAL_THREAD.
+ */
+ result = ((_LONG) serial == InterlockedCompareExchange((_LPLONG) &(b->nSerial),
+ (_LONG) ((serial + 1)
+ & 0x7FFFFFFF),
+ (_LONG) serial)
+ ? PTHREAD_BARRIER_SERIAL_THREAD
+ : 0);
}
return(result);
diff --git a/implement.h b/implement.h
index 5b30d1a..bc72b3f 100644
--- a/implement.h
+++ b/implement.h
@@ -187,10 +187,12 @@ struct pthread_spinlock_t_ {
};
struct pthread_barrier_t_ {
- LONG nCurrentBarrierHeight;
- LONG nInitialBarrierHeight;
+ unsigned int nCurrentBarrierHeight;
+ unsigned int nInitialBarrierHeight;
+ unsigned int nSerial;
+ int iStep;
pthread_mutex_t mtxExclusiveAccess;
- HANDLE eventBarrierBreeched;
+ sem_t semBarrierBreeched[2];
};
struct pthread_barrierattr_t_ {
diff --git a/pthread.h b/pthread.h
index 0122b7c..754efac 100644
--- a/pthread.h
+++ b/pthread.h
@@ -825,7 +825,7 @@ int pthread_spin_unlock (pthread_spinlock_t * lock);
*/
int pthread_barrier_init (pthread_barrier_t * barrier,
const pthread_barrierattr_t * attr,
- int count);
+ unsigned int count);
int pthread_barrier_destroy (pthread_barrier_t * barrier);
diff --git a/tests/barrier5.c b/tests/barrier5.c
index 6576b05..77f51da 100644
--- a/tests/barrier5.c
+++ b/tests/barrier5.c
@@ -10,13 +10,13 @@
enum {
NUMTHREADS = 16,
- ITERATIONS = 10000
+ BARRIERS = 10000
};
pthread_barrier_t barrier = NULL;
pthread_mutex_t mx = PTHREAD_MUTEX_INITIALIZER;
-int barrierReleases[ITERATIONS + 1];
+int barrierReleases[BARRIERS + 1];
void *
func(void * barrierHeight)
@@ -24,7 +24,7 @@ func(void * barrierHeight)
int i;
int result;
- for (i = 1; i < ITERATIONS; i++)
+ for (i = 1; i < BARRIERS; i++)
{
result = pthread_barrier_wait(&barrier);
@@ -38,12 +38,8 @@ func(void * barrierHeight)
*/
if (result == PTHREAD_BARRIER_SERIAL_THREAD)
{
- assert(pthread_mutex_lock(&mx) == 0);
-//printf("Releases bucket %d = %d\n", i - 1, barrierReleases[i - 1]);
-//fflush(stdout);
assert(barrierReleases[i - 1] == (int) barrierHeight);
barrierReleases[i + 1] = 0;
- assert(pthread_mutex_unlock(&mx) == 0);
}
else if (result != 0)
{
@@ -81,8 +77,8 @@ main()
assert(pthread_join(t[i], NULL) == 0);
}
- assert(barrierReleases[ITERATIONS - 1] == j);
- assert(barrierReleases[ITERATIONS] == 0);
+ assert(barrierReleases[BARRIERS - 1] == j);
+ assert(barrierReleases[BARRIERS] == 0);
assert(pthread_barrier_destroy(&barrier) == 0);
}