From 8fdf9104af7c10a778e4ed6bd0c501c7d2087cdd Mon Sep 17 00:00:00 2001 From: pixel Date: Sat, 28 Oct 2006 16:42:19 +0000 Subject: Task manager now has timeout support. --- lib/Exceptions.cc | 16 ++- lib/Task.cc | 4 +- lib/TaskMan.cc | 305 +++++++++++++++++++++++++++++++++++++----------------- 3 files changed, 228 insertions(+), 97 deletions(-) (limited to 'lib') diff --git a/lib/Exceptions.cc b/lib/Exceptions.cc index f8dae62..096ba46 100644 --- a/lib/Exceptions.cc +++ b/lib/Exceptions.cc @@ -17,7 +17,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -/* $Id: Exceptions.cc,v 1.39 2005-02-24 11:30:05 pixel Exp $ */ +/* $Id: Exceptions.cc,v 1.40 2006-10-28 16:42:19 pixel Exp $ */ #include #include @@ -208,6 +208,16 @@ pid_t xfork() throw (GeneralException) { } #endif +#ifdef HAVE_WAIT +pid_t xwait(int * s) throw (GeneralException) { + return wait(s); +} +#else +pid_t xwait(int *) throw (GeneralException) { + throw GeneralException(_("Function wait() not supported by this system.\n")); +} +#endif + void xexit(int status) throw (GeneralException) { throw Exit(status); } @@ -265,6 +275,10 @@ pid_t Base::fork() { return xfork(); } +pid_t Base::wait(int * s) { + return xwait(s); +} + void Base::exit(int status) { xexit(status); } diff --git a/lib/Task.cc b/lib/Task.cc index 4d161ea..999881e 100644 --- a/lib/Task.cc +++ b/lib/Task.cc @@ -10,7 +10,7 @@ #include "BString.h" #include "gettext.h" -Task::Task() : current(0), state(TASK_ON_HOLD), stopped(false), suspended(false), wbta(0) { +Task::Task() : current(0), state(TASK_ON_HOLD), stopped(false), suspended(false), wbta(0), BurstHandle(0) { TaskMan::AddTask(this); } @@ -83,7 +83,7 @@ void Task::WaitFor(pid_t p) { TaskMan::WaitFor(p, this); } -void Task::WaitFor(timeval t, int flags) { +void Task::WaitFor(const timeval & t, int flags) { TaskMan::WaitFor(t, this, flags); } diff --git a/lib/TaskMan.cc b/lib/TaskMan.cc index 84def6c..a1aa4b1 100644 --- a/lib/TaskMan.cc +++ b/lib/TaskMan.cc @@ -23,6 +23,7 @@ std::vector TaskMan::w4to; bool TaskMan::stopped = false; int TaskMan::number = 0; +int TaskMan::got_sigchild = 0; bool TaskMan::inited = false; int TaskMan::event, TaskMan::eprocess, TaskMan::estatus; @@ -31,32 +32,36 @@ Handle * TaskMan::ehandle; sigset_t TaskMan::sigchildset; -static int got_sigchild = 0; - #ifndef _WIN32 void taskman_sigchild(int sig) { + TaskMan::SigChild(); + signal(SIGCHLD, taskman_sigchild); +} + +void taskman_sigpipe(int sig) { + signal(sig, taskman_sigpipe); +} +void taskman_sighup(int sig) { + signal(sig, taskman_sighup); +} +#endif + +void TaskMan::SigChild() { int status; pid_t pid; pid = wait(&status); - if (TaskMan::GotChild(pid, status)) { + if (GotChild(pid, status)) { got_sigchild++; } else { - TaskMan::WaitFor(pid, 0, status); + WaitFor(pid, 0, status); } // cerr << "Got SIGCHILD, pid = " << pid << " and status = " << status << endl; - signal(SIGCHLD, taskman_sigchild); } -void taskman_sighole(int sig) { - signal(sig, taskman_sighole); -} -#endif - +// Windows implementations of poll and gettimeofday. #ifdef _WIN32 - - #define POLLIN 1 /* Set if data to read. */ #define POLLPRI 2 /* Set if urgent data to read. */ #define POLLOUT 4 /* Set if writing data wouldn't block. */ @@ -78,7 +83,7 @@ struct pollfd { short revents; }; -int poll (struct pollfd *fds, unsigned int nfds, int timeout) { +static int poll (struct pollfd *fds, unsigned int nfds, int timeout) { fd_set read_fds, write_fds, except_fds; struct timeval tv = { timeout / 1000, (timeout % 1000) * 1000 }; int max_fd = 0, retval, changedfds; @@ -136,8 +141,68 @@ int poll (struct pollfd *fds, unsigned int nfds, int timeout) { return retval; } +#define EPOCHFILETIME (116444736000000000i64) + +// special version without timezone support... +static int gettimeofday(struct timeval *tv, struct timezone *tz) +{ + FILETIME ft; + LARGE_INTEGER li; + __int64 t; + + if (tv) + { + GetSystemTimeAsFileTime(&ft); + li.LowPart = ft.dwLowDateTime; + li.HighPart = ft.dwHighDateTime; + t = li.QuadPart; /* In 100-nanosecond intervals */ + t -= EPOCHFILETIME; /* Offset to the Epoch time */ + t /= 10; /* In microseconds */ + tv->tv_sec = (long)(t / 1000000); + tv->tv_usec = (long)(t % 1000000); + } + + return 0; +} #endif +// Static class to handle operations on the timeval structure. +class TimeOps : public Base { + public: + static timeval Add(const timeval & t1, const timeval & t2) { + struct timeval r; + + r.tv_usec = t1.tv_usec + t2.tv_usec; + r.tv_sec = t1.tv_sec + t2.tv_sec; + + if (r.tv_usec >= 1000000) { + r.tv_usec %= 1000000; + r.tv_sec++; + } + + return r; + } + static long Diff(const timeval & t1, const timeval & t2) { + return (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000; + } + static bool IsLE(const timeval & t1, const timeval & t2) { + return (t1.tv_sec <= t2.tv_sec) || ((t1.tv_sec == t2.tv_sec) && (t1.tv_usec <= t2.tv_usec)); + } + static long ToMS(const timeval & t) { + return t.tv_usec / 1000 + t.tv_sec * 1000; + } + static void Sleep(const timeval & t) { + Sleep(ToMS(t)); + } + static void Sleep(Uint32 t) { +#ifdef _WIN32 + ::Sleep(t); +#else + usleep(t * 1000); +#endif + } +}; + int TaskMan::GotChild(pid_t pid, int status) { int r = 0; unsigned int i; @@ -161,7 +226,7 @@ void TaskMan::Init() throw (GeneralException) { #ifndef _WIN32 signal(SIGCHLD, taskman_sigchild); signal(SIGPIPE, taskman_sighole); - signal(SIGHUP, taskman_sighole); + signal(SIGHUP, taskman_sighup); sigemptyset(&sigchildset); sigaddset(&sigchildset, SIGCHLD); @@ -176,26 +241,32 @@ void TaskMan::Stop() { stopped = true; } +// Accessor of the current processed event. int TaskMan::Event() { return event; } +// Accessor of the current task triggering the EVT_TASK event. Task * TaskMan::Etask() { return etask; } +// Accessor of the current handle triggering the EVT_HANDLE event. Handle * TaskMan::Ehandle() { return ehandle; } +// accessor of the current pid_t triggering the EVT_PROCESS event. int TaskMan::Eprocess() { return eprocess; } +// Accessor of the status of the processus triggering the EVT_PROCESS event. int TaskMan::Estatus() { return estatus; } +// Should be only called by the constructor of a Task() void TaskMan::AddTask(Task * t) { if (!inited) { Init(); @@ -224,6 +295,7 @@ std::vector::iterator TaskMan::FindTask(Task * t) throw (GeneralExceptio throw TaskNotFound(); } +// Should only be called by the destructor of a task. void TaskMan::RemoveFromWatches(Task * t) { if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p++) { @@ -259,10 +331,22 @@ void TaskMan::RemoveFromWatches(Task * t) { (*p)->RemoveFromWatches(); TaskList.erase(p); number--; - p--; + p = TaskList.begin(); } else if ((*p) == t) { TaskList.erase(p); number--; + p = TaskList.begin(); + } + } + } +} + +// Possibility to cancel a timeout. +void TaskMan::RemoveTimeout(Task * t) { + if (!w4to.empty()) { + for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { + if (p->T == t) { + w4to.erase(p); p--; } } @@ -291,10 +375,13 @@ void TaskMan::WaitFor(pid_t pid, Task * t, int status) { w4pr[w4pr.size() - 1].status = status; } -void TaskMan::WaitFor(timeval t, Task * T, int flags) { - w4to.push_back(w4to_t(t, flags, T)); +void TaskMan::WaitFor(const timeval & t, Task * T, int flags) { + struct timeval tod; + gettimeofday(&tod, 0); + w4to.push_back(w4to_t(TimeOps::Add(tod, t), flags, T)); } +// The big ugly mainloop. void TaskMan::MainLoop() throw (GeneralException) { struct pollfd * ufsd; unsigned int nfds; @@ -311,6 +398,10 @@ void TaskMan::MainLoop() throw (GeneralException) { } if (stopped) return; + + // We should have a small fuse here, and check if there is no 'dead' task, + // that is, a task which doesn't have anything to wait for, but which is + // not a zombie. // cerr << "-=- TaskMan: begin main loop with " << number << " task to manage.\n"; if (!TaskList.empty()) { @@ -326,7 +417,7 @@ void TaskMan::MainLoop() throw (GeneralException) { while (!no_burst) { no_burst = 1; /* First, we will check for any burning task and run 'em */ - event = E_BURST; + event = Task::EVT_BURST; if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { Task * t = *p; @@ -335,84 +426,118 @@ void TaskMan::MainLoop() throw (GeneralException) { continue; } - if (t->GetState() == TASK_BURST) { + if (t->GetState() == Task::TASK_BURST) { // cerr << "-=- TaskMan: running burning task " << t->GetName() << endl; + if (t->BurstHandle) { + // This task got the burst status from + // a non-watchable handle. Let's fake the handle event. + event = Task::EVT_HANDLE; + ehandle = t->BurstHandle; + } t->Run(); + if (t->BurstHandle) { + event = Task::EVT_BURST; + t->BurstHandle = 0; + } /* if the task added some new tasks, we have to rerun the loop */ no_burst = 0; break; } - if (t->GetState() == TASK_DONE) { - TaskList.erase(p); - number--; - p = TaskList.begin(); - Zombies.push_back(t); - } + // Additionnally, if the task died, let's put it in the zombies list. + // This check is done on the whole TaskList at each loop. + if (CheckDead(t)) + p = TaskList.begin(); } } } - /* Let's compute the nearest timeout, and eventually, launch the outdated timeouts. */ + /* Let's compute the nearest timeout, and run a round trip for expired timeouts. */ int timeout = -1; - event = E_TIMEOUT; - - if (!w4to.empty()) { - time_t curtime = time(NULL); + event = Task::EVT_TIMEOUT; + bool got_timeout = true, ran_on_timeout = false; + + while (!w4to.empty() && got_timeout) { + got_timeout = false; + struct timeval current; + gettimeofday(¤t, 0); + timeout = -1; for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { - int cur_to; - cur_to = (p->to.tv_sec - curtime) * 1000 + p->to.tv_usec; - if (cur_to < 0) { - - } + if (TimeOps::IsLE(p->to, current)) { + // This timeout expired; run the task. + w4to_t w4 = *p; + w4to.erase(p); + w4.T->Run(); + ran_on_timeout = true; + got_timeout = true; + CheckDead(w4.T); + break; + } else { + // Otherwise, let's keep track of the smallest timeout value. + long diff = TimeOps::Diff(current, p->to); + if ((timeout == -1) || (timeout > diff)) { + timeout = diff; + } + } } } - /* Now is time to check all the handle and enter into a wait state. */ - - event = E_HANDLE; // cerr << "-=- TaskMan: processing handle-waiting tasks.\n"; + if ((w4ha.size() == 0) && !ran_on_timeout && (timeout != -1) && (Zombies.size() == 0)) { + // In order to avoid the engine to run full speed without anything real to do, + // let's eat the timeout here. + TimeOps::Sleep(timeout); + } + + /* Now is time to check all the handle and enter into a wait state. */ + + event = Task::EVT_HANDLE; nfds = w4ha.size(); no_burst = 1; - - if (nfds != 0) { + + if (nfds != 0) { int r; std::vector::iterator p; struct pollfd * q; ufsd = (struct pollfd *) malloc(nfds * sizeof(struct pollfd)); - if (!w4ha.empty()) { - for (q = ufsd, p = w4ha.begin(); p != w4ha.end(); p++, q++) { - p->dirthy = false; - if (p->T->IsStopped()) { - q->fd = 0; - q->events = 0; + // Let's build the pollfd structure. + for (q = ufsd, p = w4ha.begin(); p != w4ha.end(); p++, q++) { + p->dirty = false; + // A stopped task doesn't get cookies. + if (p->T->IsStopped()) { + q->fd = 0; + q->events = 0; + } else { + if (p->ha->CanWatch()) { + // If that handle can be watched, then let's fill in the pollfd structure. + q->fd = p->ha->GetHandle(); + q->events = (p->flags & Task::W4_READING ? POLLIN : 0) | (p->flags & Task::W4_WRITING ? POLLOUT : 0); } else { - if (p->ha->CanWatch()) { - q->fd = p->ha->GetHandle(); - q->events = (p->flags & W4_READING ? POLLIN : 0) | (p->flags & W4_WRITING ? POLLOUT : 0); - } else { - p->T->SetBurst(); - no_burst = 0; - p->dirthy = true; - q->fd = 0; - q->events = 0; - } + // Otherwise, let's put the task in burst mode, and set its status to dirty, + // considering it already processed for this handle loop. + // It'll be run immediately next cycle. + p->T->SetBurst(); + no_burst = 0; + p->dirty = true; + q->fd = 0; + q->events = 0; } - } - for (p = w4ha.begin(); p != w4ha.end(); p++) { - if (!p->T->IsStopped() && !p->ha->CanWatch() && !(p->flags & W4_STICKY)) { - w4ha.erase(p); - p = w4ha.begin(); - } - } + } } + // Now the list is done, let's clean up the w4ha list from the non watchable handles. + for (p = w4ha.begin(); p != w4ha.end(); p++) { + if (!p->T->IsStopped() && !p->ha->CanWatch() && !(p->flags & Task::W4_STICKY)) { + w4ha.erase(p); + p = w4ha.begin(); + } + } #ifndef _WIN32 sigprocmask(SIG_UNBLOCK, &sigchildset, 0); #endif - r = poll(ufsd, nfds, (no_burst) && !(Zombies.size()) && !(got_sigchild) ? -1: 0); + r = poll(ufsd, nfds, (no_burst) && !(Zombies.size()) && !(got_sigchild) ? timeout: 0); #ifndef _WIN32 sigprocmask(SIG_BLOCK, &sigchildset, 0); #endif @@ -422,10 +547,8 @@ void TaskMan::MainLoop() throw (GeneralException) { } } else if (r == 0) { // timeout. - // **FIXME** -#ifndef _WIN32 -#warning FIXME -#endif + // We shouldn't do anything here, since the next loop should catch + // the expired timeouts. } else { int fd; struct pollfd * q; @@ -450,13 +573,13 @@ void TaskMan::MainLoop() throw (GeneralException) { if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p = touched ? w4ha.begin() : p + 1) { touched = false; - if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped()) && (p->T->GetState() != TASK_DONE) && (!p->dirthy)) { + if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped()) && (p->T->GetState() != Task::TASK_DONE) && (!p->dirty)) { // We've got one, launch it. // cerr << "-=- TaskMan: launching task " << p->T->GetName() << " for handle " << p->ha->GetHandle() << endl; w4ha_t w4 = *p; - p->dirthy = true; + p->dirty = true; - if (!(p->flags & W4_STICKY)) { + if (!(p->flags & Task::W4_STICKY)) { w4ha.erase(p); } @@ -464,18 +587,7 @@ void TaskMan::MainLoop() throw (GeneralException) { ehandle = p->ha; w4.T->Run(); - - if (w4.T->GetState() == TASK_DONE) { - // This task died, remove it. - try { - std::vector::iterator q = FindTask(w4.T); - TaskList.erase(q); - number--; - Zombies.push_back(w4.T); - } - catch (TaskNotFound) { - } - } + CheckDead(w4.T); } } } @@ -490,7 +602,7 @@ void TaskMan::MainLoop() throw (GeneralException) { int no_zombies; no_zombies = 0; - event = E_TASK; + event = Task::EVT_TASK; // cerr << "-=- TaskMan: processing zombies loop.\n"; while (!no_zombies) { @@ -507,14 +619,8 @@ void TaskMan::MainLoop() throw (GeneralException) { // cerr << "-=- TaskMan: running task " << o->GetName() << " for task " << t->GetName() << endl; etask = t; o->Run(); - - if (o->GetState() == TASK_DONE) { - TaskList_t::iterator f = FindTask(o); - TaskList.erase(f); - number--; - Zombies.push_back(o); - no_zombies = 0; - } + if (CheckDead(o)) + no_zombies = 0; } else { delete t; } @@ -522,9 +628,9 @@ void TaskMan::MainLoop() throw (GeneralException) { } } - /* To end up the loop, let's recall task waiting for processes */ + /* To end up the loop, let's recall tasks waiting for processes */ - event = E_PROCESS; + event = Task::EVT_PROCESS; // cerr << "-=- TaskMan: processing child-waiting tasks.\n"; if (got_sigchild) { @@ -549,3 +655,14 @@ void TaskMan::MainLoop() throw (GeneralException) { } } } + +bool TaskMan::CheckDead(Task * t) { + if (t->GetState() == Task::TASK_DONE) { + TaskList_t::iterator f = FindTask(t); + TaskList.erase(f); + number--; + Zombies.push_back(t); + return true; + } + return false; +} -- cgit v1.2.3