/* * Baltisot * Copyright (C) 1999-2007 Nicolas "Pixel" Noble * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ /* $Id: TaskMan.cc,v 1.46 2007-05-30 11:57:10 pixel Exp $ */ #ifndef _WIN32 #include #include #include #include #include #endif #include #include #include #include #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "TaskMan.h" #include "gettext.h" TaskMan::TaskList_t TaskMan::TaskList; TaskMan::TaskList_t TaskMan::Zombies; std::vector TaskMan::w4ha; std::vector TaskMan::w4pr; std::vector TaskMan::w4to; bool TaskMan::stopped = false; int TaskMan::number = 0; int TaskMan::got_sigchild = 0; bool TaskMan::inited = false; int TaskMan::event, TaskMan::eprocess, TaskMan::estatus; Task * TaskMan::etask; Handle * TaskMan::ehandle; sigset_t TaskMan::sigchildset; #ifndef _WIN32 void taskman_sigchild(int sig) { TaskMan::SigChild(); signal(SIGCHLD, taskman_sigchild); } void taskman_sigpipe(int sig) { signal(sig, taskman_sigpipe); } void taskman_sighup(int sig) { signal(sig, taskman_sighup); } #endif void TaskMan::SigChild() { int status; pid_t pid; pid = wait(&status); if (GotChild(pid, status)) { got_sigchild++; } else { WaitFor(pid, 0, status); } // cerr << "Got SIGCHILD, pid = " << pid << " and status = " << status << endl; } // Windows implementations of poll and gettimeofday. 
#ifdef _WIN32

#define POLLIN   1  /* Set if data to read. */
#define POLLPRI  2  /* Set if urgent data to read. */
#define POLLOUT  4  /* Set if writing data wouldn't block. */
#define POLLERR  8  /* An error occurred. */
#define POLLHUP  16 /* Shutdown or close happened. */
#define POLLNVAL 32 /* Invalid file descriptor. */

#define NPOLLFILE 64 /* Number of canonical fd's in one call to poll(). */

/* The following values are defined by XPG4. */
#define POLLRDNORM POLLIN
#define POLLRDBAND POLLPRI
#define POLLWRNORM POLLOUT
#define POLLWRBAND POLLOUT

struct pollfd {
    int fd;
    short events;
    short revents;
};

