#include #include #include #include #include #include #include #include #include #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "TaskMan.h" #include "gettext.h" #define USE_POLL 1 TaskMan::TaskList_t TaskMan::TaskList; TaskMan::TaskList_t TaskMan::Zombies; std::vector TaskMan::w4ha; std::vector TaskMan::w4pr; std::vector TaskMan::w4to; bool TaskMan::stopped = false; int TaskMan::number = 0; bool TaskMan::inited = false; int TaskMan::event, TaskMan::eprocess, TaskMan::estatus; Task * TaskMan::etask; Handle * TaskMan::ehandle; sigset_t TaskMan::sigchildset; static int got_sigchild = 0; void taskman_sigchild(int sig) { int status; pid_t pid; pid = wait(&status); if (TaskMan::GotChild(pid, status)) { got_sigchild++; } else { TaskMan::WaitFor(pid, 0, status); } // cerr << "Got SIGCHILD, pid = " << pid << " and status = " << status << endl; signal(SIGCHLD, taskman_sigchild); } void taskman_sighole(int sig) { signal(sig, taskman_sighole); } int TaskMan::GotChild(pid_t pid, int status) { int r = 0; unsigned int i; for (i = 0; i < w4pr.size(); i++) { if (w4pr[i].pr == pid) { w4pr[i].flag = true; w4pr[i].status = status; r = 1; } } return r; } void TaskMan::Init() throw (GeneralException) { if (inited) { throw GeneralException(_("Task Manager already initialised.")); } signal(SIGCHLD, taskman_sigchild); signal(SIGPIPE, taskman_sighole); signal(SIGHUP, taskman_sighole); sigemptyset(&sigchildset); sigaddset(&sigchildset, SIGCHLD); sigprocmask(SIG_BLOCK, &sigchildset, 0); inited = true; number = 0; } void TaskMan::Stop() { stopped = true; } int TaskMan::Event() { return event; } Task * TaskMan::Etask() { return etask; } Handle * TaskMan::Ehandle() { return ehandle; } int TaskMan::Eprocess() { return eprocess; } int TaskMan::Estatus() { return estatus; } void TaskMan::AddTask(Task * t) { if (!inited) { Init(); } if (t) { TaskList.push_back(t); number++; } } std::vector::iterator TaskMan::FindTask(Task * t) throw (GeneralException) { if (!inited) { Init(); } if (TaskList.empty()) throw TaskNotFound(); for (std::vector::iterator p = TaskList.begin(); p != TaskList.end(); p++) { if (*p == t) { return p; } } throw TaskNotFound(); } void TaskMan::RemoveFromWatches(Task * t) { if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p++) { if (p->T == t) { w4ha.erase(p); p--; } } } if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->T == t) { w4pr.erase(p); p--; } } } if (!w4to.empty()) { for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { if (p->T == t) { w4to.erase(p); p--; } } } if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { if ((*p)->WaitedBy() == t) { Zombies.push_back(*p); (*p)->RemoveFromWatches(); TaskList.erase(p); number--; p--; } else if ((*p) == t) { TaskList.erase(p); number--; p--; } } } } void TaskMan::WaitFor(Handle * h, Task * t, int flags) { h->SetNonBlock(); w4ha.push_back(w4ha_t(h, flags, t)); } void TaskMan::WaitFor(pid_t pid, Task * t, int status) { if (status == -1) { if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->pr == pid) { p->T = t; p->flag = true; got_sigchild++; return; } } } } w4pr.push_back(w4pr_t(pid, t)); w4pr[w4pr.size() - 1].status = status; } void TaskMan::WaitFor(timeval t, Task * T, int flags) { w4to.push_back(w4to_t(t, flags, T)); } void TaskMan::MainLoop() throw (GeneralException) { struct pollfd * ufsd; unsigned int nfds; int no_burst; if (!inited) { Init(); } while (1) { if (number == 0) { throw GeneralException(_("TaskMan: No more task to manage.")); } if (stopped) return; // cerr << "-=- TaskMan: begin main loop with " << number << " task to manage.\n"; if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { Task * t = *p; // cerr << "-=- TaskMan: task " << t->GetName() << endl; } } // cerr << "-=- TaskMan: processing burning tasks.\n"; no_burst = 0; while (!no_burst) { no_burst = 1; /* First, we will check for any burning task and run 'em */ event = E_BURST; if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { Task * t = *p; if (t->IsStopped()) { continue; } if (t->GetState() == TASK_BURST) { // cerr << "-=- TaskMan: running burning task " << t->GetName() << endl; t->Run(); /* if the task added some new tasks, we have to rerun the loop */ no_burst = 0; break; } if (t->GetState() == TASK_DONE) { TaskList.erase(p); number--; p--; Zombies.push_back(t); } } } } /* Let's compute the nearest timeout, and eventually, launch the outdated timeouts. */ int timeout = -1; event = E_TIMEOUT; if (!w4to.empty()) { time_t curtime = time(NULL); for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { int cur_to; cur_to = (p->to.tv_sec - curtime) * 1000 + p->to.tv_usec; if (cur_to < 0) { } } } /* Now is time to check all the handle and enter into a wait state. */ event = E_HANDLE; // cerr << "-=- TaskMan: processing handle-waiting tasks.\n"; nfds = w4ha.size(); no_burst = 1; if (nfds != 0) { int r; std::vector::iterator p; struct pollfd * q; ufsd = (struct pollfd *) malloc(nfds * sizeof(struct pollfd)); if (!w4ha.empty()) { for (q = ufsd, p = w4ha.begin(); p != w4ha.end(); p++, q++) { p->dirthy = false; if (p->T->IsStopped()) { q->fd = 0; q->events = 0; } else { if (p->ha->CanWatch()) { q->fd = p->ha->GetHandle(); q->events = (p->flags & W4_READING ? POLLIN : 0) | (p->flags & W4_WRITING ? POLLOUT : 0); } else { p->T->SetBurst(); no_burst = 0; p->dirthy = true; if (!(p->flags & W4_STICKY)) { w4ha.erase(p); p--; } q->fd = 0; q->events = 0; } } } } sigprocmask(SIG_UNBLOCK, &sigchildset, 0); r = poll(ufsd, nfds, (no_burst) && !(Zombies.size()) && !(got_sigchild) ? -1: 0); sigprocmask(SIG_BLOCK, &sigchildset, 0); if (r < 0) { if (errno != EINTR) { throw GeneralException(String(_("Error during poll: ")) + strerror(errno)); } } else if (r == 0) { // timeout. // **FIXME** #warning FIXME } else { int fd; struct pollfd * q; unsigned int i; for (q = ufsd, i = 0; i < nfds; i++, q++) { if (q->revents & POLLNVAL) { throw GeneralException(String(_("Error with poll, handle ")) + q->fd + _(" invalid.")); } // if (q->revents & POLLERR) { // cerr << _("Error condition with poll, handle ") << q->fd << endl; // } // if (q->revents & POLLHUP) { // cerr << _("Handle ") << q->fd << _(" hung up.\n"); // } fd = q->fd; if (q->revents & (POLLIN | POLLOUT | POLLERR | POLLHUP)) { // We have to look into the handle structure now... bool touched; if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p = touched ? w4ha.begin() : p + 1) { touched = false; if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped()) && (p->T->GetState() != TASK_DONE) && (!p->dirthy)) { // We've got one, launch it. // cerr << "-=- TaskMan: launching task " << p->T->GetName() << " for handle " << p->ha->GetHandle() << endl; w4ha_t w4 = *p; p->dirthy = true; if (!(p->flags & W4_STICKY)) { w4ha.erase(p); } touched = true; ehandle = p->ha; w4.T->Run(); if (w4.T->GetState() == TASK_DONE) { // This task died, remove it. try { std::vector::iterator q = FindTask(w4.T); TaskList.erase(q); number--; Zombies.push_back(w4.T); } catch (TaskNotFound) { } } } } } } } } free(ufsd); } /* And finally, let's clean-up all the zombies around here. */ int no_zombies; no_zombies = 0; event = E_TASK; // cerr << "-=- TaskMan: processing zombies loop.\n"; while (!no_zombies) { no_zombies = 1; while (Zombies.size()) { Task * t = Zombies[0], * o; if (!t) { // cerr << "!?!?!? We have t = NULL ?!?!?! WTF\n"; break; } if ((o = t->WaitedBy())) { // cerr << "-=- TaskMan: running task " << o->GetName() << " for task " << t->GetName() << endl; etask = t; o->Run(); if (o->GetState() == TASK_DONE) { TaskList_t::iterator f = FindTask(o); TaskList.erase(f); number--; Zombies.push_back(o); no_zombies = 0; } } else { delete t; } Zombies.erase(Zombies.begin()); } } /* To end up the loop, let's recall task waiting for processes */ event = E_PROCESS; // cerr << "-=- TaskMan: processing child-waiting tasks.\n"; if (got_sigchild) { if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->flag) { Task * t; if (p->T->IsStopped()) { continue; } eprocess = p->pr; estatus = p->status; // cerr << "-=- TaskMan: running task " << p->T->GetName() << " for process " << p->pr << " (" << p->status << ")\n"; t = p->T; w4pr.erase(p); got_sigchild--; t->Run(); break; } } } } } }