#ifndef _WIN32 #include #include #include #include #include #endif #include #include #include #include #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "TaskMan.h" #include "gettext.h" TaskMan::TaskList_t TaskMan::TaskList; TaskMan::TaskList_t TaskMan::Zombies; std::vector TaskMan::w4ha; std::vector TaskMan::w4pr; std::vector TaskMan::w4to; bool TaskMan::stopped = false; int TaskMan::number = 0; bool TaskMan::inited = false; int TaskMan::event, TaskMan::eprocess, TaskMan::estatus; Task * TaskMan::etask; Handle * TaskMan::ehandle; sigset_t TaskMan::sigchildset; static int got_sigchild = 0; #ifndef _WIN32 void taskman_sigchild(int sig) { int status; pid_t pid; pid = wait(&status); if (TaskMan::GotChild(pid, status)) { got_sigchild++; } else { TaskMan::WaitFor(pid, 0, status); } // cerr << "Got SIGCHILD, pid = " << pid << " and status = " << status << endl; signal(SIGCHLD, taskman_sigchild); } void taskman_sighole(int sig) { signal(sig, taskman_sighole); } #endif #ifdef _WIN32 #define POLLIN 1 /* Set if data to read. */ #define POLLPRI 2 /* Set if urgent data to read. */ #define POLLOUT 4 /* Set if writing data wouldn't block. */ #define POLLERR 8 /* An error occured. */ #define POLLHUP 16 /* Shutdown or close happened. */ #define POLLNVAL 32 /* Invalid file descriptor. */ #define NPOLLFILE 64 /* Number of canonical fd's in one call to poll(). */ /* The following values are defined by XPG4. */ #define POLLRDNORM POLLIN #define POLLRDBAND POLLPRI #define POLLWRNORM POLLOUT #define POLLWRBAND POLLOUT struct pollfd { int fd; short events; short revents; }; int poll (struct pollfd *fds, unsigned int nfds, int timeout) { fd_set read_fds, write_fds, except_fds; struct timeval tv = { timeout / 1000, (timeout % 1000) * 1000 }; int max_fd = 0, i, retval, changedfds; FD_ZERO(&read_fds); FD_ZERO(&write_fds); FD_ZERO(&except_fds); for (int i = 0; i < nfds; i++) { if (!fds[i].fd && !fds[i].events) continue; if (fds[i].fd > max_fd) max_fd = fds[i].fd; if (fds[i].events & POLLIN) FD_SET(fds[i].fd, &read_fds); if (fds[i].events & POLLOUT) FD_SET(fds[i].fd, &write_fds); FD_SET(fds[i].fd, &except_fds); fds[i].revents = 0; } changedfds = retval = select(max_fd + 1, &read_fds, &write_fds, &except_fds, timeout < 0 ? NULL : &tv); if (retval <= 0) { // got timeout or error return retval; } for (i = 0; i < nfds; i++) { if (FD_ISSET(fds[i].fd, &read_fds)) { fds[i].revents |= POLLIN; changedfds--; } if (FD_ISSET(fds[i].fd, &write_fds)) { fds[i].revents |= POLLOUT; changedfds--; } if (FD_ISSET(fds[i].fd, &except_fds)) { fds[i].revents |= POLLERR; changedfds--; } if (changedfds <= 0) break; } return retval; } #endif int TaskMan::GotChild(pid_t pid, int status) { int r = 0; unsigned int i; for (i = 0; i < w4pr.size(); i++) { if (w4pr[i].pr == pid) { w4pr[i].flag = true; w4pr[i].status = status; r = 1; } } return r; } void TaskMan::Init() throw (GeneralException) { if (inited) { throw GeneralException(_("Task Manager already initialised.")); } #ifndef _WIN32 signal(SIGCHLD, taskman_sigchild); signal(SIGPIPE, taskman_sighole); signal(SIGHUP, taskman_sighole); sigemptyset(&sigchildset); sigaddset(&sigchildset, SIGCHLD); sigprocmask(SIG_BLOCK, &sigchildset, 0); #endif inited = true; number = 0; } void TaskMan::Stop() { stopped = true; } int TaskMan::Event() { return event; } Task * TaskMan::Etask() { return etask; } Handle * TaskMan::Ehandle() { return ehandle; } int TaskMan::Eprocess() { return eprocess; } int TaskMan::Estatus() { return estatus; } void TaskMan::AddTask(Task * t) { if (!inited) { Init(); } if (t) { TaskList.push_back(t); number++; } } std::vector::iterator TaskMan::FindTask(Task * t) throw (GeneralException) { if (!inited) { Init(); } if (TaskList.empty()) throw TaskNotFound(); for (std::vector::iterator p = TaskList.begin(); p != TaskList.end(); p++) { if (*p == t) { return p; } } throw TaskNotFound(); } void TaskMan::RemoveFromWatches(Task * t) { if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p++) { if (p->T == t) { w4ha.erase(p); p--; } } } if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->T == t) { w4pr.erase(p); p--; } } } if (!w4to.empty()) { for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { if (p->T == t) { w4to.erase(p); p--; } } } if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { if ((*p)->WaitedBy() == t) { Zombies.push_back(*p); (*p)->RemoveFromWatches(); TaskList.erase(p); number--; p--; } else if ((*p) == t) { TaskList.erase(p); number--; p--; } } } } void TaskMan::WaitFor(Handle * h, Task * t, int flags) { h->SetNonBlock(); w4ha.push_back(w4ha_t(h, flags, t)); } void TaskMan::WaitFor(pid_t pid, Task * t, int status) { if (status == -1) { if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->pr == pid) { p->T = t; p->flag = true; got_sigchild++; return; } } } } w4pr.push_back(w4pr_t(pid, t)); w4pr[w4pr.size() - 1].status = status; } void TaskMan::WaitFor(timeval t, Task * T, int flags) { w4to.push_back(w4to_t(t, flags, T)); } void TaskMan::MainLoop() throw (GeneralException) { struct pollfd * ufsd; unsigned int nfds; int no_burst; if (!inited) { Init(); } while (1) { if (number == 0) { throw GeneralException(_("TaskMan: No more task to manage.")); } if (stopped) return; // cerr << "-=- TaskMan: begin main loop with " << number << " task to manage.\n"; if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { Task * t = *p; // cerr << "-=- TaskMan: task " << t->GetName() << endl; } } // cerr << "-=- TaskMan: processing burning tasks.\n"; no_burst = 0; while (!no_burst) { no_burst = 1; /* First, we will check for any burning task and run 'em */ event = E_BURST; if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { Task * t = *p; if (t->IsStopped()) { continue; } if (t->GetState() == TASK_BURST) { // cerr << "-=- TaskMan: running burning task " << t->GetName() << endl; t->Run(); /* if the task added some new tasks, we have to rerun the loop */ no_burst = 0; break; } if (t->GetState() == TASK_DONE) { TaskList.erase(p); number--; p = TaskList.begin(); Zombies.push_back(t); } } } } /* Let's compute the nearest timeout, and eventually, launch the outdated timeouts. */ int timeout = -1; event = E_TIMEOUT; if (!w4to.empty()) { time_t curtime = time(NULL); for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { int cur_to; cur_to = (p->to.tv_sec - curtime) * 1000 + p->to.tv_usec; if (cur_to < 0) { } } } /* Now is time to check all the handle and enter into a wait state. */ event = E_HANDLE; // cerr << "-=- TaskMan: processing handle-waiting tasks.\n"; nfds = w4ha.size(); no_burst = 1; if (nfds != 0) { int r; std::vector::iterator p; struct pollfd * q; ufsd = (struct pollfd *) malloc(nfds * sizeof(struct pollfd)); if (!w4ha.empty()) { for (q = ufsd, p = w4ha.begin(); p != w4ha.end(); p++, q++) { p->dirthy = false; if (p->T->IsStopped()) { q->fd = 0; q->events = 0; } else { if (p->ha->CanWatch()) { q->fd = p->ha->GetHandle(); q->events = (p->flags & W4_READING ? POLLIN : 0) | (p->flags & W4_WRITING ? POLLOUT : 0); } else { p->T->SetBurst(); no_burst = 0; p->dirthy = true; q->fd = 0; q->events = 0; } } } for (p = w4ha.begin(); p != w4ha.end(); p++) { if (!p->T->IsStopped() && !p->ha->CanWatch() && !(p->flags & W4_STICKY)) { w4ha.erase(p); p = w4ha.begin(); } } } #ifndef _WIN32 sigprocmask(SIG_UNBLOCK, &sigchildset, 0); #endif r = poll(ufsd, nfds, (no_burst) && !(Zombies.size()) && !(got_sigchild) ? -1: 0); #ifndef _WIN32 sigprocmask(SIG_BLOCK, &sigchildset, 0); #endif if (r < 0) { if (errno != EINTR) { throw GeneralException(String(_("Error during poll: ")) + strerror(errno)); } } else if (r == 0) { // timeout. // **FIXME** #ifndef _WIN32 #warning FIXME #endif } else { int fd; struct pollfd * q; unsigned int i; for (q = ufsd, i = 0; i < nfds; i++, q++) { if (q->revents & POLLNVAL) { throw GeneralException(String(_("Error with poll, handle ")) + q->fd + _(" invalid.")); } // if (q->revents & POLLERR) { // cerr << _("Error condition with poll, handle ") << q->fd << endl; // } // if (q->revents & POLLHUP) { // cerr << _("Handle ") << q->fd << _(" hung up.\n"); // } fd = q->fd; if (q->revents & (POLLIN | POLLOUT | POLLERR | POLLHUP)) { // We have to look into the handle structure now... bool touched; if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p = touched ? w4ha.begin() : p + 1) { touched = false; if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped()) && (p->T->GetState() != TASK_DONE) && (!p->dirthy)) { // We've got one, launch it. // cerr << "-=- TaskMan: launching task " << p->T->GetName() << " for handle " << p->ha->GetHandle() << endl; w4ha_t w4 = *p; p->dirthy = true; if (!(p->flags & W4_STICKY)) { w4ha.erase(p); } touched = true; ehandle = p->ha; w4.T->Run(); if (w4.T->GetState() == TASK_DONE) { // This task died, remove it. try { std::vector::iterator q = FindTask(w4.T); TaskList.erase(q); number--; Zombies.push_back(w4.T); } catch (TaskNotFound) { } } } } } } } } free(ufsd); } /* And finally, let's clean-up all the zombies around here. */ int no_zombies; no_zombies = 0; event = E_TASK; // cerr << "-=- TaskMan: processing zombies loop.\n"; while (!no_zombies) { no_zombies = 1; while (Zombies.size()) { Task * t = Zombies[0], * o; if (!t) { // cerr << "!?!?!? We have t = NULL ?!?!?! WTF\n"; break; } if ((o = t->WaitedBy())) { // cerr << "-=- TaskMan: running task " << o->GetName() << " for task " << t->GetName() << endl; etask = t; o->Run(); if (o->GetState() == TASK_DONE) { TaskList_t::iterator f = FindTask(o); TaskList.erase(f); number--; Zombies.push_back(o); no_zombies = 0; } } else { delete t; } Zombies.erase(Zombies.begin()); } } /* To end up the loop, let's recall task waiting for processes */ event = E_PROCESS; // cerr << "-=- TaskMan: processing child-waiting tasks.\n"; if (got_sigchild) { if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->flag) { Task * t; if (p->T->IsStopped()) { continue; } eprocess = p->pr; estatus = p->status; // cerr << "-=- TaskMan: running task " << p->T->GetName() << " for process " << p->pr << " (" << p->status << ")\n"; t = p->T; w4pr.erase(p); got_sigchild--; t->Run(); break; } } } } } }