diff options
-rw-r--r-- | include/Exceptions.h | 10 | ||||
-rw-r--r-- | include/Handle.h | 4 | ||||
-rw-r--r-- | include/String.h | 2 | ||||
-rw-r--r-- | include/Task.h | 7 | ||||
-rw-r--r-- | include/TaskMan.h | 4 | ||||
-rw-r--r-- | lib/CopyJob.cc | 14 | ||||
-rw-r--r-- | lib/HttpServ.cc | 50 | ||||
-rw-r--r-- | lib/Input.cc | 2 | ||||
-rw-r--r-- | lib/ReadJob.cc | 11 | ||||
-rw-r--r-- | lib/String.cc | 36 | ||||
-rw-r--r-- | lib/Task.cc | 21 | ||||
-rw-r--r-- | lib/TaskMan.cc | 228 |
12 files changed, 317 insertions, 72 deletions
diff --git a/include/Exceptions.h b/include/Exceptions.h index 052002e..090ace9 100644 --- a/include/Exceptions.h +++ b/include/Exceptions.h @@ -45,13 +45,13 @@ class String; class Base { public: - char * strdup(const char * s) const { + static char * strdup(const char * s) { return xstrdup(s); } - void * malloc(ssize_t s) const { + static void * malloc(ssize_t s) { return xmalloc(s); } - void * realloc(void * p, size_t s) const { + static void * realloc(void * p, size_t s) { return xrealloc(p, s); } void * operator new(size_t s) { @@ -63,10 +63,10 @@ class Base { void operator delete(void * p) { xfree(p); } - void free(void *& p) const { + static void free(void *& p) { xfree(p); } - void free(char *& p) const { + static void free(char *& p) { xfree((void *) p); } }; diff --git a/include/Handle.h b/include/Handle.h index 237adff..14eb92d 100644 --- a/include/Handle.h +++ b/include/Handle.h @@ -36,11 +36,11 @@ class Handle : public Base { virtual bool CanRead(); virtual bool CanWrite(); virtual String GetName(); + void close(); + int GetHandle(); protected: Handle(int h); - int GetHandle(); - void close(); private: int h; bool closed, nonblock; diff --git a/include/String.h b/include/String.h index 94faa54..0e8372f 100644 --- a/include/String.h +++ b/include/String.h @@ -80,7 +80,7 @@ class String : public Base { private: String(int hs, const char *); - static char t[BUFSIZ]; + static char t[]; char * str; size_t siz; }; diff --git a/include/Task.h b/include/Task.h index cdc51c0..f80258d 100644 --- a/include/Task.h +++ b/include/Task.h @@ -13,6 +13,8 @@ #define TASK_BURST 2 #define W4_STICKY 1 +#define W4_READING 2 +#define W4_WRITING 4 class Task : public Base { public: @@ -21,11 +23,12 @@ class Task : public Base { virtual String GetName(); int Run(); int GetState(); - void Suspend() throw (GeneralException); + void Suspend(int = -1) throw (GeneralException); void WaitFor(Task *); void WaitFor(Handle *, int = 0); void WaitFor(pid_t); void WaitFor(timeval, int = 0); + Task * WaitedBy(); void SetBurst(); void SetCleanUp(); bool HasToClean(); @@ -48,7 +51,7 @@ class Task : public Base { bool stopped; bool cleanup; bool suspended; - vector<wbta_t> wbta; + Task * wbta; }; #else diff --git a/include/TaskMan.h b/include/TaskMan.h index e649956..03c0681 100644 --- a/include/TaskMan.h +++ b/include/TaskMan.h @@ -7,7 +7,9 @@ class TaskMan : public Base { public: - static int AddTask(Task *); + static void AddTask(Task *); + static vector<Task *>::iterator FindTask(Task *); + static void RemoveFromWatches(Task *); static void Init() throw (GeneralException); static void MainLoop() throw (GeneralException); static void WaitFor(Handle *, Task *, int = 0); diff --git a/lib/CopyJob.cc b/lib/CopyJob.cc index 3b01092..7b4694a 100644 --- a/lib/CopyJob.cc +++ b/lib/CopyJob.cc @@ -2,8 +2,9 @@ #include "General.h" CopyJob::CopyJob(Handle * as, Handle * ad, ssize_t asiz, bool ads) : s(as), d(ad), ds(ads), siz(asiz), cursiz(0), r(0) { - WaitFor(s, W4_STICKY); - WaitFor(d, W4_STICKY); + WaitFor(s, W4_STICKY | W4_READING); + WaitFor(d, W4_STICKY | W4_WRITING); + cerr << "Creating a copyjob from " << s->GetName() << " to " << d->GetName() << " of " << siz << " bytes.\n"; } CopyJob::~CopyJob() { } @@ -11,13 +12,18 @@ CopyJob::~CopyJob() { } int CopyJob::Do() throw (GeneralException) { int tr; + cerr << GetName() << " running...\n"; + switch (current) { case 0: tr = siz >= 0 ? siz - cursiz : COPY_BUFSIZ; + cerr << "Reading " << tr << " bytes.\n"; try { r = s->read(buffer, MIN(COPY_BUFSIZ, tr)); + cerr << "Got " << r << " bytes.\n"; } catch (IOAgain e) { + cerr << "Not enough bytes. Suspending.\n"; Suspend(); } case 1: @@ -25,17 +31,19 @@ int CopyJob::Do() throw (GeneralException) { return TASK_DONE; } try { + cerr << "Writing " << r << " bytes.\n"; d->write(buffer, r); } catch (IOAgain e) { current = 1; + cerr << "No more byte in the output. Suspending.\n"; Suspend(); } current = 0; } cursiz += r; - if (!s->IsClosed() || (siz != cursiz)) { + if (!s->IsClosed() && (siz != cursiz)) { throw TaskSwitch(); } diff --git a/lib/HttpServ.cc b/lib/HttpServ.cc index c534741..9ab2959 100644 --- a/lib/HttpServ.cc +++ b/lib/HttpServ.cc @@ -46,18 +46,18 @@ String ProcessRequest::GetName() { int ProcessRequest::Do() { switch (current) { case 0: + if (!s.IsConnected()) return TASK_DONE; + c = new ReadJob(&s, &b); WaitFor(c); current = 1; - Suspend(); + Suspend(TASK_ON_HOLD); case 1: delete c; bad = false; - - if (!s.IsConnected()) return TASK_DONE; - + cerr << "Got a request\n----\n"; post = ParseUri(file, domain, &b); @@ -65,7 +65,7 @@ int ProcessRequest::Do() { len = -1; do { b >> t; - cerr << t << endl; + // cerr << t << endl; if ((t.strstr("Content-Length: ") == 0) || (t.strstr("Content-length: ") == 0)) { cerr << "Saw 'Content-Lenght:', reading length from '" << t.extract(16) << "'\n"; len = t.extract(16).to_int(); @@ -89,17 +89,23 @@ int ProcessRequest::Do() { Vars = new Variables(); } - c = new CopyJob(&s, &b, len); - WaitFor(c); current = 2; - Suspend(); + if (hasvars && (len)) { + c = new CopyJob(&s, &b, len); + WaitFor(c); + Suspend(); + } else { + c = 0; + } case 2: - delete c; - ParseVars(&s, len); + if (hasvars) { + if (c) delete c; + ParseVars(&b, len); + } cerr << " Domain = '" << domain << "' - File = '" << file << "'\n"; - + if (!bad) { // Nous vérifions le domaine. if (domain != "") { @@ -248,16 +254,26 @@ bool ProcessRequest::ParseUri(String & file, String & domain, Handle * s) { // p nous indiquera la position de la chaîne URL. switch (t[0]) { case 'P': /* POST? */ + cerr << "Do we have a POST request? "; if (t.extract(1, 4) == "OST ") { + cerr << "Yes.\n"; p = t.to_charp(5); post = true; - break; + } else { + cerr << "No.\n"; + bad = true; } + break; case 'G': /* GET? */ + cerr << "Do we have a GET request? "; if (t.extract(1, 3) == "ET ") { + cerr << "Yes.\n"; p = t.to_charp(4); - break; + } else { + cerr << "No.\n"; + bad = true; } + break; default: bad = true; } @@ -375,16 +391,20 @@ HttpServ::HttpServ(Action * ap, int port, const String & nname) throw (GeneralEx } Listener.SetNonBlock(); - WaitFor(&Listener, W4_STICKY); + WaitFor(&Listener, W4_STICKY | W4_READING); cerr << "Mini HTTP-Server '" << name << "' ready and listening for port " << port << endl; } int HttpServ::Do() { try { + Task * r; + Socket s = Listener.Accept(); s.SetNonBlock(); - new ProcessRequest(p, s, name, localport); + r = new ProcessRequest(p, s, name, localport); + r->SetBurst(); + r->SetCleanUp(); } catch (GeneralException) { } diff --git a/lib/Input.cc b/lib/Input.cc index ad8991a..df0222a 100644 --- a/lib/Input.cc +++ b/lib/Input.cc @@ -16,6 +16,8 @@ Input::Input(String no) throw (GeneralException) : if (GetHandle() < 0) { throw IOGeneral(String("Error opening file") + no + " for reading: " + strerror(errno)); } + + cerr << "File " << no << " is opened with handle " << GetHandle() << endl; } bool Input::CanWrite() { diff --git a/lib/ReadJob.cc b/lib/ReadJob.cc index f2749bb..6fab08b 100644 --- a/lib/ReadJob.cc +++ b/lib/ReadJob.cc @@ -2,8 +2,8 @@ #include "HttpServ.h" ReadJob::ReadJob(Handle * as, Handle * ad) : s(as), d(ad) { - WaitFor(s, W4_STICKY); - WaitFor(d, W4_STICKY); + WaitFor(s, W4_STICKY | W4_READING); + WaitFor(d, W4_STICKY | W4_WRITING); } ReadJob::~ReadJob() { } @@ -11,30 +11,23 @@ ReadJob::~ReadJob() { } int ReadJob::Do() throw (GeneralException) { String buff; - cerr << "ReadJob running...\n"; - switch (current) { case 0: try { - cerr << "Trying to read...\n"; *s >> buff; } catch (IOAgain e) { - cerr << "Suspending ReadJob to wait for reading...\n"; throw TaskSwitch(); } - cerr << "Read some bytes...\n"; case 1: try { *d << buff << endnl; } catch (IOAgain e) { - cerr << "Suspending ReadJob to wait for writing...\n"; current = 1; throw TaskSwitch(); } current = 0; - cerr << "Wrote some bytes...\n"; if (buff == "") return TASK_DONE; } diff --git a/lib/String.cc b/lib/String.cc index d587b75..39ea9d8 100644 --- a/lib/String.cc +++ b/lib/String.cc @@ -10,15 +10,15 @@ extern "C" { int isDateArgument(char *); } -char String::t[BUFSIZ]; +char String::t[BUFSIZ + 1]; String::String(const String & s) : str(::strdup(s.str)), siz(s.siz) { } String::String(char c) : siz(1) { - static char t[2]; + char * t = (char *) malloc(2); sprintf(t, "%c", c); - str = ::strdup(t); + str = t; } String::String(const char * s) : str(s ? ::strdup(s) : ::strdup("")) { @@ -61,13 +61,18 @@ const char * String::set(char * s, ...) { const char * String::to_charp(size_t from, ssize_t to) const { if (to < 0) { - strcpy(t, &(str[from])); + strncpy(t, &(str[from]), BUFSIZ); } else { if (to >= siz) { to = siz - 1; } + + if ((to - from) > BUFSIZ) { + from -= (to - from) - BUFSIZ; + } + if (to >= from) { - int i; + int i; for (i = 0; i <= to - from; i++) { t[i] = str[i + from]; } @@ -112,16 +117,25 @@ String & String::operator=(const String & s) { } String String::operator+(const String & s) const { - strcpy(t, str); - strcat(t, s.str); - return String(siz + s.siz, t); + char * t = (char *) malloc(s.siz + siz + 1), * u; + String o; + + strcpy((u = t), str); + u += siz; + strcpy(u, s.str); + o = String(siz + s.siz, t); + free(t); + return o; } String & String::operator+=(const String & s) { - strcpy(t, str); - strcat(t, s.str); + char * t = (char *) malloc(s.siz + siz + 1), * u; + + strcpy((u = t), str); + u += siz; + strcat(u, s.str); free(str); - str = ::strdup(t); + str = t; siz += s.siz; return (*this); } diff --git a/lib/Task.cc b/lib/Task.cc index 4f36c7b..9ed1589 100644 --- a/lib/Task.cc +++ b/lib/Task.cc @@ -4,11 +4,12 @@ #include "Task.h" #include "String.h" -Task::Task() : current(0), state(TASK_ON_HOLD), stopped(false), cleanup(false), suspended(false) { +Task::Task() : current(0), state(TASK_ON_HOLD), stopped(false), cleanup(false), suspended(false), wbta(0) { TaskMan::AddTask(this); } Task::~Task() { + TaskMan::RemoveFromWatches(this); } int Task::Do() throw (GeneralException) { @@ -16,22 +17,16 @@ int Task::Do() throw (GeneralException) { } int Task::Run() { - cerr << "Running task '" << GetName() << "'...\n"; try { - cerr << "Launching method Do()...\n"; state = Do(); } catch (TaskSwitch) { - cerr << "Catch a task switching.\n"; return state; } catch (GeneralException e) { - cerr << "Task " << GetName() << " caused an unexpected exception: '" << e.GetMsg() << "', closing it.\n"; return TASK_DONE; } - cerr << "Task exitted normally.\n"; - return state; } @@ -43,8 +38,10 @@ String Task::GetName() { return "Unknow Task"; } -void Task::Suspend() throw (GeneralException) { - cerr << "Suspending task " << GetName() << "...\n"; +void Task::Suspend(int newstate) throw (GeneralException) { + if (newstate != -1) { + state = newstate; + } suspended = true; throw TaskSwitch(); } @@ -54,7 +51,7 @@ void Task::WaitFor(Handle * h, int flags) { } void Task::WaitFor(Task * t) { - t->wbta.push_back(wbta_t(this)); + t->wbta = this; } void Task::WaitFor(pid_t p) { @@ -88,3 +85,7 @@ void Task::Restart() { bool Task::IsStopped() { return stopped; } + +Task * Task::WaitedBy() { + return wbta; +} diff --git a/lib/TaskMan.cc b/lib/TaskMan.cc index bab9380..97fd1bd 100644 --- a/lib/TaskMan.cc +++ b/lib/TaskMan.cc @@ -1,7 +1,16 @@ #include <signal.h> #include <wait.h> +#include <sys/poll.h> +#include <errno.h> +#include <string.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> #include <vector.h> #include "TaskMan.h" +#include "config.h" + +#define USE_POLL 1 TaskMan::TaskList_t TaskMan::TaskList; TaskMan::TaskList_t TaskMan::Zombies; @@ -34,18 +43,58 @@ void TaskMan::Init() throw (GeneralException) { number = 0; } -int TaskMan::AddTask(Task * t) { +void TaskMan::AddTask(Task * t) { if (!inited) { Init(); } - TaskList.push_back(t); - number++; + + if (t) { + TaskList.push_back(t); + number++; + } +} - return 0; +vector<Task *>::iterator TaskMan::FindTask(Task * t) { + if (!inited) { + Init(); + } + + for (TaskList_t::iterator p = TaskList.begin(); p && (p != TaskList.end()); p++) { + if (*p == t) { + return p; + } + } + + return NULL; +} + +void TaskMan::RemoveFromWatches(Task * t) { + for (vector<w4ha_t>::iterator p = w4ha.begin(); p && (p != w4ha.end()); p++) { + if (p->T == t) { + w4ha.erase(p); + p--; + } + } + + for (vector<w4pr_t>::iterator p = w4pr.begin(); p && (p != w4pr.end()); p++) { + if (p->T == t) { + w4pr.erase(p); + p--; + } + } + + for (vector<w4to_t>::iterator p = w4to.begin(); p && (p != w4to.end()); p++) { + if (p->T == t) { + w4to.erase(p); + p--; + } + } } void TaskMan::WaitFor(Handle * h, Task * t, int flags) { - w4ha.push_back(w4ha_t(h, flags, t)); + if (h->GetHandle() >= 0) { + w4ha.push_back(w4ha_t(h, flags, t)); + } } void TaskMan::WaitFor(pid_t p, Task * t) { @@ -57,8 +106,8 @@ void TaskMan::WaitFor(timeval t, Task * T, int flags) { } void TaskMan::MainLoop() throw (GeneralException) { - TaskList_t::iterator p; - Task * t; + struct pollfd * ufsd; + unsigned int nfds; int no_burst; @@ -67,8 +116,6 @@ void TaskMan::MainLoop() throw (GeneralException) { } while (1) { - cerr << "TaskMan: Begin of main loop with " << number << " task to handle.\n"; - if (number == 0) { throw GeneralException("TaskMan: No more task to manage."); } @@ -77,11 +124,14 @@ void TaskMan::MainLoop() throw (GeneralException) { while (!no_burst) { no_burst = 1; /* First, we will check for any burning task and run 'em */ - for (p = TaskList.begin(); p && (p != TaskList.end()); p++) { - t = *p; + for (TaskList_t::iterator p = TaskList.begin(); p && (p != TaskList.end()); p++) { + Task * t = *p; if (t->GetState() == TASK_BURST) { t->Run(); + /* if the task added some new tasks, we have to rerun the loop */ + no_burst = 0; + break; } if (t->GetState() == TASK_BURST) { @@ -98,10 +148,162 @@ void TaskMan::MainLoop() throw (GeneralException) { } } } - } + + nfds = w4ha.size(); -#ifdef HAVE_POLL + if (nfds != 0) { + int r; + vector<w4ha_t>::iterator p; +#ifdef USE_POLL + struct pollfd * q; #else + int highest; + fd_set readfds, writefds, exceptfds; #endif +#ifdef USE_POLL + ufsd = (struct pollfd *) malloc(nfds * sizeof(struct pollfd)); + for (q = ufsd, p = w4ha.begin(); p && (p != w4ha.end()); p++, q++) { + if (p->T->IsStopped()) { + q->fd = 0; + q->events = 0; + } else { + q->fd = p->ha->GetHandle(); + q->events = (p->flags & W4_READING ? POLLIN : 0) | (p->flags & W4_WRITING ? POLLOUT : 0); + } + } + + r = poll(ufsd, nfds, -1); +#else + FD_ZERO(readfds); + FD_ZERO(writefds); + FD_ZERO(exceptfds); + + highest = -1; + for (p = w4ha.begin(); p && (p != w4ha.end()); p++) { + if (p->T->IsStopped()) continue; + if (p->flags & W4_READING) { + FD_SET(p->ha->GetHandle(), readfds); + } + if (p->flags & W4_WRITING) { + FD_SET(p->ha->GetHandle(), writefds); + } + FD_SET(p->ha->GetHandle(), exceptfds); + } + + r = select(highest + 1, &readfds, &writefds, &exceptfds, NULL); +#endif + if (r == -1) { + throw GeneralException(String("Error during poll: ") + strerror(errno)); + } else if (r == 0) { + // timeout... + } else { + int fd; +#ifdef USE_POLL + struct pollfd * q; + int i; + for (q = ufsd, i = 0; i < nfds; i++, q++) { + + if (q->revents & POLLNVAL) { + throw GeneralException(String("Error with poll, handle ") + q->fd + " invalid."); + } + + if (q->revents & POLLERR) { + throw GeneralException(String("Error condition whith poll, handle ") + q->fd); + } + + if (q->revents & POLLHUP) { + cerr << "Handle " << q->fd << " hung up."; + // What should I do now? + } + + fd = q->fd; + if (q->revents & (POLLIN | POLLOUT)) { +#else + /* Later perhaps... Let's use poll for now. + The following is independant of the use of select or poll. + Just have to set 'fd' to the changed handle. Two '{' to open with + the first as the loop through the handles and the second with + the test "if handle has changed" */ +#endif + // We have to look into the handle structure now... + for (vector<w4ha_t>::iterator p = w4ha.begin(); p && (p != w4ha.end()); p++) { + if ((p->ha->GetHandle() == fd) && (!p->T->IsStopped())) { + // We've got one, launch it. + bool erased; + + erased = false; + p->T->Run(); + + if (p->T->GetState() == TASK_DONE) { + // This task died, remove it. + TaskList_t::iterator q = FindTask(p->T); + if (q) { + TaskList.erase(q); + number--; + if (p->T->HasToClean()) { + delete p->T; + } else { + Zombies.push_back(p->T); + } + w4ha.erase(p); + p--; + erased = true; + } else { + // Hu-ho, something wrong... + throw GeneralException("TaskMan: internal error (task not found)"); + } + } + + if (!erased && !(p->flags & W4_STICKY)) { + w4ha.erase(p); + p--; + } + } + } + } + } + } +#ifdef USE_POLL + free((void *) ufsd); +#endif + } + + int no_zombies; + no_zombies = 0; + + while (!no_zombies) { + no_zombies = 1; + while (Zombies.size()) { + Task * t = Zombies[0], * o; + + if (!t) { + cerr << "!?!?!? We have t = NULL ?!?!?! WTF\n"; + break; + } + + if ((o = t->WaitedBy())) { + o->Run(); + + if (o->GetState() == TASK_DONE) { + TaskList_t::iterator f = FindTask(o); + if (!f) { + throw GeneralException("TaskMan: internal error (task not found)"); + } + TaskList.erase(f); + number--; + if (o->HasToClean()) { + delete o; + } else { + Zombies.push_back(o); + no_zombies = 0; + } + } + } else { + delete t; + } + Zombies.erase(Zombies.begin()); + } + } + } } |