summaryrefslogtreecommitdiff
path: root/eio.c
diff options
context:
space:
mode:
Diffstat (limited to 'eio.c')
-rw-r--r--eio.c565
1 files changed, 344 insertions, 221 deletions
diff --git a/eio.c b/eio.c
index 18ed01f..c5cb845 100644
--- a/eio.c
+++ b/eio.c
@@ -1,3 +1,42 @@
+/*
+ * libeio implementation
+ *
+ * Copyright (c) 2007,2008 Marc Alexander Lehmann <libeio@schmorp.de>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modifica-
+ * tion, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
+ * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
+ * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
+ * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+ * OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Alternatively, the contents of this file may be used under the terms of
+ * the GNU General Public License ("GPL") version 2 or any later version,
+ * in which case the provisions of the GPL are applicable instead of
+ * the above. If you wish to allow the use of your version of this file
+ * only under the terms of the GPL and not to allow others to use your
+ * version of this file under the BSD license, indicate your decision
+ * by deleting the provisions above and replace them with the notice
+ * and other provisions required by the GPL. If you do not delete the
+ * provisions above, a recipient may use your version of this file under
+ * either the BSD or the GPL.
+ */
+
#include "eio.h"
#include "xthread.h"
@@ -72,9 +111,9 @@
#define dBUF \
char *eio_buf; \
- X_LOCK (etplock); \
+ ETP_WORKER_LOCK (self); \
self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \
- X_UNLOCK (etplock); \
+ ETP_WORKER_UNLOCK (self); \
errno = ENOMEM; \
if (!eio_buf) \
return -1;
@@ -83,6 +122,34 @@
/*****************************************************************************/
+#define ETP_PRI_MIN EIO_PRI_MIN
+#define ETP_PRI_MAX EIO_PRI_MAX
+
+#define ETP_REQ eio_req
+#define ETP_DESTROY(req) eio_destroy (req)
+static int eio_finish (eio_req *req);
+#define ETP_FINISH(req) eio_finish (req)
+
+#define ETP_WORKER_CLEAR(req) \
+ if (wrk->dbuf) \
+ { \
+ free (wrk->dbuf); \
+ wrk->dbuf = 0; \
+ } \
+ \
+ if (wrk->dirp) \
+ { \
+ closedir (wrk->dirp); \
+ wrk->dirp = 0; \
+ }
+#define ETP_WORKER_COMMON \
+ void *dbuf; \
+ DIR *dirp;
+
+/*****************************************************************************/
+
+#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1)
+
/* calculcate time difference in ~1/EIO_TICKS of a second */
static int tvdiff (struct timeval *tv1, struct timeval *tv2)
{
@@ -92,56 +159,48 @@ static int tvdiff (struct timeval *tv1, struct timeval *tv2)
static unsigned int started, idle, wanted = 4;
-typedef struct etp_pool
-{
- void (*want_poll_cb) (void);
- void (*done_poll_cb) (void);
+static void (*want_poll_cb) (void);
+static void (*done_poll_cb) (void);
- unsigned int max_poll_time;
- unsigned int max_poll_reqs;
-} etp_pool;
+static unsigned int max_poll_time; /* reslock */
+static unsigned int max_poll_reqs; /* reslock */
-static volatile unsigned int nreqs, nready, npending;
+static volatile unsigned int nreqs; /* reqlock */
+static volatile unsigned int nready; /* reqlock */
+static volatile unsigned int npending; /* reqlock */
static volatile unsigned int max_idle = 4;
-static mutex_t etplock = X_MUTEX_INIT;
+static mutex_t wrklock = X_MUTEX_INIT;
static mutex_t reslock = X_MUTEX_INIT;
static mutex_t reqlock = X_MUTEX_INIT;
static cond_t reqwait = X_COND_INIT;
-typedef struct worker
+typedef struct etp_worker
{
- /* locked by etplock */
- struct worker *prev, *next;
+ /* locked by wrklock */
+ struct etp_worker *prev, *next;
thread_t tid;
- /* locked by reslock, reqlock or etplock */
- eio_req *req; /* currently processed request */
- void *dbuf;
- DIR *dirp;
-} worker;
+ /* locked by reslock, reqlock or wrklock */
+ ETP_REQ *req; /* currently processed request */
+
+ ETP_WORKER_COMMON
+} etp_worker;
-static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
+static etp_worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */
+
+#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock)
+#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock)
/* worker threads management */
-static void worker_clear (worker *wrk)
+static void etp_worker_clear (etp_worker *wrk)
{
- if (wrk->dirp)
- {
- closedir (wrk->dirp);
- wrk->dirp = 0;
- }
-
- if (wrk->dbuf)
- {
- free (wrk->dbuf);
- wrk->dbuf = 0;
- }
+ ETP_WORKER_CLEAR (wrk);
}
-static void worker_free (worker *wrk)
+static void etp_worker_free (etp_worker *wrk)
{
wrk->next->prev = wrk->prev;
wrk->prev->next = wrk->next;
@@ -149,12 +208,16 @@ static void worker_free (worker *wrk)
free (wrk);
}
-unsigned int eio_nreqs (void)
+static unsigned int etp_nreqs (void)
{
- return nreqs;
+ int retval;
+ if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
+ retval = nreqs;
+ if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
+ return retval;
}
-unsigned int eio_nready (void)
+static unsigned int etp_nready (void)
{
unsigned int retval;
@@ -165,7 +228,7 @@ unsigned int eio_nready (void)
return retval;
}
-unsigned int eio_npending (void)
+static unsigned int etp_npending (void)
{
unsigned int retval;
@@ -176,7 +239,7 @@ unsigned int eio_npending (void)
return retval;
}
-unsigned int eio_nthreads (void)
+static unsigned int etp_nthreads (void)
{
unsigned int retval;
@@ -193,14 +256,14 @@ unsigned int eio_nthreads (void)
* per shift, the most expensive operation.
*/
typedef struct {
- eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */
+ ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */
int size;
-} reqq;
+} etp_reqq;
-static reqq req_queue;
-static reqq res_queue;
+static etp_reqq req_queue;
+static etp_reqq res_queue;
-static int reqq_push (reqq *q, eio_req *req)
+static int reqq_push (etp_reqq *q, ETP_REQ *req)
{
int pri = req->pri;
req->next = 0;
@@ -216,7 +279,7 @@ static int reqq_push (reqq *q, eio_req *req)
return q->size++;
}
-static eio_req *reqq_shift (reqq *q)
+static ETP_REQ *reqq_shift (etp_reqq *q)
{
int pri;
@@ -225,7 +288,7 @@ static eio_req *reqq_shift (reqq *q)
--q->size;
- for (pri = EIO_NUM_PRI; pri--; )
+ for (pri = ETP_NUM_PRI; pri--; )
{
eio_req *req = q->qs[pri];
@@ -243,7 +306,7 @@ static eio_req *reqq_shift (reqq *q)
static void etp_atfork_prepare (void)
{
- X_LOCK (etplock);
+ X_LOCK (wrklock);
X_LOCK (reqlock);
X_LOCK (reslock);
#if !HAVE_PREADWRITE
@@ -264,28 +327,28 @@ static void etp_atfork_parent (void)
#endif
X_UNLOCK (reslock);
X_UNLOCK (reqlock);
- X_UNLOCK (etplock);
+ X_UNLOCK (wrklock);
}
static void etp_atfork_child (void)
{
- eio_req *prv;
+ ETP_REQ *prv;
while (prv = reqq_shift (&req_queue))
- eio_destroy (prv);
+ ETP_DESTROY (prv);
while (prv = reqq_shift (&res_queue))
- eio_destroy (prv);
+ ETP_DESTROY (prv);
while (wrk_first.next != &wrk_first)
{
- worker *wrk = wrk_first.next;
+ etp_worker *wrk = wrk_first.next;
if (wrk->req)
- eio_destroy (wrk->req);
+ ETP_DESTROY (wrk->req);
- worker_clear (wrk);
- worker_free (wrk);
+ etp_worker_clear (wrk);
+ etp_worker_free (wrk);
}
started = 0;
@@ -304,19 +367,199 @@ etp_once_init (void)
}
static int
-etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void))
+etp_init (void (*want_poll)(void), void (*done_poll)(void))
{
static pthread_once_t doinit = PTHREAD_ONCE_INIT;
pthread_once (&doinit, etp_once_init);
- memset (etp, 0, sizeof *etp);
+ want_poll_cb = want_poll;
+ done_poll_cb = done_poll;
+}
+
+X_THREAD_PROC (etp_proc);
+
+static void etp_start_thread (void)
+{
+ etp_worker *wrk = calloc (1, sizeof (etp_worker));
+
+ /*TODO*/
+ assert (("unable to allocate worker thread data", wrk));
+
+ X_LOCK (wrklock);
- etp->want_poll_cb = want_poll;
- etp->done_poll_cb = done_poll;
+ if (thread_create (&wrk->tid, etp_proc, (void *)wrk))
+ {
+ wrk->prev = &wrk_first;
+ wrk->next = wrk_first.next;
+ wrk_first.next->prev = wrk;
+ wrk_first.next = wrk;
+ ++started;
+ }
+ else
+ free (wrk);
+
+ X_UNLOCK (wrklock);
+}
+
+static void etp_maybe_start_thread (void)
+{
+ if (etp_nthreads () >= wanted)
+ return;
+
+ /* todo: maybe use idle here, but might be less exact */
+ if (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())
+ return;
+
+ etp_start_thread ();
}
-static etp_pool etp;
+static void etp_end_thread (void)
+{
+ eio_req *req = calloc (1, sizeof (eio_req));
+
+ req->type = -1;
+ req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
+
+ X_LOCK (reqlock);
+ reqq_push (&req_queue, req);
+ X_COND_SIGNAL (reqwait);
+ X_UNLOCK (reqlock);
+
+ X_LOCK (wrklock);
+ --started;
+ X_UNLOCK (wrklock);
+}
+
+static int etp_poll (void)
+{
+ unsigned int maxreqs;
+ unsigned int maxtime;
+ struct timeval tv_start, tv_now;
+
+ X_LOCK (reslock);
+ maxreqs = max_poll_reqs;
+ maxtime = max_poll_time;
+ X_UNLOCK (reslock);
+
+ if (maxtime)
+ gettimeofday (&tv_start, 0);
+
+ for (;;)
+ {
+ ETP_REQ *req;
+
+ etp_maybe_start_thread ();
+
+ X_LOCK (reslock);
+ req = reqq_shift (&res_queue);
+
+ if (req)
+ {
+ --npending;
+
+ if (!res_queue.size && done_poll_cb)
+ done_poll_cb ();
+ }
+
+ X_UNLOCK (reslock);
+
+ if (!req)
+ return 0;
+
+ X_LOCK (reqlock);
+ --nreqs;
+ X_UNLOCK (reqlock);
+
+ if (req->type == EIO_GROUP && req->size)
+ {
+ req->int1 = 1; /* mark request as delayed */
+ continue;
+ }
+ else
+ {
+ int res = ETP_FINISH (req);
+ if (res)
+ return res;
+ }
+
+ if (maxreqs && !--maxreqs)
+ break;
+
+ if (maxtime)
+ {
+ gettimeofday (&tv_now, 0);
+
+ if (tvdiff (&tv_start, &tv_now) >= maxtime)
+ break;
+ }
+ }
+
+ errno = EAGAIN;
+ return -1;
+}
+
+static void etp_cancel (ETP_REQ *req)
+{
+ X_LOCK (wrklock);
+ req->flags |= EIO_FLAG_CANCELLED;
+ X_UNLOCK (wrklock);
+
+ eio_grp_cancel (req);
+}
+
+static void etp_submit (ETP_REQ *req)
+{
+ req->pri -= ETP_PRI_MIN;
+
+ if (req->pri < ETP_PRI_MIN - ETP_PRI_MIN) req->pri = ETP_PRI_MIN - ETP_PRI_MIN;
+ if (req->pri > ETP_PRI_MAX - ETP_PRI_MIN) req->pri = ETP_PRI_MAX - ETP_PRI_MIN;
+
+ X_LOCK (reqlock);
+ ++nreqs;
+ ++nready;
+ reqq_push (&req_queue, req);
+ X_COND_SIGNAL (reqwait);
+ X_UNLOCK (reqlock);
+
+ etp_maybe_start_thread ();
+}
+
+static void etp_set_max_poll_time (double nseconds)
+{
+ if (WORDACCESS_UNSAFE) X_LOCK (reslock);
+ max_poll_time = nseconds;
+ if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
+}
+
+static void etp_set_max_poll_reqs (unsigned int maxreqs)
+{
+ if (WORDACCESS_UNSAFE) X_LOCK (reslock);
+ max_poll_reqs = maxreqs;
+ if (WORDACCESS_UNSAFE) X_UNLOCK (reslock);
+}
+
+static void etp_set_max_idle (unsigned int nthreads)
+{
+ if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
+ max_idle = nthreads <= 0 ? 1 : nthreads;
+ if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
+}
+
+static void etp_set_min_parallel (unsigned int nthreads)
+{
+ if (wanted < nthreads)
+ wanted = nthreads;
+}
+
+static void etp_set_max_parallel (unsigned int nthreads)
+{
+ if (wanted > nthreads)
+ wanted = nthreads;
+
+ while (started > wanted)
+ etp_end_thread ();
+}
/*****************************************************************************/
@@ -337,8 +580,6 @@ static void grp_try_feed (eio_req *grp)
}
}
-static int eio_finish (eio_req *req);
-
static int grp_dec (eio_req *grp)
{
--grp->size;
@@ -396,178 +637,62 @@ void eio_grp_cancel (eio_req *grp)
void eio_cancel (eio_req *req)
{
- X_LOCK (etplock);
- req->flags |= EIO_FLAG_CANCELLED;
- X_UNLOCK (etplock);
-
- eio_grp_cancel (req);
+ etp_cancel (req);
}
-X_THREAD_PROC (eio_proc);
-
-static void start_thread (void)
+void eio_submit (eio_req *req)
{
- worker *wrk = calloc (1, sizeof (worker));
-
- /*TODO*/
- assert (("unable to allocate worker thread data", wrk));
-
- X_LOCK (etplock);
-
- if (thread_create (&wrk->tid, eio_proc, (void *)wrk))
- {
- wrk->prev = &wrk_first;
- wrk->next = wrk_first.next;
- wrk_first.next->prev = wrk;
- wrk_first.next = wrk;
- ++started;
- }
- else
- free (wrk);
-
- X_UNLOCK (etplock);
+ etp_submit (req);
}
-static void maybe_start_thread (void)
+unsigned int eio_nreqs (void)
{
- if (eio_nthreads () >= wanted)
- return;
-
- /* todo: maybe use idle here, but might be less exact */
- if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ())
- return;
-
- start_thread ();
+ return etp_nreqs ();
}
-void eio_submit (eio_req *req)
+unsigned int eio_nready (void)
{
- req->pri += EIO_PRI_BIAS;
-
- if (req->pri < EIO_PRI_MIN + EIO_PRI_BIAS) req->pri = EIO_PRI_MIN + EIO_PRI_BIAS;
- if (req->pri > EIO_PRI_MAX + EIO_PRI_BIAS) req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
-
- ++nreqs;
-
- X_LOCK (reqlock);
- ++nready;
- reqq_push (&req_queue, req);
- X_COND_SIGNAL (reqwait);
- X_UNLOCK (reqlock);
-
- maybe_start_thread ();
+ return etp_nready ();
}
-static void end_thread (void)
+unsigned int eio_npending (void)
{
- eio_req *req = calloc (1, sizeof (eio_req));
-
- req->type = EIO_QUIT;
- req->pri = EIO_PRI_MAX + EIO_PRI_BIAS;
-
- X_LOCK (reqlock);
- reqq_push (&req_queue, req);
- X_COND_SIGNAL (reqwait);
- X_UNLOCK (reqlock);
+ return etp_npending ();
+}
- X_LOCK (etplock);
- --started;
- X_UNLOCK (etplock);
+unsigned int eio_nthreads (void)
+{
+ return etp_nthreads ();
}
void eio_set_max_poll_time (double nseconds)
{
- if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
- etp.max_poll_time = nseconds;
- if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
+ etp_set_max_poll_time (nseconds);
}
void eio_set_max_poll_reqs (unsigned int maxreqs)
{
- if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
- etp.max_poll_reqs = maxreqs;
- if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
+ etp_set_max_poll_reqs (maxreqs);
}
void eio_set_max_idle (unsigned int nthreads)
{
- if (WORDACCESS_UNSAFE) X_LOCK (reqlock);
- max_idle = nthreads <= 0 ? 1 : nthreads;
- if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock);
+ etp_set_max_idle (nthreads);
}
void eio_set_min_parallel (unsigned int nthreads)
{
- if (wanted < nthreads)
- wanted = nthreads;
+ etp_set_min_parallel (nthreads);
}
void eio_set_max_parallel (unsigned int nthreads)
{
- if (wanted > nthreads)
- wanted = nthreads;
-
- while (started > wanted)
- end_thread ();
+ etp_set_max_parallel (nthreads);
}
int eio_poll (void)
{
- int maxreqs = etp.max_poll_reqs;
- struct timeval tv_start, tv_now;
- eio_req *req;
-
- if (etp.max_poll_time)
- gettimeofday (&tv_start, 0);
-
- for (;;)
- {
- maybe_start_thread ();
-
- X_LOCK (reslock);
- req = reqq_shift (&res_queue);
-
- if (req)
- {
- --npending;
-
- if (!res_queue.size && etp.done_poll_cb)
- etp.done_poll_cb ();
- }
-
- X_UNLOCK (reslock);
-
- if (!req)
- return 0;
-
- --nreqs;
-
- if (req->type == EIO_GROUP && req->size)
- {
- req->int1 = 1; /* mark request as delayed */
- continue;
- }
- else
- {
- int res = eio_finish (req);
- if (res)
- return res;
- }
-
- if (maxreqs && !--maxreqs)
- break;
-
- if (etp.max_poll_time)
- {
- gettimeofday (&tv_now, 0);
-
- if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time)
- break;
- }
- }
-
- errno = EAGAIN;
- return -1;
+ return etp_poll ();
}
/*****************************************************************************/
@@ -707,7 +832,7 @@ eio__readdir_r (DIR *dirp, EIO_STRUCT_DIRENT *ent, EIO_STRUCT_DIRENT **res)
/* sendfile always needs emulation */
static ssize_t
-eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self)
+eio__sendfile (int ofd, int ifd, off_t offset, size_t count, etp_worker *self)
{
ssize_t res;
@@ -801,7 +926,7 @@ eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self)
/* read a full directory */
static void
-eio__scandir (eio_req *req, worker *self)
+eio__scandir (eio_req *req, etp_worker *self)
{
DIR *dirp;
union
@@ -815,12 +940,12 @@ eio__scandir (eio_req *req, worker *self)
int memofs = 0;
int res = 0;
- X_LOCK (etplock);
+ X_LOCK (wrklock);
self->dirp = dirp = opendir (req->ptr1);
self->dbuf = u = malloc (sizeof (*u));
req->flags |= EIO_FLAG_PTR2_FREE;
req->ptr2 = names = malloc (memlen);
- X_UNLOCK (etplock);
+ X_UNLOCK (wrklock);
if (dirp && u && names)
for (;;)
@@ -842,9 +967,9 @@ eio__scandir (eio_req *req, worker *self)
while (memofs + len > memlen)
{
memlen *= 2;
- X_LOCK (etplock);
+ X_LOCK (wrklock);
req->ptr2 = names = realloc (names, memlen);
- X_UNLOCK (etplock);
+ X_UNLOCK (wrklock);
if (!names)
break;
@@ -866,9 +991,9 @@ eio__scandir (eio_req *req, worker *self)
#define ALLOC(len) \
if (!req->ptr2) \
{ \
- X_LOCK (etplock); \
+ X_LOCK (wrklock); \
req->flags |= EIO_FLAG_PTR2_FREE; \
- X_UNLOCK (etplock); \
+ X_UNLOCK (wrklock); \
req->ptr2 = malloc (len); \
if (!req->ptr2) \
{ \
@@ -878,19 +1003,17 @@ eio__scandir (eio_req *req, worker *self)
} \
}
-X_THREAD_PROC (eio_proc)
+X_THREAD_PROC (etp_proc)
{
- eio_req *req;
+ ETP_REQ *req;
struct timespec ts;
- worker *self = (worker *)thr_arg;
+ etp_worker *self = (etp_worker *)thr_arg;
/* try to distribute timeouts somewhat randomly */
ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL);
for (;;)
{
- ts.tv_sec = time (0) + IDLE_TIMEOUT;
-
X_LOCK (reqlock);
for (;;)
@@ -902,21 +1025,21 @@ X_THREAD_PROC (eio_proc)
++idle;
+ ts.tv_sec = time (0) + IDLE_TIMEOUT;
if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT)
{
if (idle > max_idle)
{
--idle;
X_UNLOCK (reqlock);
- X_LOCK (etplock);
+ X_LOCK (wrklock);
--started;
- X_UNLOCK (etplock);
+ X_UNLOCK (wrklock);
goto quit;
}
/* we are allowed to idle, so do so without any timeout */
X_COND_WAIT (reqwait, reqlock);
- ts.tv_sec = time (0) + IDLE_TIMEOUT;
}
--idle;
@@ -1020,7 +1143,7 @@ X_THREAD_PROC (eio_proc)
req->result = 0;
break;
- case EIO_QUIT:
+ case -1:
goto quit;
default:
@@ -1034,19 +1157,19 @@ X_THREAD_PROC (eio_proc)
++npending;
- if (!reqq_push (&res_queue, req) && etp.want_poll_cb)
- etp.want_poll_cb ();
+ if (!reqq_push (&res_queue, req) && want_poll_cb)
+ want_poll_cb ();
self->req = 0;
- worker_clear (self);
+ etp_worker_clear (self);
X_UNLOCK (reslock);
}
quit:
- X_LOCK (etplock);
- worker_free (self);
- X_UNLOCK (etplock);
+ X_LOCK (wrklock);
+ etp_worker_free (self);
+ X_UNLOCK (wrklock);
return 0;
}
@@ -1055,7 +1178,7 @@ quit:
int eio_init (void (*want_poll)(void), void (*done_poll)(void))
{
- etp_init (&etp, want_poll, done_poll);
+ etp_init (want_poll, done_poll);
}
static void eio_api_destroy (eio_req *req)
@@ -1323,7 +1446,7 @@ void eio_grp_add (eio_req *grp, eio_req *req)
ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count)
{
- worker wrk;
+ etp_worker wrk;
wrk.dbuf = 0;