summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--eio.c246
-rw-r--r--eio.h26
-rw-r--r--xthread.h2
3 files changed, 140 insertions, 134 deletions
diff --git a/eio.c b/eio.c
index ca7d27a..18ed01f 100644
--- a/eio.c
+++ b/eio.c
@@ -72,20 +72,16 @@
#define dBUF \
char *eio_buf; \
- X_LOCK (wrklock); \
+ X_LOCK (etplock); \
self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
- X_UNLOCK (wrklock); \
+ X_UNLOCK (etplock); \
errno = ENOMEM; \
if (!eio_buf) \
return -1;
#define EIO_TICKS ((1000000 + 1023) >> 10)
-static void (*want_poll_cb) (void);
-static void (*done_poll_cb) (void);
-
-static unsigned int max_poll_time = 0;
-static unsigned int max_poll_reqs = 0;
+/*****************************************************************************/
/* calculcate time difference in ~1/EIO_TICKS of a second */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
@@ -96,23 +92,39 @@ static int tvdiff (struct timeval *tv1, struct timeval *tv2)
static unsigned int started, idle, wanted = 4;
-/* worker threads management */
-static mutex_t wrklock = X_MUTEX_INIT;
+typedef struct etp_pool
+{
+ void (*want_poll_cb) (void);
+ void (*done_poll_cb) (void);
+
+ unsigned int max_poll_time;
+ unsigned int max_poll_reqs;
+} etp_pool;
+
+static volatile unsigned int nreqs, nready, npending;
+static volatile unsigned int max_idle = 4;
+
+static mutex_t etplock = X_MUTEX_INIT;
+static mutex_t reslock = X_MUTEX_INIT;
+static mutex_t reqlock = X_MUTEX_INIT;
+static cond_t reqwait = X_COND_INIT;
typedef struct worker
{
- /* locked by wrklock */
+ /* locked by etplock */
struct worker *prev, *next;
thread_t tid;
- /* locked by reslock, reqlock or wrklock */
+ /* locked by reslock, reqlock or etplock */
eio_req *req; /* currently processed request */
void *dbuf;
DIR *dirp;
} worker;
-static worker wrk_first = { &wrk_first, &wrk_first, 0 };
+static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
+
+/* worker threads management */
static void worker_clear (worker *wrk)
{
@@ -137,13 +149,6 @@ static void worker_free (worker *wrk)
free (wrk);
}
-static volatile unsigned int nreqs, nready, npending;
-static volatile unsigned int max_idle = 4;
-
-static mutex_t reslock = X_MUTEX_INIT;
-static mutex_t reqlock = X_MUTEX_INIT;
-static cond_t reqwait = X_COND_INIT;
-
unsigned int eio_nreqs (void)
{
return nreqs;
@@ -236,6 +241,85 @@ static eio_req *reqq_shift (reqq *q)
abort ();
}
+static void etp_atfork_prepare (void)
+{
+ X_LOCK (etplock);
+ X_LOCK (reqlock);
+ X_LOCK (reslock);
+#if !HAVE_PREADWRITE
+ X_LOCK (preadwritelock);
+#endif
+#if !HAVE_READDIR_R
+ X_LOCK (readdirlock);
+#endif
+}
+
+static void etp_atfork_parent (void)
+{
+#if !HAVE_READDIR_R
+ X_UNLOCK (readdirlock);
+#endif
+#if !HAVE_PREADWRITE
+ X_UNLOCK (preadwritelock);
+#endif
+ X_UNLOCK (reslock);
+ X_UNLOCK (reqlock);
+ X_UNLOCK (etplock);
+}
+
+static void etp_atfork_child (void)
+{
+ eio_req *prv;
+
+ while (prv = reqq_shift (&req_queue))
+ eio_destroy (prv);
+
+ while (prv = reqq_shift (&res_queue))
+ eio_destroy (prv);
+
+ while (wrk_first.next != &wrk_first)
+ {
+ worker *wrk = wrk_first.next;
+
+ if (wrk->req)
+ eio_destroy (wrk->req);
+
+ worker_clear (wrk);
+ worker_free (wrk);
+ }
+
+ started = 0;
+ idle = 0;
+ nreqs = 0;
+ nready = 0;
+ npending = 0;
+
+ etp_atfork_parent ();
+}
+
+static void
+etp_once_init (void)
+{
+ X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child);
+}
+
+static int
+etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void))
+{
+ static pthread_once_t doinit = PTHREAD_ONCE_INIT;
+
+ pthread_once (&doinit, etp_once_init);
+
+ memset (etp, 0, sizeof *etp);
+
+ etp->want_poll_cb = want_poll;
+ etp->done_poll_cb = done_poll;
+}
+
+static etp_pool etp;
+
+/*****************************************************************************/
+
static void grp_try_feed (eio_req *grp)
{
while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
@@ -312,9 +396,9 @@ void eio_grp_cancel (eio_req *grp)
void eio_cancel (eio_req *req)
{
- X_LOCK (wrklock);
+ X_LOCK (etplock);
req->flags |= EIO_FLAG_CANCELLED;
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
eio_grp_cancel (req);
}
@@ -328,7 +412,7 @@ static void start_thread (void)
/*TODO*/
assert (("unable to allocate worker thread data", wrk));
- X_LOCK (wrklock);
+ X_LOCK (etplock);
if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
{
@@ -341,7 +425,7 @@ static void start_thread (void)
else
free (wrk);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
}
static void maybe_start_thread (void)
@@ -386,22 +470,22 @@ static void end_thread (void)
X_COND_SIGNAL (reqwait);
X_UNLOCK (reqlock);
- X_LOCK (wrklock);
+ X_LOCK (etplock);
--started;
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
}
void eio_set_max_poll_time (double nseconds)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
- max_poll_time = nseconds;
+ etp.max_poll_time = nseconds;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}
void eio_set_max_poll_reqs (unsigned int maxreqs)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
- max_poll_reqs = maxreqs;
+ etp.max_poll_reqs = maxreqs;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}
@@ -429,11 +513,11 @@ void eio_set_max_parallel (unsigned int nthreads)
int eio_poll (void)
{
- int maxreqs = max_poll_reqs;
+ int maxreqs = etp.max_poll_reqs;
struct timeval tv_start, tv_now;
eio_req *req;
- if (max_poll_time)
+ if (etp.max_poll_time)
gettimeofday (&tv_start, 0);
for (;;)
@@ -447,8 +531,8 @@ int eio_poll (void)
{
--npending;
- if (!res_queue.size && done_poll_cb)
- done_poll_cb ();
+ if (!res_queue.size && etp.done_poll_cb)
+ etp.done_poll_cb ();
}
X_UNLOCK (reslock);
@@ -473,11 +557,11 @@ int eio_poll (void)
if (maxreqs && !--maxreqs)
break;
- if (max_poll_time)
+ if (etp.max_poll_time)
{
gettimeofday (&tv_now, 0);
- if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
+ if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time)
break;
}
}
@@ -731,12 +815,12 @@ eio__scandir (eio_req *req, worker *self)
int memofs = 0;
int res = 0;
- X_LOCK (wrklock);
+ X_LOCK (etplock);
self->dirp = dirp = opendir (req->ptr1);
self->dbuf = u = malloc (sizeof (*u));
req->flags |= EIO_FLAG_PTR2_FREE;
req->ptr2 = names = malloc (memlen);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
if (dirp && u && names)
for (;;)
@@ -758,9 +842,9 @@ eio__scandir (eio_req *req, worker *self)
while (memofs + len > memlen)
{
memlen *= 2;
- X_LOCK (wrklock);
+ X_LOCK (etplock);
req->ptr2 = names = realloc (names, memlen);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
if (!names)
break;
@@ -782,9 +866,9 @@ eio__scandir (eio_req *req, worker *self)
#define ALLOC(len) \
if (!req->ptr2) \
{ \
- X_LOCK (wrklock); \
+ X_LOCK (etplock); \
req->flags |= EIO_FLAG_PTR2_FREE; \
- X_UNLOCK (wrklock); \
+ X_UNLOCK (etplock); \
req->ptr2 = malloc (len); \
if (!req->ptr2) \
{ \
@@ -824,9 +908,9 @@ X_THREAD_PROC (eio_proc)
{
--idle;
X_UNLOCK (reqlock);
- X_LOCK (wrklock);
+ X_LOCK (etplock);
--started;
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
goto quit;
}
@@ -950,8 +1034,8 @@ X_THREAD_PROC (eio_proc)
++npending;
- if (!reqq_push (&res_queue, req) && want_poll_cb)
- want_poll_cb ();
+ if (!reqq_push (&res_queue, req) && etp.want_poll_cb)
+ etp.want_poll_cb ();
self->req = 0;
worker_clear (self);
@@ -960,88 +1044,18 @@ X_THREAD_PROC (eio_proc)
}
quit:
- X_LOCK (wrklock);
+ X_LOCK (etplock);
worker_free (self);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
return 0;
}
/*****************************************************************************/
-static void eio_atfork_prepare (void)
-{
- X_LOCK (wrklock);
- X_LOCK (reqlock);
- X_LOCK (reslock);
-#if !HAVE_PREADWRITE
- X_LOCK (preadwritelock);
-#endif
-#if !HAVE_READDIR_R
- X_LOCK (readdirlock);
-#endif
-}
-
-static void eio_atfork_parent (void)
-{
-#if !HAVE_READDIR_R
- X_UNLOCK (readdirlock);
-#endif
-#if !HAVE_PREADWRITE
- X_UNLOCK (preadwritelock);
-#endif
- X_UNLOCK (reslock);
- X_UNLOCK (reqlock);
- X_UNLOCK (wrklock);
-}
-
-static void eio_atfork_child (void)
-{
- eio_req *prv;
-
- while (prv = reqq_shift (&req_queue))
- eio_destroy (prv);
-
- while (prv = reqq_shift (&res_queue))
- eio_destroy (prv);
-
- while (wrk_first.next != &wrk_first)
- {
- worker *wrk = wrk_first.next;
-
- if (wrk->req)
- eio_destroy (wrk->req);
-
- worker_clear (wrk);
- worker_free (wrk);
- }
-
- started = 0;
- idle = 0;
- nreqs = 0;
- nready = 0;
- npending = 0;
-
- eio_atfork_parent ();
-}
-
int eio_init (void (*want_poll)(void), void (*done_poll)(void))
{
- want_poll_cb = want_poll;
- done_poll_cb = done_poll;
-
-#ifdef _WIN32
- X_MUTEX_CHECK (wrklock);
- X_MUTEX_CHECK (reslock);
- X_MUTEX_CHECK (reqlock);
- X_MUTEX_CHECK (reqwait);
- X_MUTEX_CHECK (preadwritelock);
- X_MUTEX_CHECK (readdirlock);
-
- X_COND_CHECK (reqwait);
-#endif
-
- X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
+ etp_init (&etp, want_poll, done_poll);
}
static void eio_api_destroy (eio_req *req)
diff --git a/eio.h b/eio.h
index efc4c8f..a63490b 100644
--- a/eio.h
+++ b/eio.h
@@ -113,7 +113,7 @@ unsigned int eio_npending (void); /* numbe rof finished but unhandled requests *
unsigned int eio_nthreads (void); /* number of worker threads in use currently */
/*****************************************************************************/
-/* high-level request API */
+/* convinience wrappers */
#ifndef EIO_NO_WRAPPERS
eio_req *eio_nop (int pri, eio_cb cb, void *data); /* does nothing except go through the whole process */
@@ -150,17 +150,17 @@ eio_req *eio_symlink (const char *path, const char *new_path, int pri, eio_cb
eio_req *eio_rename (const char *path, const char *new_path, int pri, eio_cb cb, void *data);
#endif
-/* for groups */
+/*****************************************************************************/
+/* groups */
+
eio_req *eio_grp (eio_cb cb, void *data);
void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit);
void eio_grp_limit (eio_req *grp, int limit);
void eio_grp_add (eio_req *grp, eio_req *req);
void eio_grp_cancel (eio_req *grp); /* cancels all sub requests but not the group */
-/* cancel a request as soon fast as possible */
-void eio_cancel (eio_req *req);
-/* destroy a request that has never been submitted */
-void eio_destroy (eio_req *req);
+/*****************************************************************************/
+/* request api */
/* true if the request was cancelled, useful in the invoke callback */
#define EIO_CANCELLED(req) ((req)->flags & EIO_FLAG_CANCELLED)
@@ -171,18 +171,12 @@ void eio_destroy (eio_req *req);
#define EIO_STAT_BUF(req) ((EIO_STRUCT_STAT *)EIO_BUF(req))
#define EIO_PATH(req) ((char *)(req)->ptr1)
-/*****************************************************************************/
-/* low-level request API */
-
-/* must be used to initialise eio_req's */
-#define EIO_INIT(req,prio,finish_cb, destroy_cb) \
- memset ((req), 0, sizeof (eio_req)); \
- (req)->pri = (prio) + EIO_PRI_BIAS; \
- (req)->finish = (finish_cb); \
- (req)->destroy = (destroy_cb)
-
/* submit a request for execution */
void eio_submit (eio_req *req);
+/* cancel a request as soon fast as possible, if possible */
+void eio_cancel (eio_req *req);
+/* destroy a request that has never been submitted */
+void eio_destroy (eio_req *req);
/*****************************************************************************/
/* convinience functions */
diff --git a/xthread.h b/xthread.h
index 6b6ccbb..e0de1ee 100644
--- a/xthread.h
+++ b/xthread.h
@@ -37,13 +37,11 @@ typedef int ssize_t;
typedef pthread_mutex_t mutex_t;
#define X_MUTEX_INIT PTHREAD_MUTEX_INITIALIZER
-#define X_MUTEX_CHECK(mutex)
#define X_LOCK(mutex) pthread_mutex_lock (&(mutex))
#define X_UNLOCK(mutex) pthread_mutex_unlock (&(mutex))
typedef pthread_cond_t cond_t;
#define X_COND_INIT PTHREAD_COND_INITIALIZER
-#define X_COND_CHECK(cond)
#define X_COND_SIGNAL(cond) pthread_cond_signal (&(cond))
#define X_COND_WAIT(cond,mutex) pthread_cond_wait (&(cond), &(mutex))
#define X_COND_TIMEDWAIT(cond,mutex,to) pthread_cond_timedwait (&(cond), &(mutex), &(to))