summaryrefslogtreecommitdiff
path: root/lib/TaskMan.cc
diff options
context:
space:
mode:
Diffstat (limited to 'lib/TaskMan.cc')
-rw-r--r--lib/TaskMan.cc305
1 files changed, 211 insertions, 94 deletions
diff --git a/lib/TaskMan.cc b/lib/TaskMan.cc
index 84def6c..a1aa4b1 100644
--- a/lib/TaskMan.cc
+++ b/lib/TaskMan.cc
@@ -23,6 +23,7 @@ std::vector<TaskMan::w4to_t> TaskMan::w4to;
bool TaskMan::stopped = false;
int TaskMan::number = 0;
+int TaskMan::got_sigchild = 0;
bool TaskMan::inited = false;
int TaskMan::event, TaskMan::eprocess, TaskMan::estatus;
@@ -31,32 +32,36 @@ Handle * TaskMan::ehandle;
sigset_t TaskMan::sigchildset;
-static int got_sigchild = 0;
-
#ifndef _WIN32
void taskman_sigchild(int sig) {
+ TaskMan::SigChild();
+ signal(SIGCHLD, taskman_sigchild);
+}
+
+void taskman_sigpipe(int sig) {
+ signal(sig, taskman_sigpipe);
+}
+void taskman_sighup(int sig) {
+ signal(sig, taskman_sighup);
+}
+#endif
+
+void TaskMan::SigChild() {
int status;
pid_t pid;
pid = wait(&status);
- if (TaskMan::GotChild(pid, status)) {
+ if (GotChild(pid, status)) {
got_sigchild++;
} else {
- TaskMan::WaitFor(pid, 0, status);
+ WaitFor(pid, 0, status);
}
// cerr << "Got SIGCHILD, pid = " << pid << " and status = " << status << endl;
- signal(SIGCHLD, taskman_sigchild);
}
-void taskman_sighole(int sig) {
- signal(sig, taskman_sighole);
-}
-#endif
-
+// Windows implementations of poll and gettimeofday.
#ifdef _WIN32
-
-
#define POLLIN 1 /* Set if data to read. */
#define POLLPRI 2 /* Set if urgent data to read. */
#define POLLOUT 4 /* Set if writing data wouldn't block. */
@@ -78,7 +83,7 @@ struct pollfd {
short revents;
};
-int poll (struct pollfd *fds, unsigned int nfds, int timeout) {
+static int poll (struct pollfd *fds, unsigned int nfds, int timeout) {
fd_set read_fds, write_fds, except_fds;
struct timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };
int max_fd = 0, retval, changedfds;
@@ -136,8 +141,68 @@ int poll (struct pollfd *fds, unsigned int nfds, int timeout) {
return retval;
}
+#define EPOCHFILETIME (116444736000000000i64)
+
+// special version without timezone support...
+static int gettimeofday(struct timeval *tv, struct timezone *tz)
+{
+ FILETIME ft;
+ LARGE_INTEGER li;
+ __int64 t;
+
+ if (tv)
+ {
+ GetSystemTimeAsFileTime(&ft);
+ li.LowPart = ft.dwLowDateTime;
+ li.HighPart = ft.dwHighDateTime;
+ t = li.QuadPart; /* In 100-nanosecond intervals */
+ t -= EPOCHFILETIME; /* Offset to the Epoch time */
+ t /= 10; /* In microseconds */
+ tv->tv_sec = (long)(t / 1000000);
+ tv->tv_usec = (long)(t % 1000000);
+ }
+
+ return 0;
+}
#endif
+// Static class to handle operations on the timeval structure.
+class TimeOps : public Base {
+ public:
+ static timeval Add(const timeval & t1, const timeval & t2) {
+ struct timeval r;
+
+ r.tv_usec = t1.tv_usec + t2.tv_usec;
+ r.tv_sec = t1.tv_sec + t2.tv_sec;
+
+ if (r.tv_usec >= 1000000) {
+ r.tv_usec %= 1000000;
+ r.tv_sec++;
+ }
+
+ return r;
+ }
+ static long Diff(const timeval & t1, const timeval & t2) {
+ return (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
+ }
+ static bool IsLE(const timeval & t1, const timeval & t2) {
+ return (t1.tv_sec <= t2.tv_sec) || ((t1.tv_sec == t2.tv_sec) && (t1.tv_usec <= t2.tv_usec));
+ }
+ static long ToMS(const timeval & t) {
+ return t.tv_usec / 1000 + t.tv_sec * 1000;
+ }
+ static void Sleep(const timeval & t) {
+ Sleep(ToMS(t));
+ }
+ static void Sleep(Uint32 t) {
+#ifdef _WIN32
+ ::Sleep(t);
+#else
+ usleep(t * 1000);
+#endif
+ }
+};
+
int TaskMan::GotChild(pid_t pid, int status) {
int r = 0;
unsigned int i;
@@ -161,7 +226,7 @@ void TaskMan::Init() throw (GeneralException) {
#ifndef _WIN32
signal(SIGCHLD, taskman_sigchild);
signal(SIGPIPE, taskman_sighole);
- signal(SIGHUP, taskman_sighole);
+ signal(SIGHUP, taskman_sighup);
sigemptyset(&sigchildset);
sigaddset(&sigchildset, SIGCHLD);
@@ -176,26 +241,32 @@ void TaskMan::Stop() {
stopped = true;
}
+// Accessor of the current processed event.
int TaskMan::Event() {
return event;
}
+// Accessor of the current task triggering the EVT_TASK event.
Task * TaskMan::Etask() {
return etask;
}
+// Accessor of the current handle triggering the EVT_HANDLE event.
Handle * TaskMan::Ehandle() {
return ehandle;
}
+// accessor of the current pid_t triggering the EVT_PROCESS event.
int TaskMan::Eprocess() {
return eprocess;
}
+// Accessor of the status of the processus triggering the EVT_PROCESS event.
int TaskMan::Estatus() {
return estatus;
}
+// Should be only called by the constructor of a Task()
void TaskMan::AddTask(Task * t) {
if (!inited) {
Init();
@@ -224,6 +295,7 @@ std::vector<Task *>::iterator TaskMan::FindTask(Task * t) throw (GeneralExceptio
throw TaskNotFound();
}
+// Should only be called by the destructor of a task.
void TaskMan::RemoveFromWatches(Task * t) {
if (!w4ha.empty()) {
for (std::vector<w4ha_t>::iterator p = w4ha.begin(); p != w4ha.end(); p++) {
@@ -259,10 +331,22 @@ void TaskMan::RemoveFromWatches(Task * t) {
(*p)->RemoveFromWatches();
TaskList.erase(p);
number--;
- p--;
+ p = TaskList.begin();
} else if ((*p) == t) {
TaskList.erase(p);
number--;
+ p = TaskList.begin();
+ }
+ }
+ }
+}
+
+// Possibility to cancel a timeout.
+void TaskMan::RemoveTimeout(Task * t) {
+ if (!w4to.empty()) {
+ for (std::vector<w4to_t>::iterator p = w4to.begin(); p != w4to.end(); p++) {
+ if (p->T == t) {
+ w4to.erase(p);
p--;
}
}
@@ -291,10 +375,13 @@ void TaskMan::WaitFor(pid_t pid, Task * t, int status) {
w4pr[w4pr.size() - 1].status = status;
}
-void TaskMan::WaitFor(timeval t, Task * T, int flags) {
- w4to.push_back(w4to_t(t, flags, T));
+void TaskMan::WaitFor(const timeval & t, Task * T, int flags) {
+ struct timeval tod;
+ gettimeofday(&tod, 0);
+ w4to.push_back(w4to_t(TimeOps::Add(tod, t), flags, T));
}
+// The big ugly mainloop.
void TaskMan::MainLoop() throw (GeneralException) {
struct pollfd * ufsd;
unsigned int nfds;
@@ -311,6 +398,10 @@ void TaskMan::MainLoop() throw (GeneralException) {
}
if (stopped) return;
+
+ // We should have a small fuse here, and check if there is no 'dead' task,
+ // that is, a task which doesn't have anything to wait for, but which is
+ // not a zombie.
// cerr << "-=- TaskMan: begin main loop with " << number << " task to manage.\n";
if (!TaskList.empty()) {
@@ -326,7 +417,7 @@ void TaskMan::MainLoop() throw (GeneralException) {
while (!no_burst) {
no_burst = 1;
/* First, we will check for any burning task and run 'em */
- event = E_BURST;
+ event = Task::EVT_BURST;
if (!TaskList.empty()) {
for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) {
Task * t = *p;
@@ -335,84 +426,118 @@ void TaskMan::MainLoop() throw (GeneralException) {
continue;
}
- if (t->GetState() == TASK_BURST) {
+ if (t->GetState() == Task::TASK_BURST) {
// cerr << "-=- TaskMan: running burning task " << t->GetName() << endl;
+ if (t->BurstHandle) {
+ // This task got the burst status from
+ // a non-watchable handle. Let's fake the handle event.
+ event = Task::EVT_HANDLE;
+ ehandle = t->BurstHandle;
+ }
t->Run();
+ if (t->BurstHandle) {
+ event = Task::EVT_BURST;
+ t->BurstHandle = 0;
+ }
/* if the task added some new tasks, we have to rerun the loop */
no_burst = 0;
break;
}
- if (t->GetState() == TASK_DONE) {
- TaskList.erase(p);
- number--;
- p = TaskList.begin();
- Zombies.push_back(t);
- }
+ // Additionnally, if the task died, let's put it in the zombies list.
+ // This check is done on the whole TaskList at each loop.
+ if (CheckDead(t))
+ p = TaskList.begin();
}
}
}
- /* Let's compute the nearest timeout, and eventually, launch the outdated timeouts. */
+ /* Let's compute the nearest timeout, and run a round trip for expired timeouts. */
int timeout = -1;
- event = E_TIMEOUT;
-
- if (!w4to.empty()) {
- time_t curtime = time(NULL);
+ event = Task::EVT_TIMEOUT;
+ bool got_timeout = true, ran_on_timeout = false;
+
+ while (!w4to.empty() && got_timeout) {
+ got_timeout = false;
+ struct timeval current;
+ gettimeofday(&current, 0);
+ timeout = -1;
for (std::vector<w4to_t>::iterator p = w4to.begin(); p != w4to.end(); p++) {
- int cur_to;
- cur_to = (p->to.tv_sec - curtime) * 1000 + p->to.tv_usec;
- if (cur_to < 0) {
-
- }
+ if (TimeOps::IsLE(p->to, current)) {
+ // This timeout expired; run the task.
+ w4to_t w4 = *p;
+ w4to.erase(p);
+ w4.T->Run();
+ ran_on_timeout = true;
+ got_timeout = true;
+ CheckDead(w4.T);
+ break;
+ } else {
+ // Otherwise, let's keep track of the smallest timeout value.
+ long diff = TimeOps::Diff(current, p->to);
+ if ((timeout == -1) || (timeout > diff)) {
+ timeout = diff;
+ }
+ }
}
}
- /* Now is time to check all the handle and enter into a wait state. */
-
- event = E_HANDLE;
// cerr << "-=- TaskMan: processing handle-waiting tasks.\n";
+ if ((w4ha.size() == 0) && !ran_on_timeout && (timeout != -1) && (Zombies.size() == 0)) {
+ // In order to avoid the engine to run full speed without anything real to do,
+ // let's eat the timeout here.
+ TimeOps::Sleep(timeout);
+ }
+
+ /* Now is time to check all the handle and enter into a wait state. */
+
+ event = Task::EVT_HANDLE;
nfds = w4ha.size();
no_burst = 1;
-
- if (nfds != 0) {
+
+ if (nfds != 0) {
int r;
std::vector<w4ha_t>::iterator p;
struct pollfd * q;
ufsd = (struct pollfd *) malloc(nfds * sizeof(struct pollfd));
- if (!w4ha.empty()) {
- for (q = ufsd, p = w4ha.begin(); p != w4ha.end(); p++, q++) {
- p->dirthy = false;
- if (p->T->IsStopped()) {
- q->fd = 0;
- q->events = 0;
+ // Let's build the pollfd structure.
+ for (q = ufsd, p = w4ha.begin(); p != w4ha.end(); p++, q++) {
+ p->dirty = false;
+ // A stopped task doesn't get cookies.
+ if (p->T->IsStopped()) {
+ q->fd = 0;
+ q->events = 0;
+ } else {
+ if (p->ha->CanWatch()) {
+ // If that handle can be watched, then let's fill in the pollfd structure.
+ q->fd = p->ha->GetHandle();
+ q->events = (p->flags & Task::W4_READING ? POLLIN : 0) | (p->flags & Task::W4_WRITING ? POLLOUT : 0);
} else {
- if (p->ha->CanWatch()) {
- q->fd = p->ha->GetHandle();
- q->events = (p->flags & W4_READING ? POLLIN : 0) | (p->flags & W4_WRITING ? POLLOUT : 0);
- } else {
- p->T->SetBurst();
- no_burst = 0;
- p->dirthy = true;
- q->fd = 0;
- q->events = 0;
- }
+ // Otherwise, let's put the task in burst mode, and set its status to dirty,
+ // considering it already processed for this handle loop.
+ // It'll be run immediately next cycle.
+ p->T->SetBurst();
+ no_burst = 0;
+ p->dirty = true;
+ q->fd = 0;
+ q->events = 0;
}
- }
- for (p = w4ha.begin(); p != w4ha.end(); p++) {
- if (!p->T->IsStopped() && !p->ha->CanWatch() && !(p->flags & W4_STICKY)) {
- w4ha.erase(p);
- p = w4ha.begin();
- }
- }
+ }
}
+ // Now the list is done, let's clean up the w4ha list from the non watchable handles.
+ for (p = w4ha.begin(); p != w4ha.end(); p++) {
+ if (!p->T->IsStopped() && !p->ha->CanWatch() && !(p->flags & Task::W4_STICKY)) {
+ w4ha.erase(p);
+ p = w4ha.begin();
+ }
+ }
#ifndef _WIN32
sigprocmask(SIG_UNBLOCK, &sigchildset, 0);
#endif
- r = poll(ufsd, nfds, (no_burst) && !(Zombies.size()) && !(got_sigchild) ? -1: 0);
+ r = poll(ufsd, nfds, (no_burst) && !(Zombies.size()) && !(got_sigchild) ? timeout: 0);
#ifndef _WIN32
sigprocmask(SIG_BLOCK, &sigchildset, 0);
#endif
@@ -422,10 +547,8 @@ void TaskMan::MainLoop() throw (GeneralException) {
}
} else if (r == 0) {
// timeout.
- // **FIXME**
-#ifndef _WIN32
-#warning FIXME
-#endif
+ // We shouldn't do anything here, since the next loop should catch
+ // the expired timeouts.
} else {
int fd;
struct pollfd * q;
@@ -450,13 +573,13 @@ void TaskMan::MainLoop() throw (GeneralException) {
if (!w4ha.empty()) {
for (std::vector<w4ha_t>::iterator p = w4ha.begin(); p != w4ha.end(); p = touched ? w4ha.begin() : p + 1) {
touched = false;
- if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped()) && (p->T->GetState() != TASK_DONE) && (!p->dirthy)) {
+ if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped()) && (p->T->GetState() != Task::TASK_DONE) && (!p->dirty)) {
// We've got one, launch it.
// cerr << "-=- TaskMan: launching task " << p->T->GetName() << " for handle " << p->ha->GetHandle() << endl;
w4ha_t w4 = *p;
- p->dirthy = true;
+ p->dirty = true;
- if (!(p->flags & W4_STICKY)) {
+ if (!(p->flags & Task::W4_STICKY)) {
w4ha.erase(p);
}
@@ -464,18 +587,7 @@ void TaskMan::MainLoop() throw (GeneralException) {
ehandle = p->ha;
w4.T->Run();
-
- if (w4.T->GetState() == TASK_DONE) {
- // This task died, remove it.
- try {
- std::vector<Task *>::iterator q = FindTask(w4.T);
- TaskList.erase(q);
- number--;
- Zombies.push_back(w4.T);
- }
- catch (TaskNotFound) {
- }
- }
+ CheckDead(w4.T);
}
}
}
@@ -490,7 +602,7 @@ void TaskMan::MainLoop() throw (GeneralException) {
int no_zombies;
no_zombies = 0;
- event = E_TASK;
+ event = Task::EVT_TASK;
// cerr << "-=- TaskMan: processing zombies loop.\n";
while (!no_zombies) {
@@ -507,14 +619,8 @@ void TaskMan::MainLoop() throw (GeneralException) {
// cerr << "-=- TaskMan: running task " << o->GetName() << " for task " << t->GetName() << endl;
etask = t;
o->Run();
-
- if (o->GetState() == TASK_DONE) {
- TaskList_t::iterator f = FindTask(o);
- TaskList.erase(f);
- number--;
- Zombies.push_back(o);
- no_zombies = 0;
- }
+ if (CheckDead(o))
+ no_zombies = 0;
} else {
delete t;
}
@@ -522,9 +628,9 @@ void TaskMan::MainLoop() throw (GeneralException) {
}
}
- /* To end up the loop, let's recall task waiting for processes */
+ /* To end up the loop, let's recall tasks waiting for processes */
- event = E_PROCESS;
+ event = Task::EVT_PROCESS;
// cerr << "-=- TaskMan: processing child-waiting tasks.\n";
if (got_sigchild) {
@@ -549,3 +655,14 @@ void TaskMan::MainLoop() throw (GeneralException) {
}
}
}
+
+bool TaskMan::CheckDead(Task * t) {
+ if (t->GetState() == Task::TASK_DONE) {
+ TaskList_t::iterator f = FindTask(t);
+ TaskList.erase(f);
+ number--;
+ Zombies.push_back(t);
+ return true;
+ }
+ return false;
+}