summaryrefslogtreecommitdiff
path: root/eio.c
diff options
context:
space:
mode:
Diffstat (limited to 'eio.c')
-rw-r--r--eio.c246
1 files changed, 130 insertions, 116 deletions
diff --git a/eio.c b/eio.c
index ca7d27a..18ed01f 100644
--- a/eio.c
+++ b/eio.c
@@ -72,20 +72,16 @@
#define dBUF \
char *eio_buf; \
- X_LOCK (wrklock); \
+ X_LOCK (etplock); \
self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
- X_UNLOCK (wrklock); \
+ X_UNLOCK (etplock); \
errno = ENOMEM; \
if (!eio_buf) \
return -1;
#define EIO_TICKS ((1000000 + 1023) >> 10)
-static void (*want_poll_cb) (void);
-static void (*done_poll_cb) (void);
-
-static unsigned int max_poll_time = 0;
-static unsigned int max_poll_reqs = 0;
+/*****************************************************************************/
/* calculcate time difference in ~1/EIO_TICKS of a second */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
@@ -96,23 +92,39 @@ static int tvdiff (struct timeval *tv1, struct timeval *tv2)
static unsigned int started, idle, wanted = 4;
-/* worker threads management */
-static mutex_t wrklock = X_MUTEX_INIT;
+typedef struct etp_pool
+{
+ void (*want_poll_cb) (void);
+ void (*done_poll_cb) (void);
+
+ unsigned int max_poll_time;
+ unsigned int max_poll_reqs;
+} etp_pool;
+
+static volatile unsigned int nreqs, nready, npending;
+static volatile unsigned int max_idle = 4;
+
+static mutex_t etplock = X_MUTEX_INIT;
+static mutex_t reslock = X_MUTEX_INIT;
+static mutex_t reqlock = X_MUTEX_INIT;
+static cond_t reqwait = X_COND_INIT;
typedef struct worker
{
- /* locked by wrklock */
+ /* locked by etplock */
struct worker *prev, *next;
thread_t tid;
- /* locked by reslock, reqlock or wrklock */
+ /* locked by reslock, reqlock or etplock */
eio_req *req; /* currently processed request */
void *dbuf;
DIR *dirp;
} worker;
-static worker wrk_first = { &wrk_first, &wrk_first, 0 };
+static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
+
+/* worker threads management */
static void worker_clear (worker *wrk)
{
@@ -137,13 +149,6 @@ static void worker_free (worker *wrk)
free (wrk);
}
-static volatile unsigned int nreqs, nready, npending;
-static volatile unsigned int max_idle = 4;
-
-static mutex_t reslock = X_MUTEX_INIT;
-static mutex_t reqlock = X_MUTEX_INIT;
-static cond_t reqwait = X_COND_INIT;
-
unsigned int eio_nreqs (void)
{
return nreqs;
@@ -236,6 +241,85 @@ static eio_req *reqq_shift (reqq *q)
abort ();
}
+static void etp_atfork_prepare (void)
+{
+ X_LOCK (etplock);
+ X_LOCK (reqlock);
+ X_LOCK (reslock);
+#if !HAVE_PREADWRITE
+ X_LOCK (preadwritelock);
+#endif
+#if !HAVE_READDIR_R
+ X_LOCK (readdirlock);
+#endif
+}
+
+static void etp_atfork_parent (void)
+{
+#if !HAVE_READDIR_R
+ X_UNLOCK (readdirlock);
+#endif
+#if !HAVE_PREADWRITE
+ X_UNLOCK (preadwritelock);
+#endif
+ X_UNLOCK (reslock);
+ X_UNLOCK (reqlock);
+ X_UNLOCK (etplock);
+}
+
+static void etp_atfork_child (void)
+{
+ eio_req *prv;
+
+ while (prv = reqq_shift (&req_queue))
+ eio_destroy (prv);
+
+ while (prv = reqq_shift (&res_queue))
+ eio_destroy (prv);
+
+ while (wrk_first.next != &wrk_first)
+ {
+ worker *wrk = wrk_first.next;
+
+ if (wrk->req)
+ eio_destroy (wrk->req);
+
+ worker_clear (wrk);
+ worker_free (wrk);
+ }
+
+ started = 0;
+ idle = 0;
+ nreqs = 0;
+ nready = 0;
+ npending = 0;
+
+ etp_atfork_parent ();
+}
+
+static void
+etp_once_init (void)
+{
+ X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child);
+}
+
+static int
+etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void))
+{
+ static pthread_once_t doinit = PTHREAD_ONCE_INIT;
+
+ pthread_once (&doinit, etp_once_init);
+
+ memset (etp, 0, sizeof *etp);
+
+ etp->want_poll_cb = want_poll;
+ etp->done_poll_cb = done_poll;
+}
+
+static etp_pool etp;
+
+/*****************************************************************************/
+
static void grp_try_feed (eio_req *grp)
{
while (grp->size < grp->int2 && !EIO_CANCELLED (grp))
@@ -312,9 +396,9 @@ void eio_grp_cancel (eio_req *grp)
void eio_cancel (eio_req *req)
{
- X_LOCK (wrklock);
+ X_LOCK (etplock);
req->flags |= EIO_FLAG_CANCELLED;
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
eio_grp_cancel (req);
}
@@ -328,7 +412,7 @@ static void start_thread (void)
/*TODO*/
assert (("unable to allocate worker thread data", wrk));
- X_LOCK (wrklock);
+ X_LOCK (etplock);
if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
{
@@ -341,7 +425,7 @@ static void start_thread (void)
else
free (wrk);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
}
static void maybe_start_thread (void)
@@ -386,22 +470,22 @@ static void end_thread (void)
X_COND_SIGNAL (reqwait);
X_UNLOCK (reqlock);
- X_LOCK (wrklock);
+ X_LOCK (etplock);
--started;
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
}
void eio_set_max_poll_time (double nseconds)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
- max_poll_time = nseconds;
+ etp.max_poll_time = nseconds;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}
void eio_set_max_poll_reqs (unsigned int maxreqs)
{
if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
- max_poll_reqs = maxreqs;
+ etp.max_poll_reqs = maxreqs;
if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
}
@@ -429,11 +513,11 @@ void eio_set_max_parallel (unsigned int nthreads)
int eio_poll (void)
{
- int maxreqs = max_poll_reqs;
+ int maxreqs = etp.max_poll_reqs;
struct timeval tv_start, tv_now;
eio_req *req;
- if (max_poll_time)
+ if (etp.max_poll_time)
gettimeofday (&tv_start, 0);
for (;;)
@@ -447,8 +531,8 @@ int eio_poll (void)
{
--npending;
- if (!res_queue.size && done_poll_cb)
- done_poll_cb ();
+ if (!res_queue.size && etp.done_poll_cb)
+ etp.done_poll_cb ();
}
X_UNLOCK (reslock);
@@ -473,11 +557,11 @@ int eio_poll (void)
if (maxreqs && !--maxreqs)
break;
- if (max_poll_time)
+ if (etp.max_poll_time)
{
gettimeofday (&tv_now, 0);
- if (tvdiff (&tv_start, &tv_now) >= max_poll_time)
+ if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time)
break;
}
}
@@ -731,12 +815,12 @@ eio__scandir (eio_req *req, worker *self)
int memofs = 0;
int res = 0;
- X_LOCK (wrklock);
+ X_LOCK (etplock);
self->dirp = dirp = opendir (req->ptr1);
self->dbuf = u = malloc (sizeof (*u));
req->flags |= EIO_FLAG_PTR2_FREE;
req->ptr2 = names = malloc (memlen);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
if (dirp && u && names)
for (;;)
@@ -758,9 +842,9 @@ eio__scandir (eio_req *req, worker *self)
while (memofs + len > memlen)
{
memlen *= 2;
- X_LOCK (wrklock);
+ X_LOCK (etplock);
req->ptr2 = names = realloc (names, memlen);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
if (!names)
break;
@@ -782,9 +866,9 @@ eio__scandir (eio_req *req, worker *self)
#define ALLOC(len) \
if (!req->ptr2) \
{ \
- X_LOCK (wrklock); \
+ X_LOCK (etplock); \
req->flags |= EIO_FLAG_PTR2_FREE; \
- X_UNLOCK (wrklock); \
+ X_UNLOCK (etplock); \
req->ptr2 = malloc (len); \
if (!req->ptr2) \
{ \
@@ -824,9 +908,9 @@ X_THREAD_PROC (eio_proc)
{
--idle;
X_UNLOCK (reqlock);
- X_LOCK (wrklock);
+ X_LOCK (etplock);
--started;
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
goto quit;
}
@@ -950,8 +1034,8 @@ X_THREAD_PROC (eio_proc)
++npending;
- if (!reqq_push (&res_queue, req) && want_poll_cb)
- want_poll_cb ();
+ if (!reqq_push (&res_queue, req) && etp.want_poll_cb)
+ etp.want_poll_cb ();
self->req = 0;
worker_clear (self);
@@ -960,88 +1044,18 @@ X_THREAD_PROC (eio_proc)
}
quit:
- X_LOCK (wrklock);
+ X_LOCK (etplock);
worker_free (self);
- X_UNLOCK (wrklock);
+ X_UNLOCK (etplock);
return 0;
}
/*****************************************************************************/
-static void eio_atfork_prepare (void)
-{
- X_LOCK (wrklock);
- X_LOCK (reqlock);
- X_LOCK (reslock);
-#if !HAVE_PREADWRITE
- X_LOCK (preadwritelock);
-#endif
-#if !HAVE_READDIR_R
- X_LOCK (readdirlock);
-#endif
-}
-
-static void eio_atfork_parent (void)
-{
-#if !HAVE_READDIR_R
- X_UNLOCK (readdirlock);
-#endif
-#if !HAVE_PREADWRITE
- X_UNLOCK (preadwritelock);
-#endif
- X_UNLOCK (reslock);
- X_UNLOCK (reqlock);
- X_UNLOCK (wrklock);
-}
-
-static void eio_atfork_child (void)
-{
- eio_req *prv;
-
- while (prv = reqq_shift (&req_queue))
- eio_destroy (prv);
-
- while (prv = reqq_shift (&res_queue))
- eio_destroy (prv);
-
- while (wrk_first.next != &wrk_first)
- {
- worker *wrk = wrk_first.next;
-
- if (wrk->req)
- eio_destroy (wrk->req);
-
- worker_clear (wrk);
- worker_free (wrk);
- }
-
- started = 0;
- idle = 0;
- nreqs = 0;
- nready = 0;
- npending = 0;
-
- eio_atfork_parent ();
-}
-
int eio_init (void (*want_poll)(void), void (*done_poll)(void))
{
- want_poll_cb = want_poll;
- done_poll_cb = done_poll;
-
-#ifdef _WIN32
- X_MUTEX_CHECK (wrklock);
- X_MUTEX_CHECK (reslock);
- X_MUTEX_CHECK (reqlock);
- X_MUTEX_CHECK (reqwait);
- X_MUTEX_CHECK (preadwritelock);
- X_MUTEX_CHECK (readdirlock);
-
- X_COND_CHECK (reqwait);
-#endif
-
- X_THREAD_ATFORK (eio_atfork_prepare, eio_atfork_parent, eio_atfork_child);
+ etp_init (&etp, want_poll, done_poll);
}
static void eio_api_destroy (eio_req *req)