// Minimal poll() emulation built on top of select().
//
//   fds     : array of nfds entries; revents is rewritten for every entry.
//   timeout : milliseconds; a negative value blocks until something happens.
//
// Entries with fd == 0 and events == 0 are ignored. Descriptors that
// getsockopt() reports as WSAENOTSOCK cannot be given to select(): when they
// ask for POLLIN they are reported readable immediately, and select() is then
// run without blocking.
// Returns the select() result plus the number of such non-socket entries, or
// the negative select() result on error.
static int poll (struct pollfd *fds, unsigned int nfds, int timeout) {
    fd_set read_fds, write_fds, except_fds;
    struct timeval tv = { timeout / 1000, (timeout % 1000) * 1000 };
    int max_fd = 0, retval, changedfds;
    unsigned int i;
    int n_non_socket = 0;

    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    for (i = 0; i < nfds; i++) {
        // Always start from a clean slate: the caller may hand in an
        // uninitialised array, and ignored / non-socket entries must not
        // report stale bits.
        fds[i].revents = 0;

        if (!fds[i].fd && !fds[i].events)
            continue;

        BOOL dummy;
        int dummy_s = sizeof(dummy);
        if (getsockopt(fds[i].fd, SOL_SOCKET, SO_ACCEPTCONN, (char *) &dummy, &dummy_s) == SOCKET_ERROR) {
            if (WSAGetLastError() == WSAENOTSOCK) {
                if (fds[i].events & POLLIN) {
                    n_non_socket++;
                    fds[i].revents |= POLLIN;
                }
                continue;
            }
        }

        if (fds[i].fd > max_fd)
            max_fd = fds[i].fd;
        if (fds[i].events & POLLIN)
            FD_SET(fds[i].fd, &read_fds);
        if (fds[i].events & POLLOUT)
            FD_SET(fds[i].fd, &write_fds);
        FD_SET(fds[i].fd, &except_fds);
    }

    if (n_non_socket) {
        // Something is already reported ready: do not block in select().
        tv.tv_sec = 0;
        tv.tv_usec = 0;
        timeout = 0;
    }

    changedfds = retval = select(max_fd + 1, &read_fds, &write_fds, &except_fds, timeout < 0 ? NULL : &tv);

    if (retval <= 0) {
        if (retval < 0) {
            int err = WSAGetLastError();
            // got timeout or error
            switch (err) {
            case WSANOTINITIALISED:
                Base::printm(M_INFO, "WSANOTINITIALISED\n");
                break;
            case WSAEFAULT:
                Base::printm(M_INFO, "WSAEFAULT\n");
                break;
            case WSAENETDOWN:
                Base::printm(M_INFO, "WSAENETDOWN\n");
                break;
            case WSAEINVAL:
                Base::printm(M_INFO, "WSAEINVAL\n");
                break;
            case WSAEINTR:
                Base::printm(M_INFO, "WSAEINTR\n");
                break;
            case WSAEINPROGRESS:
                Base::printm(M_INFO, "WSAEINPROGRESS\n");
                break;
            case WSAENOTSOCK:
                Base::printm(M_INFO, "WSAENOTSOCK\n");
                break;
            default:
                Base::printm(M_INFO, "Unknown error\n");
            }
            return retval;
        } else {
            return n_non_socket;
        }
    }

    // Translate the fd_sets back into revents; stop as soon as every ready
    // descriptor counted by select() has been accounted for.
    for (i = 0; i < nfds; i++) {
        if (FD_ISSET(fds[i].fd, &read_fds)) {
            fds[i].revents |= POLLIN;
            changedfds--;
        }
        if (FD_ISSET(fds[i].fd, &write_fds)) {
            fds[i].revents |= POLLOUT;
            changedfds--;
        }
        if (FD_ISSET(fds[i].fd, &except_fds)) {
            fds[i].revents |= POLLERR;
            changedfds--;
        }
        if (changedfds <= 0)
            break;
    }

    return retval + n_non_socket;
}

#define EPOCHFILETIME (116444736000000000i64)

// special version without timezone support...
// Fills *tv (when non-null) from GetSystemTimeAsFileTime(), after subtracting
// EPOCHFILETIME; tz is ignored. Always returns 0.
static int gettimeofday(struct timeval *tv, struct timezone *tz) {
    FILETIME ft;
    LARGE_INTEGER li;
    __int64 t;

    if (tv) {
        GetSystemTimeAsFileTime(&ft);
        li.LowPart = ft.dwLowDateTime;
        li.HighPart = ft.dwHighDateTime;
        t = li.QuadPart;      /* In 100-nanosecond intervals */
        t -= EPOCHFILETIME;   /* Offset to the Epoch time */
        t /= 10;              /* In microseconds */
        tv->tv_sec = (long)(t / 1000000);
        tv->tv_usec = (long)(t % 1000000);
    }

    return 0;
}

#endif

