#include #include #include #include #include #include #include #include #include #include "TaskMan.h" #include "config.h" #define USE_POLL 1 TaskMan::TaskList_t TaskMan::TaskList; TaskMan::TaskList_t TaskMan::Zombies; vector TaskMan::w4ha; vector TaskMan::w4pr; vector TaskMan::w4to; int TaskMan::number = 0; bool TaskMan::inited = false; static int got_sigchild = 0; static vector process; static int nbprocess = 0; void taskman_sigchild(int sig) { got_sigchild = 1; process.push_back(wait(0)); signal(SIGCHLD, taskman_sigchild); nbprocess++; } void TaskMan::Init() throw (GeneralException) { if (inited) { throw GeneralException("Task Manager already initialised."); } signal(SIGCHLD, taskman_sigchild); inited = true; number = 0; } void TaskMan::AddTask(Task * t) { if (!inited) { Init(); } if (t) { TaskList.push_back(t); number++; } } vector::iterator TaskMan::FindTask(Task * t) { if (!inited) { Init(); } for (TaskList_t::iterator p = TaskList.begin(); p && (p != TaskList.end()); p++) { if (*p == t) { return p; } } return NULL; } void TaskMan::RemoveFromWatches(Task * t) { for (vector::iterator p = w4ha.begin(); p && (p != w4ha.end()); p++) { if (p->T == t) { w4ha.erase(p); p--; } } for (vector::iterator p = w4pr.begin(); p && (p != w4pr.end()); p++) { if (p->T == t) { w4pr.erase(p); p--; } } for (vector::iterator p = w4to.begin(); p && (p != w4to.end()); p++) { if (p->T == t) { w4to.erase(p); p--; } } } void TaskMan::WaitFor(Handle * h, Task * t, int flags) { if (h->GetHandle() >= 0) { w4ha.push_back(w4ha_t(h, flags, t)); } } void TaskMan::WaitFor(pid_t p, Task * t) { w4pr.push_back(w4pr_t(p, t)); } void TaskMan::WaitFor(timeval t, Task * T, int flags) { w4to.push_back(w4to_t(t, flags, T)); } void TaskMan::MainLoop() throw (GeneralException) { struct pollfd * ufsd; unsigned int nfds; int no_burst; if (!inited) { Init(); } while (1) { if (number == 0) { throw GeneralException("TaskMan: No more task to manage."); } cerr << "==== TaskMan: main loop.\n"; no_burst = 0; while (!no_burst) { no_burst = 1; /* First, we will check for any burning task and run 'em */ for (TaskList_t::iterator p = TaskList.begin(); p && (p != TaskList.end()); p++) { Task * t = *p; if (t->GetState() == TASK_BURST) { cerr << "==== TaskMan: running burning task \"" << t->GetName() << "\".\n"; t->Run(); /* if the task added some new tasks, we have to rerun the loop */ no_burst = 0; break; } if (t->GetState() == TASK_BURST) { no_burst = 0; } else if (t->GetState() == TASK_DONE) { TaskList.erase(p); number--; p--; if (t->HasToClean()) { delete t; } else { Zombies.push_back(t); } } } } nfds = w4ha.size(); cerr << "==== TaskMan: polling.\n"; if (nfds != 0) { int r; vector::iterator p; #ifdef USE_POLL struct pollfd * q; #else int highest; fd_set readfds, writefds, exceptfds; #endif #ifdef USE_POLL ufsd = (struct pollfd *) malloc(nfds * sizeof(struct pollfd)); for (q = ufsd, p = w4ha.begin(); p && (p != w4ha.end()); p++, q++) { if (p->T->IsStopped()) { q->fd = 0; q->events = 0; } else { cerr << "==== TaskMan: adding watch over handle \"" << p->ha->GetName() << "\" for task \"" << p->T->GetName() << "\"\n"; q->fd = p->ha->GetHandle(); q->events = (p->flags & W4_READING ? POLLIN : 0) | (p->flags & W4_WRITING ? POLLOUT : 0); } } r = poll(ufsd, nfds, -1); #else FD_ZERO(readfds); FD_ZERO(writefds); FD_ZERO(exceptfds); highest = -1; for (p = w4ha.begin(); p && (p != w4ha.end()); p++) { if (p->T->IsStopped()) continue; if (p->flags & W4_READING) { FD_SET(p->ha->GetHandle(), readfds); } if (p->flags & W4_WRITING) { FD_SET(p->ha->GetHandle(), writefds); } FD_SET(p->ha->GetHandle(), exceptfds); } r = select(highest + 1, &readfds, &writefds, &exceptfds, NULL); #endif if (r == -1) { throw GeneralException(String("Error during poll: ") + strerror(errno)); } else if (r == 0) { // timeout... } else { int fd; #ifdef USE_POLL struct pollfd * q; int i; for (q = ufsd, i = 0; i < nfds; i++, q++) { if (q->revents & POLLNVAL) { throw GeneralException(String("Error with poll, handle ") + q->fd + " invalid."); } if (q->revents & POLLERR) { throw GeneralException(String("Error condition whith poll, handle ") + q->fd); } if (q->revents & POLLHUP) { cerr << "Handle " << q->fd << " hung up."; // What should I do now? } fd = q->fd; if (q->revents & (POLLIN | POLLOUT)) { #else /* Later perhaps... Let's use poll for now. The following is independant of the use of select or poll. Just have to set 'fd' to the changed handle. Two '{' to open with the first as the loop through the handles and the second with the test "if handle has changed" */ #endif // We have to look into the handle structure now... for (vector::iterator p = w4ha.begin(); p && (p != w4ha.end()); p++) { if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped())) { // We've got one, launch it. bool erased; cerr << "==== TaskMan: event over handle \"" << p->ha->GetName() << "\"\n"; erased = false; p->T->Run(); if (p->T->GetState() == TASK_DONE) { // This task died, remove it. TaskList_t::iterator q = FindTask(p->T); if (q) { TaskList.erase(q); number--; if (p->T->HasToClean()) { delete p->T; } else { Zombies.push_back(p->T); } w4ha.erase(p); p--; erased = true; } else { // Hu-ho, something wrong... throw GeneralException("TaskMan: internal error (task not found)"); } } if (!erased && !(p->flags & W4_STICKY)) { w4ha.erase(p); p--; } } } } } } #ifdef USE_POLL free((void *) ufsd); #endif } int no_zombies; no_zombies = 0; while (!no_zombies) { no_zombies = 1; while (Zombies.size()) { Task * t = Zombies[0], * o; if (!t) { cerr << "!?!?!? We have t = NULL ?!?!?! WTF\n"; break; } if ((o = t->WaitedBy())) { o->Run(); if (o->GetState() == TASK_DONE) { TaskList_t::iterator f = FindTask(o); if (!f) { throw GeneralException("TaskMan: internal error (task not found)"); } TaskList.erase(f); number--; if (o->HasToClean()) { delete o; } else { Zombies.push_back(o); no_zombies = 0; } } } else { delete t; } Zombies.erase(Zombies.begin()); } } } }