From 606233b22cf296dee6f6e80822515e0cd9789efa Mon Sep 17 00:00:00 2001 From: root Date: Thu, 31 Jan 2008 13:10:56 +0000 Subject: first round of ev_async --- ev.c | 223 ++++++++++++++++++++++++++++++++++++++------------------- ev.h | 34 ++++++++- ev.pod | 49 +++++++++++++ ev_vars.h | 12 +++- ev_wrap.h | 12 ++++ event_compat.h | 1 + 6 files changed, 255 insertions(+), 76 deletions(-) diff --git a/ev.c b/ev.c index b33ad3b..e7a189a 100644 --- a/ev.c +++ b/ev.c @@ -1,7 +1,7 @@ /* * libev event processing core, watcher management * - * Copyright (c) 2007 Marc Alexander Lehmann + * Copyright (c) 2007,2008 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- @@ -293,7 +293,7 @@ typedef ev_watcher_time *WT; #if EV_USE_MONOTONIC /* sig_atomic_t is used to avoid per-thread variables or locking but still */ /* giving it a reasonably high chance of working on typical architetcures */ -static sig_atomic_t have_monotonic; /* did clock_gettime (CLOCK_MONOTONIC) work? */ +static EV_ATOMIC_T have_monotonic; /* did clock_gettime (CLOCK_MONOTONIC) work? */ #endif #ifdef _WIN32 @@ -765,15 +765,13 @@ adjustheap (WT *heap, int N, int k) typedef struct { WL head; - sig_atomic_t volatile gotsig; + EV_ATOMIC_T gotsig; } ANSIG; static ANSIG *signals; static int signalmax; -static int sigpipe [2]; -static sig_atomic_t volatile gotsig; -static ev_io sigev; +static EV_ATOMIC_T gotsig; void inline_size signals_init (ANSIG *base, int count) @@ -787,24 +785,101 @@ signals_init (ANSIG *base, int count) } } -static void -sighandler (int signum) +/*****************************************************************************/ + +void inline_speed +fd_intern (int fd) { -#if _WIN32 - signal (signum, sighandler); +#ifdef _WIN32 + int arg = 1; + ioctlsocket (_get_osfhandle (fd), FIONBIO, &arg); +#else + fcntl (fd, F_SETFD, FD_CLOEXEC); + fcntl (fd, F_SETFL, O_NONBLOCK); #endif +} - signals [signum - 1].gotsig = 1; +static void noinline +evpipe_init (EV_P) +{ + if (!ev_is_active (&pipeev)) + { + while (pipe (evpipe)) + syserr ("(libev) error creating signal/async pipe"); + + fd_intern (evpipe [0]); + fd_intern (evpipe [1]); + + ev_io_set (&pipeev, evpipe [0], EV_READ); + ev_io_start (EV_A_ &pipeev); + ev_unref (EV_A); /* child watcher should not keep loop alive */ + } +} - if (!gotsig) +void inline_size +evpipe_write (EV_P_ int sig, int async) +{ + if (!(gotasync || gotsig)) { int old_errno = errno; - gotsig = 1; - write (sigpipe [1], &signum, 1); + + if (sig) gotsig = 1; + if (async) gotasync = 1; + + write (evpipe [1], &old_errno, 1); errno = old_errno; } } +static void +pipecb (EV_P_ ev_io *iow, int revents) +{ + { + int dummy; + read (evpipe [0], &dummy, 1); + } + + if (gotsig) + { + int signum; + gotsig = 0; + + for (signum = signalmax; signum--; ) + if (signals [signum].gotsig) + ev_feed_signal_event (EV_A_ signum + 1); + } + + if (gotasync) + { + int i; + gotasync = 0; + + for (i = asynccnt; i--; ) + if (asyncs [i]->sent) + { + asyncs [i]->sent = 0; + ev_feed_event (EV_A_ asyncs [i], EV_ASYNC); + } + } +} + +/*****************************************************************************/ + +static void +sighandler (int signum) +{ +#if EV_MULTIPLICITY + struct ev_loop *loop = &default_loop_struct; +#endif + +#if _WIN32 + signal (signum, sighandler); +#endif + + signals [signum - 1].gotsig = 1; + evpipe_write (EV_A_ 1, 0); +} + void noinline ev_feed_signal_event (EV_P_ int signum) { @@ -825,42 +900,6 @@ ev_feed_signal_event (EV_P_ int signum) ev_feed_event (EV_A_ (W)w, EV_SIGNAL); } -static void -sigcb (EV_P_ ev_io *iow, int revents) -{ - int signum; - - read (sigpipe [0], &revents, 1); - gotsig = 0; - - for (signum = signalmax; signum--; ) - if (signals [signum].gotsig) - ev_feed_signal_event (EV_A_ signum + 1); -} - -void inline_speed -fd_intern (int fd) -{ -#ifdef _WIN32 - int arg = 1; - ioctlsocket (_get_osfhandle (fd), FIONBIO, &arg); -#else - fcntl (fd, F_SETFD, FD_CLOEXEC); - fcntl (fd, F_SETFL, O_NONBLOCK); -#endif -} - -static void noinline -siginit (EV_P) -{ - fd_intern (sigpipe [0]); - fd_intern (sigpipe [1]); - - ev_io_set (&sigev, sigpipe [0], EV_READ); - ev_io_start (EV_A_ &sigev); - ev_unref (EV_A); /* child watcher should not keep loop alive */ -} - /*****************************************************************************/ static WL childs [EV_PID_HASHSIZE]; @@ -1086,8 +1125,8 @@ loop_init (EV_P_ unsigned int flags) if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags); #endif - ev_init (&sigev, sigcb); - ev_set_priority (&sigev, EV_MAXPRI); + ev_init (&pipeev, pipecb); + ev_set_priority (&pipeev, EV_MAXPRI); } } @@ -1096,6 +1135,15 @@ loop_destroy (EV_P) { int i; + if (ev_is_active (&pipeev)) + { + ev_ref (EV_A); /* signal watcher */ + ev_io_stop (EV_A_ &pipeev); + + close (evpipe [0]); evpipe [0] = 0; + close (evpipe [1]); evpipe [1] = 0; + } + #if EV_USE_INOTIFY if (fs_fd >= 0) close (fs_fd); @@ -1163,20 +1211,19 @@ loop_fork (EV_P) infy_fork (EV_A); #endif - if (ev_is_active (&sigev)) + if (ev_is_active (&pipeev)) { - /* default loop */ + /* this "locks" the handlers against writing to the pipe */ + gotsig = gotasync = 1; ev_ref (EV_A); - ev_io_stop (EV_A_ &sigev); - close (sigpipe [0]); - close (sigpipe [1]); + ev_io_stop (EV_A_ &pipeev); + close (evpipe [0]); + close (evpipe [1]); - while (pipe (sigpipe)) - syserr ("(libev) error creating pipe"); - - siginit (EV_A); - sigcb (EV_A_ &sigev, EV_READ); + evpipe_init (EV_A); + /* now iterate over everything */ + evcb (EV_A_ &pipeev, EV_READ); } postfork = 0; @@ -1221,10 +1268,6 @@ int ev_default_loop (unsigned int flags) #endif { - if (sigpipe [0] == sigpipe [1]) - if (pipe (sigpipe)) - return 0; - if (!ev_default_loop_ptr) { #if EV_MULTIPLICITY @@ -1237,8 +1280,6 @@ ev_default_loop (unsigned int flags) if (ev_backend (EV_A)) { - siginit (EV_A); - #ifndef _WIN32 ev_signal_init (&childev, childcb, SIGCHLD); ev_set_priority (&childev, EV_MAXPRI); @@ -1265,12 +1306,6 @@ ev_default_destroy (void) ev_signal_stop (EV_A_ &childev); #endif - ev_ref (EV_A); /* signal watcher */ - ev_io_stop (EV_A_ &sigev); - - close (sigpipe [0]); sigpipe [0] = 0; - close (sigpipe [1]); sigpipe [1] = 0; - loop_destroy (EV_A); } @@ -1867,6 +1902,8 @@ ev_signal_start (EV_P_ ev_signal *w) assert (("ev_signal_start called with illegal signal number", w->signum > 0)); + evpipe_init (EV_A); + { #ifndef _WIN32 sigset_t full, prev; @@ -2389,6 +2426,44 @@ ev_fork_stop (EV_P_ ev_fork *w) } #endif +#if EV_ASYNC_ENABLE +void +ev_async_start (EV_P_ ev_async *w) +{ + if (expect_false (ev_is_active (w))) + return; + + evpipe_init (EV_A); + + ev_start (EV_A_ (W)w, ++asynccnt); + array_needsize (ev_async *, asyncs, asyncmax, asynccnt, EMPTY2); + asyncs [asynccnt - 1] = w; +} + +void +ev_async_stop (EV_P_ ev_async *w) +{ + clear_pending (EV_A_ (W)w); + if (expect_false (!ev_is_active (w))) + return; + + { + int active = ((W)w)->active; + asyncs [active - 1] = asyncs [--asynccnt]; + ((W)asyncs [active - 1])->active = active; + } + + ev_stop (EV_A_ (W)w); +} + +void +ev_async_send (EV_P_ ev_async *w) +{ + w->sent = 1; + evpipe_write (EV_A_ 0, 1); +} +#endif + /*****************************************************************************/ struct ev_once diff --git a/ev.h b/ev.h index b1ebab5..4dab80d 100644 --- a/ev.h +++ b/ev.h @@ -1,7 +1,7 @@ /* * libev native API header * - * Copyright (c) 2007 Marc Alexander Lehmann + * Copyright (c) 2007,2008 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- @@ -78,6 +78,15 @@ typedef double ev_tstamp; # define EV_EMBED_ENABLE 1 #endif +#ifndef EV_ASYNC_ENABLE +# define EV_ASYNC_ENABLE 1 +#endif + +#ifndef EV_ATOMIC_T +# include +# define EV_ATOMIC_T sig_atomic_t volatile +#endif + /*****************************************************************************/ #if EV_STAT_ENABLE @@ -120,6 +129,7 @@ struct ev_loop; #define EV_CHECK 0x00008000L /* event loop finished poll */ #define EV_EMBED 0x00010000L /* embedded event loop needs sweep */ #define EV_FORK 0x00020000L /* event loop resumed in child */ +#define EV_ASYNC 0x00040000L /* async intra-loop signal */ #define EV_ERROR 0x80000000L /* sent when an error occurs */ /* can be used to add custom fields to all watchers, while losing binary compatibility */ @@ -307,6 +317,17 @@ typedef struct ev_embed } ev_embed; #endif +#if EV_ASYNC_ENABLE +/* invoked when somebody calls ev_async_send on the watcher */ +/* revent EV_ASYNC */ +typedef struct ev_async +{ + EV_WATCHER (ev_async) + + EV_ATOMIC_T sent; /* private */ +} ev_async; +#endif + /* the presence of this union forces similar struct layout */ union ev_any_watcher { @@ -332,6 +353,9 @@ union ev_any_watcher #if EV_EMBED_ENABLE struct ev_embed embed; #endif +#if EV_ASYND_ENABLE + struct ev_async async; +#endif }; /* bits for ev_default_loop and ev_loop_new */ @@ -465,6 +489,7 @@ void ev_once (EV_P_ int fd, int events, ev_tstamp timeout, void (*cb)(int revent #define ev_check_set(ev) /* nop, yes, this is a serious in-joke */ #define ev_embed_set(ev,other_) do { (ev)->other = (other_); } while (0) #define ev_fork_set(ev) /* nop, yes, this is a serious in-joke */ +#define ev_async_set(ev) do { (ev)->gotasync = 0; } while (0) #define ev_io_init(ev,cb,fd,events) do { ev_init ((ev), (cb)); ev_io_set ((ev),(fd),(events)); } while (0) #define ev_timer_init(ev,cb,after,repeat) do { ev_init ((ev), (cb)); ev_timer_set ((ev),(after),(repeat)); } while (0) @@ -477,6 +502,7 @@ void ev_once (EV_P_ int fd, int events, ev_tstamp timeout, void (*cb)(int revent #define ev_check_init(ev,cb) do { ev_init ((ev), (cb)); ev_check_set ((ev)); } while (0) #define ev_embed_init(ev,cb,other) do { ev_init ((ev), (cb)); ev_embed_set ((ev),(other)); } while (0) #define ev_fork_init(ev,cb) do { ev_init ((ev), (cb)); ev_fork_set ((ev)); } while (0) +#define ev_async_init(ev,cb) do { ev_init ((ev), (cb)); ev_async_set ((ev)); } while (0) #define ev_is_pending(ev) (0 + ((ev_watcher *)(void *)(ev))->pending) /* ro, true when watcher is waiting for callback invocation */ #define ev_is_active(ev) (0 + ((ev_watcher *)(void *)(ev))->active) /* ro, true when the watcher has been started */ @@ -552,6 +578,12 @@ void ev_embed_stop (EV_P_ ev_embed *w); void ev_embed_sweep (EV_P_ ev_embed *w); # endif +# if EV_ASYNC_ENABLE +void ev_async_start (EV_P_ ev_async *w); +void ev_async_stop (EV_P_ ev_async *w); +void ev_async_send (EV_P_ ev_async *w); +# endif + #endif #ifdef __cplusplus diff --git a/ev.pod b/ev.pod index ff63696..46bac97 100644 --- a/ev.pod +++ b/ev.pod @@ -776,6 +776,10 @@ The embedded event loop specified in the C watcher needs attention. The event loop has been resumed in the child process after fork (see C). +=item C + +The given async watcher has been asynchronously notified (see C). + =item C An unspecified error has occured, the watcher has been stopped. This might @@ -2048,6 +2052,51 @@ believe me. =back +=head2 C - how to wake up another event loop + +In general, you cannot use an C from multiple threads or other +asynchronous sources such as signal handlers (as opposed to multiple event +loops - those are of course safe to use in different threads). + +Sometimes, however, you need to wake up another event loop you do not +control, for example because it belongs to another thread. This is what +C watchers do: as long as the C watcher is active, you +can signal it by calling C, which is thread- and signal +safe. + +This functionality is very similar to C watchers, as signals, +too, are asynchronous in nature, and signals, too, will be compressed +(i.e. the number of callback invocations may be less than the number of +C calls). + +Unlike C watchers, C works with any event loop, not +just the default loop. + +=head3 Watcher-Specific Functions and Data Members + +=over 4 + +=item ev_async_init (ev_async *, callback) + +Initialises and configures the async watcher - it has no parameters of any +kind. There is a C macro, but using it is utterly pointless, +believe me. + +=item ev_async_send (loop, ev_async *) + +Sends/signals/activates the given C watcher, that is, feeds +an C event on the watcher into the event loop. Unlike +C, this call is safe to do in other threads, signal or +similar contexts (see the dicusssion of C in the embedding +section below on what exactly this means). + +This call incurs the overhead of a syscall only once per loop iteration, +so while the overhead might be noticable, it doesn't apply to repeated +calls to C. + +=back + + =head1 OTHER FUNCTIONS There are some other functions of possible interest. Described. Here. Now. diff --git a/ev_vars.h b/ev_vars.h index 4ac997e..c9637f2 100644 --- a/ev_vars.h +++ b/ev_vars.h @@ -1,7 +1,7 @@ /* * loop member variable declarations * - * Copyright (c) 2007 Marc Alexander Lehmann + * Copyright (c) 2007,2008 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- @@ -55,6 +55,9 @@ VARx(ev_tstamp, backend_fudge) /* assumed typical timer resolution */ VAR (backend_modify, void (*backend_modify)(EV_P_ int fd, int oev, int nev)) VAR (backend_poll , void (*backend_poll)(EV_P_ ev_tstamp timeout)) +VAR (evpipe, int evpipe [2]) +VARx(ev_io, pipeev) + #if !defined(_WIN32) || EV_GENWRAP VARx(pid_t, curpid) #endif @@ -137,6 +140,13 @@ VARx(int, forkmax) VARx(int, forkcnt) #endif +#if EV_ASYNC_ENABLE || EV_GENWRAP +VARx(EV_ATOMIC_T, gotasync) +VARx(struct ev_async **, asyncs) +VARx(int, asyncmax) +VARx(int, asynccnt) +#endif + #if EV_USE_INOTIFY || EV_GENWRAP VARx(int, fs_fd) VARx(ev_io, fs_w) diff --git a/ev_wrap.h b/ev_wrap.h index f9bc61e..fbfc802 100644 --- a/ev_wrap.h +++ b/ev_wrap.h @@ -13,6 +13,8 @@ #define backend_fudge ((loop)->backend_fudge) #define backend_modify ((loop)->backend_modify) #define backend_poll ((loop)->backend_poll) +#define evpipe ((loop)->evpipe) +#define pipeev ((loop)->pipeev) #define curpid ((loop)->curpid) #define postfork ((loop)->postfork) #define vec_ri ((loop)->vec_ri) @@ -61,6 +63,10 @@ #define forks ((loop)->forks) #define forkmax ((loop)->forkmax) #define forkcnt ((loop)->forkcnt) +#define gotasync ((loop)->gotasync) +#define asyncs ((loop)->asyncs) +#define asyncmax ((loop)->asyncmax) +#define asynccnt ((loop)->asynccnt) #define fs_fd ((loop)->fs_fd) #define fs_w ((loop)->fs_w) #define fs_hash ((loop)->fs_hash) @@ -78,6 +84,8 @@ #undef backend_fudge #undef backend_modify #undef backend_poll +#undef evpipe +#undef pipeev #undef curpid #undef postfork #undef vec_ri @@ -126,6 +134,10 @@ #undef forks #undef forkmax #undef forkcnt +#undef gotasync +#undef asyncs +#undef asyncmax +#undef asynccnt #undef fs_fd #undef fs_w #undef fs_hash diff --git a/event_compat.h b/event_compat.h index 4811906..d5cc1ef 100644 --- a/event_compat.h +++ b/event_compat.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2000-2004 Niels Provos + * Copyright (c) 2008 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without -- cgit v1.2.3