// Static class to handle operations on the timeval structure.
class TimeOps : public Base { public: static timeval Add(const timeval & t1, const timeval & t2) { struct timeval r; r.tv_usec = t1.tv_usec + t2.tv_usec; r.tv_sec = t1.tv_sec + t2.tv_sec; if (r.tv_usec >= 1000000) { r.tv_usec %= 1000000; r.tv_sec++; } return r; } static long Diff(const timeval & t1, const timeval & t2) { return (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000; } static bool IsLE(const timeval & t1, const timeval & t2) { return (t1.tv_sec <= t2.tv_sec) || ((t1.tv_sec == t2.tv_sec) && (t1.tv_usec <= t2.tv_usec)); } static long ToMS(const timeval & t) { return t.tv_usec / 1000 + t.tv_sec * 1000; } static void Sleep(const timeval & t) { Sleep(ToMS(t)); } static void Sleep(Uint32 t) { #ifdef _WIN32 ::Sleep(t); #else usleep(t * 1000); #endif } }; int TaskMan::GotChild(pid_t pid, int status) { int r = 0; unsigned int i; for (i = 0; i < w4pr.size(); i++) { if (w4pr[i].pr == pid) { w4pr[i].flag = true; w4pr[i].status = status; r = 1; } } return r; } void TaskMan::Init() throw (GeneralException) { if (inited) { throw GeneralException(_("Task Manager already initialised.")); } #ifndef _WIN32 signal(SIGCHLD, taskman_sigchild); signal(SIGPIPE, taskman_sigpipe); signal(SIGHUP, taskman_sighup); sigemptyset(&sigchildset); sigaddset(&sigchildset, SIGCHLD); sigprocmask(SIG_BLOCK, &sigchildset, 0); #endif inited = true; number = 0; } void TaskMan::Stop() { stopped = true; } // Accessor of the current processed event. int TaskMan::Event() { return event; } // Accessor of the current task triggering the EVT_TASK event. Task * TaskMan::Etask() { return etask; } // Accessor of the current handle triggering the EVT_HANDLE event. Handle * TaskMan::Ehandle() { return ehandle; } // accessor of the current pid_t triggering the EVT_PROCESS event. int TaskMan::Eprocess() { return eprocess; } // Accessor of the status of the processus triggering the EVT_PROCESS event. 
int TaskMan::Estatus() { return estatus; } // Should be only called by the constructor of a Task() void TaskMan::AddTask(Task * t) { if (!inited) { Init(); } if (t) { TaskList.push_back(t); number++; } } std::vector::iterator TaskMan::FindTask(Task * t) throw (GeneralException) { if (!inited) { Init(); } if (TaskList.empty()) throw TaskNotFound(); for (std::vector::iterator p = TaskList.begin(); p != TaskList.end(); p++) { if (*p == t) { return p; } } throw TaskNotFound(); } // Should only be called by the destructor of a task. void TaskMan::RemoveFromWatches(Task * t) { if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p++) { if (p->T == t) { w4ha.erase(p); p = w4ha.begin(); if (w4ha.empty()) break; } } } if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->T == t) { w4pr.erase(p); p = w4pr.begin(); if (w4pr.empty()) break; } } } if (!w4to.empty()) { for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { if (p->T == t) { w4to.erase(p); p = w4to.begin(); if (w4to.empty()) break; } } } if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { if ((*p)->WaitedBy() == t) { Zombies.push_back(*p); (*p)->RemoveFromWatches(); TaskList.erase(p); number--; p = TaskList.begin(); if (TaskList.empty()) break; } else if ((*p) == t) { TaskList.erase(p); number--; p = TaskList.begin(); if (TaskList.empty()) break; } } } } // Possibility to cancel a timeout. 
void TaskMan::RemoveTimeout(Task * t) { if (!w4to.empty()) { for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { if (p->T == t) { w4to.erase(p); p = w4to.begin(); if (w4to.empty()) break; } } } } void TaskMan::WaitFor(Handle * h, Task * t, int flags) { h->SetNonBlock(); w4ha.push_back(w4ha_t(h, flags, t)); } void TaskMan::WaitFor(pid_t pid, Task * t, int status) { if (status == -1) { if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->pr == pid) { p->T = t; p->flag = true; got_sigchild++; return; } } } } w4pr.push_back(w4pr_t(pid, t)); w4pr[w4pr.size() - 1].status = status; } void TaskMan::WaitFor(const timeval & t, Task * T, int flags) { struct timeval tod; gettimeofday(&tod, 0); w4to.push_back(w4to_t(TimeOps::Add(tod, t), flags, T)); } // The big ugly mainloop. void TaskMan::MainLoop() throw (GeneralException) { struct pollfd * ufsd; unsigned int nfds; int no_burst; if (!inited) { Init(); } while (1) { if (number == 0) { throw GeneralException(_("TaskMan: No more task to manage.")); } if (stopped) return; // We should have a small fuse here, and check if there is no 'dead' task, // that is, a task which doesn't have anything to wait for, but which is // not a zombie. 
// cerr << "-=- TaskMan: begin main loop with " << number << " task to manage.\n"; if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { Task * t = *p; // cerr << "-=- TaskMan: task " << t->GetName() << endl; } } // cerr << "-=- TaskMan: processing burning tasks.\n"; no_burst = 0; while (!no_burst) { no_burst = 1; /* First, we will check for any burning task and run 'em */ event = Task::EVT_BURST; if (!TaskList.empty()) { for (TaskList_t::iterator p = TaskList.begin(); p != TaskList.end(); p++) { Task * t = *p; if (t->IsStopped()) { continue; } if (t->GetState() == Task::TASK_BURST) { // cerr << "-=- TaskMan: running burning task " << t->GetName() << endl; if (t->BurstHandle) { // This task got the burst status from // a non-watchable handle. Let's fake the handle event. event = Task::EVT_HANDLE; ehandle = t->BurstHandle; } t->Run(); if (t->BurstHandle) { event = Task::EVT_BURST; t->BurstHandle = 0; } /* if the task added some new tasks, we have to rerun the loop */ no_burst = 0; break; } // Additionnally, if the task died, let's put it in the zombies list. // This check is done on the whole TaskList at each loop. if (CheckDead(t)) p = TaskList.begin(); } } } /* Let's compute the nearest timeout, and run a round trip for expired timeouts. */ int timeout = -1; event = Task::EVT_TIMEOUT; bool got_timeout = true, ran_on_timeout = false; while (!w4to.empty() && got_timeout) { got_timeout = false; struct timeval current; gettimeofday(¤t, 0); timeout = -1; for (std::vector::iterator p = w4to.begin(); p != w4to.end(); p++) { if (TimeOps::IsLE(p->to, current)) { // This timeout expired; run the task. w4to_t w4 = *p; w4to.erase(p); w4.T->Run(); ran_on_timeout = true; got_timeout = true; CheckDead(w4.T); break; } else { // Otherwise, let's keep track of the smallest timeout value. 
long diff = TimeOps::Diff(current, p->to); if ((timeout == -1) || (timeout > diff)) { timeout = diff; } } } } // cerr << "-=- TaskMan: processing handle-waiting tasks.\n"; if ((w4ha.size() == 0) && !ran_on_timeout && (timeout != -1) && (Zombies.size() == 0)) { // In order to avoid the engine to run full speed without anything real to do, // let's eat the timeout here. TimeOps::Sleep(timeout); } /* Now is time to check all the handle and enter into a wait state. */ event = Task::EVT_HANDLE; nfds = w4ha.size(); no_burst = 1; if (nfds != 0) { int r; std::vector::iterator p; struct pollfd * q; ufsd = (struct pollfd *) malloc(nfds * sizeof(struct pollfd)); // Let's build the pollfd structure. for (q = ufsd, p = w4ha.begin(); p != w4ha.end(); p++, q++) { p->dirty = false; // A stopped task doesn't get cookies. if (p->T->IsStopped()) { q->fd = 0; q->events = 0; } else { if (p->ha->CanWatch()) { // If that handle can be watched, then let's fill in the pollfd structure. q->fd = p->ha->GetHandle(); q->events = (p->flags & Task::W4_READING ? POLLIN : 0) | (p->flags & Task::W4_WRITING ? POLLOUT : 0); } else { // Otherwise, let's put the task in burst mode, and set its status to dirty, // considering it already processed for this handle loop. // It'll be run immediately next cycle. p->T->SetBurst(); no_burst = 0; p->dirty = true; q->fd = 0; q->events = 0; } } } // Now the list is done, let's clean up the w4ha list from the non watchable handles. for (p = w4ha.begin(); p != w4ha.end(); p++) { if (!p->T->IsStopped() && !p->ha->CanWatch() && !(p->flags & Task::W4_STICKY)) { w4ha.erase(p); p = w4ha.begin(); } } #ifndef _WIN32 sigprocmask(SIG_UNBLOCK, &sigchildset, 0); #endif r = poll(ufsd, nfds, (no_burst) && !(Zombies.size()) && !(got_sigchild) ? timeout: 0); #ifndef _WIN32 sigprocmask(SIG_BLOCK, &sigchildset, 0); #endif if (r < 0) { if (errno != EINTR) { throw GeneralException(String(_("Error during poll: ")) + strerror(errno)); } } else if (r == 0) { // timeout. 
// We shouldn't do anything here, since the next loop should catch // the expired timeouts. } else { int fd; struct pollfd * q; unsigned int i; for (q = ufsd, i = 0; i < nfds; i++, q++) { if (q->revents & POLLNVAL) { throw GeneralException(String(_("Error with poll, handle ")) + q->fd + _(" invalid.")); } // if (q->revents & POLLERR) { // cerr << _("Error condition with poll, handle ") << q->fd << endl; // } // if (q->revents & POLLHUP) { // cerr << _("Handle ") << q->fd << _(" hung up.\n"); // } fd = q->fd; if (q->revents & (POLLIN | POLLOUT | POLLERR | POLLHUP)) { // We have to look into the handle structure now... bool touched; if (!w4ha.empty()) { for (std::vector::iterator p = w4ha.begin(); p != w4ha.end(); p = touched ? w4ha.begin() : p + 1) { touched = false; if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped()) && (p->T->GetState() != Task::TASK_DONE) && (!p->dirty)) { // We've got one, launch it. // cerr << "-=- TaskMan: launching task " << p->T->GetName() << " for handle " << p->ha->GetHandle() << endl; w4ha_t w4 = *p; p->dirty = true; ehandle = p->ha; if (!(p->flags & Task::W4_STICKY)) { w4ha.erase(p); } touched = true; w4.T->Run(); CheckDead(w4.T); } } } } } } free(ufsd); } /* And finally, let's clean-up all the zombies around here. */ int no_zombies; no_zombies = 0; event = Task::EVT_TASK; // cerr << "-=- TaskMan: processing zombies loop.\n"; while (!no_zombies) { no_zombies = 1; while (Zombies.size()) { Task * t = Zombies[0], * o; if (!t) { // cerr << "!?!?!? We have t = NULL ?!?!?! 
WTF\n"; break; } if ((o = t->WaitedBy())) { // cerr << "-=- TaskMan: running task " << o->GetName() << " for task " << t->GetName() << endl; etask = t; o->Run(); if (CheckDead(o)) no_zombies = 0; } else { delete t; } Zombies.erase(Zombies.begin()); } } /* To end up the loop, let's recall tasks waiting for processes */ event = Task::EVT_PROCESS; // cerr << "-=- TaskMan: processing child-waiting tasks.\n"; if (got_sigchild) { if (!w4pr.empty()) { for (std::vector::iterator p = w4pr.begin(); p != w4pr.end(); p++) { if (p->flag) { Task * t; if (p->T->IsStopped()) { continue; } eprocess = p->pr; estatus = p->status; // cerr << "-=- TaskMan: running task " << p->T->GetName() << " for process " << p->pr << " (" << p->status << ")\n"; t = p->T; w4pr.erase(p); got_sigchild--; t->Run(); break; } } } } } } bool TaskMan::CheckDead(Task * t) { if (t->GetState() == Task::TASK_DONE) { TaskList_t::iterator f = FindTask(t); TaskList.erase(f); number--; Zombies.push_back(t); return true; } return false; }