diff options
-rw-r--r-- | includes/Async.h | 4 | ||||
-rw-r--r-- | includes/BString.h | 2 | ||||
-rw-r--r-- | includes/Exceptions.h | 22 | ||||
-rw-r--r-- | includes/Handle.h | 2 | ||||
-rw-r--r-- | includes/LuaTask.h | 20 | ||||
-rw-r--r-- | includes/SimpleMustache.h | 14 | ||||
-rw-r--r-- | includes/Socket.h | 2 | ||||
-rw-r--r-- | includes/StacklessTask.h | 81 | ||||
-rw-r--r-- | includes/Task.h | 94 | ||||
-rw-r--r-- | includes/TaskMan.h | 42 | ||||
-rw-r--r-- | includes/Threads.h | 58 | ||||
-rw-r--r-- | src/Async.cc | 6 | ||||
-rw-r--r-- | src/Handle.cc | 6 | ||||
-rw-r--r-- | src/HttpServer.cc | 12 | ||||
-rw-r--r-- | src/Input.cc | 8 | ||||
-rw-r--r-- | src/LuaTask.cc | 58 | ||||
-rw-r--r-- | src/Main.cc | 2 | ||||
-rw-r--r-- | src/Output.cc | 8 | ||||
-rw-r--r-- | src/SimpleMustache.cc | 12 | ||||
-rw-r--r-- | src/Socket.cc | 45 | ||||
-rw-r--r-- | src/Task.cc | 160 | ||||
-rw-r--r-- | src/TaskMan.cc | 102 | ||||
-rw-r--r-- | src/Threads.cc | 47 | ||||
-rw-r--r-- | src/ZHandle.cc | 4 | ||||
-rw-r--r-- | tests/test-Http.cc | 138 | ||||
-rw-r--r-- | tests/test-Sockets.cc | 11 | ||||
-rw-r--r-- | tests/test-Tasks.cc | 47 |
27 files changed, 773 insertions, 234 deletions
diff --git a/includes/Async.h b/includes/Async.h index bd6ce7e..15e0ac7 100644 --- a/includes/Async.h +++ b/includes/Async.h @@ -23,7 +23,7 @@ class AsyncOperation { protected: virtual ~AsyncOperation() { } private: - Queue<AsyncOperation> * m_idleQueue = NULL; + CQueue<AsyncOperation> * m_idleQueue = NULL; IdleReadyCallback_t m_idleReadyCallback = NULL; void * m_idleReadyParam = NULL; void finalize(); @@ -65,7 +65,7 @@ class AsyncManager : public Thread { void stopAllWorkers(); virtual void * proc(); struct TLS { - Queue<AsyncOperation> idleQueue; + CQueue<AsyncOperation> idleQueue; IdleReadyCallback_t idleReadyCallback; void * idleReadyParam; }; diff --git a/includes/BString.h b/includes/BString.h index 29f3d22..09e0d40 100644 --- a/includes/BString.h +++ b/includes/BString.h @@ -16,6 +16,8 @@ class String : private std::string { public: String() : std::string() { } String(const char * str) : std::string(str ? str : "") { } + template<size_t L> + String(const char (&str)[L]) : std::string(str, L - 1) { } String(const char * str, size_t n) : std::string(str ? str : "", str ? n : 0) { } String(char c) { set("%c", c); } String(int32_t i) { set("%i", i); } diff --git a/includes/Exceptions.h b/includes/Exceptions.h index 829a5d1..d7856fe 100644 --- a/includes/Exceptions.h +++ b/includes/Exceptions.h @@ -74,19 +74,19 @@ static inline void AssertHelperInner(const String & msg, const char * details = throw GeneralException(msg, details); } -static inline void AssertHelper(const String & msg, const char * fmt = NULL, ...) __attribute__((format(printf, 2, 3))); +static inline void AssertHelper(const String & msg, const char * fmt, ...) __attribute__((format(printf, 2, 3))); static inline void AssertHelper(const String & msg, const char * fmt, ...) { - if (fmt) { - String details; - va_list ap; - va_start(ap, fmt); - details.set(fmt, ap); - va_end(ap); - AssertHelperInner(msg, details.to_charp()); - } else { - AssertHelperInner(msg); - } + String details; + va_list ap; + va_start(ap, fmt); + details.set(fmt, ap); + va_end(ap); + AssertHelperInner(msg, details.to_charp()); +} + +static inline void AssertHelper(const String & msg) { + AssertHelperInner(msg); } class TestException : public GeneralException { diff --git a/includes/Handle.h b/includes/Handle.h index 5634583..7c502ea 100644 --- a/includes/Handle.h +++ b/includes/Handle.h @@ -40,7 +40,7 @@ class Handle { virtual const char * getName() = 0; virtual ssize_t read(void * buf, size_t count) throw (GeneralException); virtual ssize_t write(const void * buf, size_t count) throw (GeneralException); - template <ssize_t L> void writeString(const char str[L]) { writeString(str, L - 1); } + template <size_t L> void writeString(const char (&str)[L]) { writeString(str, L - 1); } void writeString(const char * str, ssize_t len) { if (len < 0) len = strlen(str); forceWrite(str, len); } void writeString(const String & str) { forceWrite(str.to_charp(), str.strlen()); } void seek(off_t offset, int whence = SEEK_SET) { rseek(offset, whence); } diff --git a/includes/LuaTask.h b/includes/LuaTask.h index 48cef97..37d64fa 100644 --- a/includes/LuaTask.h +++ b/includes/LuaTask.h @@ -11,7 +11,7 @@ class LuaMainTask; class LuaExecCell { public: - LuaExecCell() : m_detached(false) { } + LuaExecCell(); virtual ~LuaExecCell() { } void detach() { m_detached = true; } void exec(LuaMainTask * mainTask); @@ -19,7 +19,7 @@ class LuaExecCell { virtual void run(Lua &) = 0; private: Events::Async m_event; - bool m_detached; + bool m_detached = false; friend class LuaTask; }; @@ -31,6 +31,14 @@ class LuaExecString : public LuaExecCell { String m_str; }; +class LuaExecFile : public LuaExecCell { + public: + LuaExecFile(IO<Handle> file) : m_file(file) { } + private: + virtual void run(Lua &); + IO<Handle> m_file; +}; + class LuaTask : public Task { public: ~LuaTask() { L.weaken(); } @@ -45,17 +53,15 @@ class LuaTask : public Task { class LuaMainTask : public Task { public: - LuaMainTask() : m_stopping(false) { } - ~LuaMainTask() { L.close(); } + LuaMainTask(); + ~LuaMainTask(); void stop(); virtual const char * getName() const { return "LuaMainTask"; } private: void exec(LuaExecCell * cell); virtual void Do(); Lua L; - Events::Async m_queueEvent; - Queue<LuaExecCell> m_queue; - bool m_stopping; + TQueue<LuaExecCell> m_queue; friend class LuaExecCell; }; diff --git a/includes/SimpleMustache.h b/includes/SimpleMustache.h index 9d16377..28cb477 100644 --- a/includes/SimpleMustache.h +++ b/includes/SimpleMustache.h @@ -80,11 +80,13 @@ class SimpleMustache { IO<Buffer> b(new Buffer(str, s)); setTemplate(b); } - void setTemplate(const char * str, ssize_t s = -1) { setTemplate((const uint8_t *) str, s); } + template<size_t S> + void setTemplate(const char (&str)[S]) { setTemplate((const uint8_t *) str, S - 1); } + void setTemplate(const char * str, ssize_t s) { setTemplate((const uint8_t *) str, s); } void setTemplate(const String & str) { setTemplate((const uint8_t *) str.to_charp(), str.strlen()); } - void render(IO<Handle> h, Context * ctx) { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); } + void render(IO<Handle> h, Context * ctx) const { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); } void empty() { while (!m_fragments.empty()) { delete m_fragments.front(); m_fragments.pop_front(); } } - void checkTemplate() { Fragments::iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); } + void checkTemplate() { Fragments::const_iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); } ~SimpleMustache() { empty(); } private: struct Fragment { @@ -102,10 +104,10 @@ class SimpleMustache { typedef std::list<Fragment *> Fragments; Fragments m_fragments; - Fragments::iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx); - String escape(const String & s); + Fragments::const_iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const; + static String escape(const String & s); - Fragments::iterator checkTemplate_r(Fragments::iterator begin, const String & endSection = ""); + Fragments::const_iterator checkTemplate_r(Fragments::const_iterator begin, const String & endSection = "") const; }; }; diff --git a/includes/Socket.h b/includes/Socket.h index 981a385..e5a9f21 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -81,7 +81,7 @@ class Listener : public ListenerBase { public: Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { } protected: - virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); } + virtual void factory(IO<Socket> & io, void * opaque) { TaskMan::registerTask(new Worker(io, opaque)); } virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); } }; diff --git a/includes/StacklessTask.h b/includes/StacklessTask.h new file mode 100644 index 0000000..bb18a7a --- /dev/null +++ b/includes/StacklessTask.h @@ -0,0 +1,81 @@ +#pragma once + +#include <Task.h> + +namespace Balau { + +class StacklessTask : public Task { + public: + StacklessTask() : m_state(0) { setStackless(); } + protected: + void taskSwitch() throw (GeneralException) { throw TaskSwitch(); } + unsigned int m_state; +}; + +}; + +#define StacklessBegin() \ + switch(m_state) { \ + case 0: { \ + + +#define StacklessOperation(operation) \ + m_state = __LINE__; \ + } \ + case __LINE__: { \ + try { \ + operation; \ + } \ + catch (Balau::EAgain & e) { \ + taskSwitch(); \ + } \ + + +#define StacklessOperationOrCond(operation, cond) \ + m_state = __LINE__; \ + } \ + case __LINE__: { \ + try { \ + if (!(cond)) { \ + operation; \ + } \ + } \ + catch (Balau::EAgain & e) { \ + taskSwitch(); \ + } \ + + +#define StacklessWaitFor(evt) \ + m_state = __LINE__; \ + waitFor(evt); \ + taskSwitch(); \ + } \ + case __LINE__: { \ + + +#define StacklessWaitCond(cond) \ + m_state = __LINE__; \ + } \ + case __LINE__: { \ + if (!(cond)) \ + taskSwitch(); \ + + +#define StacklessYield() \ + m_state = __LINE__; \ + try { \ + yield(true); \ + } \ + catch (Balau::EAgain & e) { \ + taskSwitch(); \ + } \ + } \ + case __LINE__: { \ + + +#define StacklessEnd() \ + break; \ + } \ + default: \ + AssertHelper("unknown state", "State %i is out of range in task %s at %p", m_state, getName(), this); \ + } diff --git a/includes/Task.h b/includes/Task.h index b3f395a..86bbce1 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -21,6 +21,11 @@ class EAgain : public GeneralException { Events::BaseEvent * m_evt; }; +class TaskSwitch : public GeneralException { + public: + TaskSwitch() : GeneralException("Task Switch") { } +}; + class TaskMan; class Task; @@ -64,6 +69,7 @@ class BaseEvent { class Timeout : public BaseEvent { public: + Timeout() { } Timeout(ev_tstamp tstamp) { set(tstamp); } virtual ~Timeout() { m_evt.stop(); } void evt_cb(ev::timer & w, int revents) { doSignal(); } @@ -75,11 +81,12 @@ class Timeout : public BaseEvent { class TaskEvent : public BaseEvent { public: - TaskEvent(Task * taskWaited); + TaskEvent(Task * taskWaited = NULL); virtual ~TaskEvent(); void ack(); void signal(); Task * taskWaited() { return m_taskWaited; } + void attachToTask(Task * taskWaited); void evt_cb(ev::async & w, int revents) { doSignal(); } protected: virtual void gotOwner(Task * task); @@ -121,11 +128,22 @@ class Task { enum Status { STARTING, RUNNING, - IDLE, + SLEEPING, STOPPED, FAULTED, YIELDED, }; + static const char * StatusToString(enum Status status) { + static const char * strs[] = { + "STARTING", + "RUNNING", + "SLEEPING", + "STOPPED", + "FAULTED", + "YIELDED", + }; + return strs[status]; + }; Task(); virtual ~Task(); virtual const char * getName() const = 0; @@ -135,18 +153,37 @@ class Task { Task * t = getCurrentTask(); t->waitFor(evt); } - static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException); + enum OperationYieldType { + SIMPLE, + INTERRUPTIBLE, + STACKLESS, + }; + static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException); TaskMan * getTaskMan() const { return m_taskMan; } struct ev_loop * getLoop(); + bool isStackless() { return m_stackless; } protected: - void yield(bool changeStatus = false); + void yield(bool stillRunning = false) throw (GeneralException); + void yield(Events::BaseEvent * evt) { + waitFor(evt); + yield(); + } virtual void Do() = 0; void waitFor(Events::BaseEvent * event); bool setOkayToEAgain(bool enable) { + if (m_stackless) { + AAssert(enable, "You can't make a task go not-okay-to-eagain if it's stackless."); + } bool oldValue = m_okayToEAgain; m_okayToEAgain = enable; return oldValue; } + void setStackless() { + AAssert(!m_stackless, "Can't set a task to be stackless twice"); + AAssert(m_status == STARTING, "Can't set a task to be stackless after it started. status = %s", StatusToString(m_status)); + m_stackless = true; + m_okayToEAgain = true; + } private: static size_t stackSize() { return 64 * 1024; } void setup(TaskMan * taskMan, void * stack); @@ -168,7 +205,54 @@ class Task { Lock m_eventLock; typedef std::list<Events::TaskEvent *> waitedByList_t; waitedByList_t m_waitedBy; - bool m_okayToEAgain; + bool m_okayToEAgain, m_stackless; +}; + +class QueueBase { + public: + bool isEmpty() { ScopeLock sl(m_lock); return !m_front; } + protected: + QueueBase() { pthread_cond_init(&m_cond, NULL); } + ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); } + void iPush(void * t, Events::Async * event); + void * iPop(Events::Async * event, bool wait); + + private: + QueueBase(const QueueBase &) = delete; + QueueBase & operator=(const QueueBase &) = delete; + Lock m_lock; + struct Cell { + Cell(void * elem) : m_elem(elem) { } + Cell(const Cell &) = delete; + Cell & operator=(const Cell &) = delete; + Cell * m_next = NULL, * m_prev = NULL; + void * m_elem; + }; + Cell * m_front = NULL, * m_back = NULL; + pthread_cond_t m_cond; +}; + +template<class T> +class Queue : public QueueBase { + public: + void push(T * t) { iPush(t, NULL); } + T * pop() { return (T *) iPop(NULL, true); } +}; + +template<class T> +class TQueue : public QueueBase { + public: + void push(T * t) { iPush(t, &m_event); } + T * pop() { return (T *) iPop(&m_event, true); } + private: + Events::Async m_event; +}; + +template<class T> +class CQueue : public QueueBase { + public: + void push(T * t) { iPush(t, NULL); } + T * pop() { return (T *) iPop(NULL, false); } }; }; diff --git a/includes/TaskMan.h b/includes/TaskMan.h index d39598f..2b2742d 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -10,16 +10,31 @@ #include <Async.h> #include <Threads.h> #include <Exceptions.h> +#include <Task.h> namespace gnu = __gnu_cxx; namespace Balau { -class Task; class TaskScheduler; +namespace Events { + +class TaskEvent; + +}; + class TaskMan { public: + class TaskManThread : public Thread { + public: + virtual ~TaskManThread(); + virtual void * proc(); + void stopMe(int code = 0) { m_taskMan->stopMe(code); } + private: + TaskMan * m_taskMan; + }; + TaskMan(); ~TaskMan(); int mainLoop(); @@ -27,11 +42,25 @@ class TaskMan { struct ev_loop * getLoop() { return m_loop; } void signalTask(Task * t); static void stop(int code); - void stopMe(int code) { m_stopped = true; m_stopCode = code; } - static Thread * createThreadedTaskMan(); + void stopMe(int code = 0); + static TaskManThread * createThreadedTaskMan() { + TaskManThread * r = new TaskManThread(); + r->threadStart(); + return r; + } + static void stopThreadedTaskMan(TaskManThread * tmt) { + tmt->stopMe(0); + tmt->join(); + delete tmt; + } bool stopped() { return m_stopped; } + template<class T> + static T * registerTask(T * t, Task * stick = NULL) { TaskMan::iRegisterTask(t, stick, NULL); return t; } + template<class T> + static T * registerTask(T * t, Events::TaskEvent * event) { TaskMan::iRegisterTask(t, NULL, event); return t; } + private: - static void registerTask(Task * t, Task * stick); + static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event); static void registerAsyncOp(AsyncOperation * op); void * getStack(); void freeStack(void * stack); @@ -51,8 +80,6 @@ class TaskMan { friend class Task; friend class TaskScheduler; template<class T> - friend T * createTask(T * t, Task * stick = NULL); - template<class T> friend T * createAsyncOp(T * op); struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } }; typedef gnu::hash_set<Task *, taskHasher> taskHash_t; @@ -68,9 +95,6 @@ class TaskMan { }; template<class T> -T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; } - -template<class T> T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; } }; diff --git a/includes/Threads.h b/includes/Threads.h index 13527d5..0dfce1a 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -5,8 +5,7 @@ namespace Balau { -template<class T> -class Queue; +class QueueBase; class Lock { public: @@ -18,8 +17,7 @@ class Lock { Lock(const Lock &) = delete; Lock & operator=(const Lock &) = delete; pthread_mutex_t m_lock; - template<class T> - friend class Queue; + friend class QueueBase; }; class ScopeLock { @@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit { virtual void doExit() { join(); } }; -template<class T> -class Queue { - public: - Queue() { pthread_cond_init(&m_cond, NULL); } - ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); } - void push(T * t) { - ScopeLock sl(m_lock); - Cell * c = new Cell(t); - c->m_prev = m_back; - if (m_back) - m_back->m_next = c; - else - m_front = c; - m_back = c; - pthread_cond_signal(&m_cond); - } - T * pop(bool wait = true) { - ScopeLock sl(m_lock); - while (!m_front && wait) - pthread_cond_wait(&m_cond, &m_lock.m_lock); - Cell * c = m_front; - if (!c) - return NULL; - m_front = c->m_next; - if (m_front) - m_front->m_prev = NULL; - else - m_back = NULL; - T * t = c->m_elem; - delete c; - return t; - } - bool isEmpty() { - ScopeLock sl(m_lock); - return !m_front; - } - private: - Queue(const Queue &) = delete; - Queue & operator=(const Queue &) = delete; - Lock m_lock; - struct Cell { - Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } - Cell(const Cell &) = delete; - Cell & operator=(const Cell &) = delete; - Cell * m_next, * m_prev; - T * m_elem; - }; - Cell * volatile m_front = NULL; - Cell * volatile m_back = NULL; - pthread_cond_t m_cond; -}; - }; diff --git a/src/Async.cc b/src/Async.cc index 829c677..18c8ef1 100644 --- a/src/Async.cc +++ b/src/Async.cc @@ -28,7 +28,7 @@ void Balau::AsyncManager::queueOp(AsyncOperation * op) { TLS * tls = getTLS(); Printer::elog(E_ASYNC, "Queuing operation at %p", op); if (op->needsSynchronousCallback()) { - Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam); + Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", op, &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam); op->m_idleQueue = &tls->idleQueue; op->m_idleReadyCallback = tls->idleReadyCallback; op->m_idleReadyParam = tls->idleReadyParam; @@ -98,11 +98,13 @@ void * Balau::AsyncManager::proc() { } stopAllWorkers(); + Printer::elog(E_ASYNC, "Async thread waits for all idle queues to empty"); while (Atomic::Prefetch::Decrement(&m_numTLSes)) { TLS * tls = m_TLSes.pop(); while (!tls->idleQueue.isEmpty()); } + Printer::elog(E_ASYNC, "Async thread stopping"); return NULL; } @@ -150,7 +152,7 @@ void Balau::AsyncManager::idle() { while (!m_ready); AsyncOperation * op; TLS * tls = getTLS(); - while ((op = tls->idleQueue.pop(false))) { + while ((op = tls->idleQueue.pop())) { Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op); op->done(); } diff --git a/src/Handle.cc b/src/Handle.cc index dacd30d..9bcb419 100644 --- a/src/Handle.cc +++ b/src/Handle.cc @@ -97,7 +97,7 @@ ssize_t Balau::Handle::forceRead(void * _buf, size_t count, Events::BaseEvent * catch (EAgain e) { if (evt && evt->gotSignal()) return total; - Task::yield(e.getEvent()); + Task::operationYield(e.getEvent()); continue; } if (r < 0) @@ -124,7 +124,7 @@ ssize_t Balau::Handle::forceWrite(const void * _buf, size_t count, Events::BaseE catch (EAgain e) { if (evt && evt->gotSignal()) return total; - Task::yield(e.getEvent()); + Task::operationYield(e.getEvent()); continue; } if (r < 0) @@ -242,7 +242,7 @@ int Balau::FileSystem::mkdir(const char * path) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_mkdir(path, 0755, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_mkdir returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); char str[4096]; if (cbResults.result < 0) diff --git a/src/HttpServer.cc b/src/HttpServer.cc index c9cb0ee..23af486 100644 --- a/src/HttpServer.cc +++ b/src/HttpServer.cc @@ -35,6 +35,8 @@ class HttpWorker : public Task { HttpWorker(IO<Handle> io, void * server); ~HttpWorker(); static void buildErrorTemplate(IO<Handle> h) { m_errorTemplate.setTemplate(h); } + template<size_t S> + static void buildErrorTemplate(const char (&str)[S]) { m_errorTemplate.setTemplate(str, S - 1); } static void buildErrorTemplate(const char * str, ssize_t s) { m_errorTemplate.setTemplate(str, s); } static void buildErrorTemplate(const String & str) { m_errorTemplate.setTemplate(str); } private: @@ -594,14 +596,20 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener; void Balau::HttpServer::start() { AAssert(!m_started, "Don't start an HttpServer twice"); - m_listenerPtr = createTask(new HttpListener(m_port, m_local.to_charp(), this)); + m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this)); m_started = true; } void Balau::HttpServer::stop() { AAssert(m_started, "Don't stop an HttpServer that hasn't been started"); - reinterpret_cast<HttpListener *>(m_listenerPtr)->stop(); + HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr); + Events::TaskEvent event(listener); + Task::prepare(&event); + listener->stop(); m_started = false; + Task::operationYield(&event); + IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop"); + event.ack(); } void Balau::HttpServer::registerAction(Action * action) { diff --git a/src/Input.cc b/src/Input.cc index 8168231..3608cd0 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -56,7 +56,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_open returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; if (cbResults.errorno == ENOENT) { @@ -71,7 +71,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) { cbStatsResults_t cbStatsResults; r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults); EAssert(r != NULL, "eio_fstat returned a NULL eio_req"); - Task::yield(&cbStatsResults.evt); + Task::operationYield(&cbStatsResults.evt); if (cbStatsResults.result == 0) { m_size = cbStatsResults.statdata.st_size; m_mtime = cbStatsResults.statdata.st_mtime; @@ -85,7 +85,7 @@ void Balau::Input::close() throw (GeneralException) { eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_close returned a NULL eio_req"); m_fd = -1; - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; strerror_r(cbResults.errorno, str, sizeof(str)); @@ -99,7 +99,7 @@ ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_read(m_fd, buf, count, getROffset(), 0, eioDone, &cbResults); EAssert(r != NULL, "eio_read returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result > 0) { rseek(cbResults.result, SEEK_CUR); } else { diff --git a/src/LuaTask.cc b/src/LuaTask.cc index d722847..7e0b256 100644 --- a/src/LuaTask.cc +++ b/src/LuaTask.cc @@ -1,26 +1,58 @@ #include "LuaTask.h" #include "Main.h" #include "TaskMan.h" +#include "Printer.h" + +namespace { + +class LuaTaskStopper : public Balau::LuaExecCell { + virtual void run(Balau::Lua &) { } +}; + +}; + +Balau::LuaExecCell::LuaExecCell() { + Printer::elog(E_TASK, "LuaExecCell created at %p", this); +} + +Balau::LuaMainTask::LuaMainTask() { + Printer::elog(E_TASK, "LuaMainTask created at %p", this); + L.open_base(); + L.open_table(); + L.open_string(); + L.open_math(); + L.open_debug(); + L.open_bit(); + L.open_jit(); +} + +Balau::LuaMainTask::~LuaMainTask() { + L.close(); +} void Balau::LuaMainTask::exec(LuaExecCell * cell) { + Printer::elog(E_TASK, "LuaMainTask at %p is asked to queue Cell %p", this, cell); m_queue.push(cell); - m_queueEvent.trigger(); } void Balau::LuaMainTask::stop() { - Atomic::CmpXChgVal(&m_stopping, true, false); - m_queueEvent.trigger(); + Printer::elog(E_TASK, "LuaMainTask at %p asked to stop", this); + exec(new LuaTaskStopper()); } void Balau::LuaMainTask::Do() { - while (!m_stopping) { - waitFor(&m_queueEvent); - + for (;;) { LuaExecCell * cell; - while ((cell = m_queue.pop(false))) - createTask(new LuaTask(L.thread(), cell), this); - - yield(); + Printer::elog(E_TASK, "LuaMainTask at %p tries to pop an ExecCell", this); + while ((cell = m_queue.pop())) { + Printer::elog(E_TASK, "LuaMainTask at %p popped %p", this, cell); + if (dynamic_cast<LuaTaskStopper *>(cell)) { + Printer::elog(E_TASK, "LuaMainTask at %p is stopping", this); + delete cell; + return; + } + TaskMan::registerTask(new LuaTask(L.thread(), cell), this); + } } } @@ -37,9 +69,13 @@ void Balau::LuaExecCell::exec(LuaMainTask * mainTask) { Task::prepare(&m_event); mainTask->exec(this); if (!m_detached) - Task::yield(&m_event); + Task::operationYield(&m_event); } void Balau::LuaExecString::run(Lua & L) { L.load(m_str); } + +void Balau::LuaExecFile::run(Lua & L) { + L.load(m_file); +} diff --git a/src/Main.cc b/src/Main.cc index fd4021f..32b7cf8 100644 --- a/src/Main.cc +++ b/src/Main.cc @@ -60,7 +60,7 @@ int Balau::Main::bootstrap(int _argc, char ** _argv) { try { m_status = RUNNING; - createTask(new MainTask()); + TaskMan::registerTask(new MainTask()); r = TaskMan::getDefaultTaskMan()->mainLoop(); m_status = STOPPING; } diff --git a/src/Output.cc b/src/Output.cc index a4f6275..dd0a588 100644 --- a/src/Output.cc +++ b/src/Output.cc @@ -56,7 +56,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException cbResults_t cbResults; eio_req * r = eio_open(fname, O_WRONLY | O_CREAT | (truncate ? O_TRUNC : 0), 0755, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_open returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; if (cbResults.errorno == ENOENT) { @@ -71,7 +71,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException cbStatsResults_t cbStatsResults; r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults); EAssert(r != NULL, "eio_fstat returned a NULL eio_req"); - Task::yield(&cbStatsResults.evt); + Task::operationYield(&cbStatsResults.evt); if (cbStatsResults.result == 0) { m_size = cbStatsResults.statdata.st_size; m_mtime = cbStatsResults.statdata.st_mtime; @@ -85,7 +85,7 @@ void Balau::Output::close() throw (GeneralException) { eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_close returned a NULL eio_req"); m_fd = -1; - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; strerror_r(cbResults.errorno, str, sizeof(str)); @@ -99,7 +99,7 @@ ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralExcep cbResults_t cbResults; eio_req * r = eio_write(m_fd, const_cast<void *>(buf), count, getWOffset(), 0, eioDone, &cbResults); EAssert(r != NULL, "eio_write returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result > 0) { wseek(cbResults.result, SEEK_CUR); } else { diff --git a/src/SimpleMustache.cc b/src/SimpleMustache.cc index 92fefeb..d2db99f 100644 --- a/src/SimpleMustache.cc +++ b/src/SimpleMustache.cc @@ -328,9 +328,9 @@ void Balau::SimpleMustache::setTemplate(IO<Handle> _h) { m_fragments.push_back(curFragment); } -Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_r(Fragments::iterator begin, const String & endSection) { - Fragments::iterator cur; - Fragments::iterator end = m_fragments.end(); +Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::checkTemplate_r(Fragments::const_iterator begin, const String & endSection) const { + Fragments::const_iterator cur; + Fragments::const_iterator end = m_fragments.end(); for (cur = begin; cur != end; cur++) { Fragment * fr = *cur; @@ -346,9 +346,9 @@ Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_ return end; } -Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx) { - Fragments::iterator cur; - Fragments::iterator end = m_fragments.end(); +Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const { + Fragments::const_iterator cur; + Fragments::const_iterator end = m_fragments.end(); if (endSection.strlen() != 0) { int depth = 0; diff --git a/src/Socket.cc b/src/Socket.cc index 148d0a4..7511bde 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -14,6 +14,26 @@ #include "Main.h" #include "Atomic.h" +#ifndef _WIN32 +namespace { + +class SigpipeBlocker : public Balau::AtStart { + public: + SigpipeBlocker() : AtStart(5) { } + virtual void doStart() { + struct sigaction new_actn, old_actn; + new_actn.sa_handler = SIG_IGN; + sigemptyset(&new_actn.sa_mask); + new_actn.sa_flags = 0; + sigaction(SIGPIPE, &new_actn, &old_actn); + } +}; + +static SigpipeBlocker sigpipeBlocker; + +}; +#endif + void Balau::Socket::SocketEvent::gotOwner(Task * task) { Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this); if (!m_task) { @@ -172,9 +192,8 @@ static const char * inet_ntop(int af, const void * src, char * dst, socklen_t si #endif -#if 0 -// TODO: use getaddrinfo_a, if available. -#else +namespace { + class ResolverThread : public Balau::GlobalThread { public: ResolverThread() : GlobalThread(8), m_stopping(false) { } @@ -186,6 +205,8 @@ class ResolverThread : public Balau::GlobalThread { volatile bool m_stopping; }; +}; + void ResolverThread::threadExit() { m_stopping = true; DNSRequest req; @@ -222,13 +243,12 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st Balau::Printer::elog(Balau::E_SOCKET, "Sending a request to the resolver thread"); Balau::Task::prepare(&evt); resolverThread.pushRequest(&req); - Balau::Task::yield(&evt); + Balau::Task::operationYield(&evt); Balau::Atomic::MemoryFence(); return req; } -#endif Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) { m_name = "Socket(unconnected)"; @@ -457,7 +477,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { IAssert(spins == 0, "We shouldn't have spinned..."); } - Task::yield(m_evtW, true); + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); // if we're still here, it means the parent task doesn't want to be thrown an exception IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled"); @@ -516,7 +536,7 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) { if (s < 0) { if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtR, true); + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); } else { String msg = getErrorMessage(); throw GeneralException(String("Unexpected error accepting a connection: #") + errno + "(" + msg + ")"); @@ -548,7 +568,7 @@ ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) { } if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtR, true); + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); } else { m_evtR->stop(); return r; @@ -575,8 +595,15 @@ ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralExcep if (r > 0) return r; +#ifndef _WIN32 + if (errno == EPIPE) { + close(); + return 0; + } +#endif + if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtW, true); + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); } else { m_evtW->stop(); return r; diff --git a/src/Task.cc b/src/Task.cc index aba29b8..4465203 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -6,10 +6,7 @@ static Balau::LocalTmpl<Balau::Task> localTask; -Balau::Task::Task() { - m_status = STARTING; - m_okayToEAgain = false; - +Balau::Task::Task() : m_status(STARTING), m_okayToEAgain(false), m_stackless(false) { Printer::elog(E_TASK, "Created a Task at %p", this); } @@ -22,16 +19,21 @@ bool Balau::Task::needsStacks() { } void Balau::Task::setup(TaskMan * taskMan, void * stack) { - size_t size = stackSize(); + if (m_stackless) { + IAssert(!stack, "Since we're stackless, no stack should've been allocated."); + m_stack = NULL; + } else { + size_t size = stackSize(); #ifndef _WIN32 - IAssert(stack, "Can't setup a coroutine without a stack"); - m_stack = stack; - coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size); + IAssert(stack, "Can't setup a coroutine without a stack"); + m_stack = stack; + coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size); #else - Assert(!stack, "We shouldn't allocate stacks with Fibers"); - m_stack = NULL; - m_fiber = CreateFiber(size, coroutineTrampoline, this); + Assert(!stack, "We shouldn't allocate stacks with Fibers"); + m_stack = NULL; + m_fiber = CreateFiber(size, coroutineTrampoline, this); #endif + } m_taskMan = taskMan; @@ -54,7 +56,7 @@ void Balau::Task::coroutineTrampoline(void * arg) { void Balau::Task::coroutine() { try { - IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status); + IAssert((m_status == STARTING) || (m_stackless && (m_status == RUNNING)), "The Task at %p has a bad status ? m_status = %s, stackless = %s", this, StatusToString(m_status), m_stackless ? "true" : "false"); m_status = RUNNING; Do(); m_status = STOPPED; @@ -66,6 +68,12 @@ void Balau::Task::coroutine() { catch (TestException & e) { m_status = STOPPED; Printer::log(M_ERROR, "Unit test failed: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); TaskMan::stop(-1); } catch (RessourceException & e) { @@ -74,8 +82,25 @@ void Balau::Task::coroutine() { const char * details = e.getDetails(); if (details) Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); TaskMan::stop(-1); } + catch (TaskSwitch & e) { + if (!m_stackless) { + Printer::log(M_ERROR, "Task %s at %p isn't stackless, but still caused a task switch.", getName(), this); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + m_status = FAULTED; + } else { + Printer::elog(E_TASK, "Stackless task %s at %p is task-switching.", getName(), this); + } + } catch (GeneralException & e) { Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", getName(), this, e.getMsg()); const char * details = e.getDetails(); @@ -90,39 +115,50 @@ void Balau::Task::coroutine() { Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", getName(), this); m_status = FAULTED; } + if (!m_stackless) { #ifndef _WIN32 - coro_transfer(&m_ctx, &m_taskMan->m_returnContext); + coro_transfer(&m_ctx, &m_taskMan->m_returnContext); #else - SwitchToFiber(m_taskMan->m_fiber); + SwitchToFiber(m_taskMan->m_fiber); #endif + } } void Balau::Task::switchTo() { Printer::elog(E_TASK, "Switching to task %p - %s", this, getName()); - IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status); + IAssert(m_status == YIELDED || m_status == SLEEPING || m_status == STARTING, "The task at %p isn't either yielded, sleeping or starting... ? m_status = %s", this, StatusToString(m_status)); void * oldTLS = g_tlsManager->getTLS(); g_tlsManager->setTLS(m_tls); - if (m_status == YIELDED || m_status == IDLE) + if (m_status == YIELDED || m_status == SLEEPING) m_status = RUNNING; + if (m_stackless) { + coroutine(); + } else { #ifndef _WIN32 - coro_transfer(&m_taskMan->m_returnContext, &m_ctx); + coro_transfer(&m_taskMan->m_returnContext, &m_ctx); #else - SwitchToFiber(m_fiber); + SwitchToFiber(m_fiber); #endif + } g_tlsManager->setTLS(oldTLS); - if (m_status == RUNNING) - m_status = IDLE; + IAssert(m_status != RUNNING, "Task %s at %p is still running... ?", getName(), this); } -void Balau::Task::yield(bool changeStatus) { +void Balau::Task::yield(bool stillRunning) throw (GeneralException) { Printer::elog(E_TASK, "Task %p - %s yielding", this, getName()); - if (changeStatus) + if (stillRunning) m_status = YIELDED; + else + m_status = SLEEPING; + if (m_stackless) { + throw EAgain(NULL); + } else { #ifndef _WIN32 - coro_transfer(&m_ctx, &m_taskMan->m_returnContext); + coro_transfer(&m_ctx, &m_taskMan->m_returnContext); #else - SwitchToFiber(m_taskMan->m_fiber); + SwitchToFiber(m_taskMan->m_fiber); #endif + } } Balau::Task * Balau::Task::getCurrentTask() { @@ -149,7 +185,15 @@ void Balau::Events::BaseEvent::doSignal() { } } -Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited), m_ack(false), m_distant(false) { +Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(NULL), m_ack(false), m_distant(false) { + if (taskWaited) + attachToTask(taskWaited); +} + +void Balau::Events::TaskEvent::attachToTask(Task * taskWaited) { + AAssert(!m_taskWaited, "You can't attach a TaskEvent twice."); + m_ack = false; + m_taskWaited = taskWaited; ScopeLock lock(m_taskWaited->m_eventLock); m_taskWaited->m_waitedBy.push_back(this); } @@ -157,7 +201,8 @@ Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited void Balau::Events::TaskEvent::signal() { if (m_distant) m_evt.send(); - doSignal(); + else + doSignal(); } void Balau::Events::TaskEvent::gotOwner(Task * task) { @@ -198,6 +243,7 @@ void Balau::Events::TaskEvent::ack() { IAssert(deleted, "We didn't find task %p in the waitedBy lists... ?", this); m_ack = true; reset(); + m_taskWaited = NULL; } void Balau::Events::Timeout::gotOwner(Task * task) { @@ -214,23 +260,77 @@ void Balau::Events::Custom::gotOwner(Task * task) { m_loop = task->getLoop(); } -void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) { +void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) { Task * t = getCurrentTask(); if (evt) t->waitFor(evt); + + if (t->m_stackless) { + AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task."); + } + + if (yieldType == STACKLESS) { + AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task."); + } + bool gotSignal; do { t->yield(evt == NULL); - Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false"); + static const char * YieldTypeToString[] = { + "SIMPLE", + "INTERRUPTIBLE", + "STACKLESS", + }; + Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false"); gotSignal = evt ? evt->gotSignal() : true; - } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal); + } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal); if (!evt) return; - if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) { + if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) { Printer::elog(E_TASK, "operation is throwing an exception."); throw EAgain(evt); } } + +void Balau::QueueBase::iPush(void * t, Events::Async * event) { + ScopeLock sl(m_lock); + Cell * c = new Cell(t); + c->m_prev = m_back; + if (m_back) + m_back->m_next = c; + else + m_front = c; + m_back = c; + if (event) + event->trigger(); + else + pthread_cond_signal(&m_cond); +} + +void * Balau::QueueBase::iPop(Events::Async * event, bool wait) { + ScopeLock sl(m_lock); + while (!m_front && wait) { + if (event) { + Task::prepare(event); + m_lock.leave(); + Task::operationYield(event, Task::INTERRUPTIBLE); + m_lock.enter(); + } else { + pthread_cond_wait(&m_cond, &m_lock.m_lock); + } + } + Cell * c = m_front; + if (!c) + return NULL; + m_front = c->m_next; + if (m_front) + m_front->m_prev = NULL; + else + m_back = NULL; + void * t = c->m_elem; + delete c; + return t; +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 49c326a..177cec1 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -101,7 +101,6 @@ void Balau::TaskScheduler::stopAll(int code) { m_taskManagers.pop(); altQueue.push(tm); tm->addToPending(new Stopper(code)); - tm->m_evt.send(); } while (!altQueue.empty()) { tm = altQueue.front(); @@ -130,7 +129,6 @@ void * Balau::TaskScheduler::proc() { m_lock.leave(); Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm); tm->addToPending(t); - tm->m_evt.send(); } Printer::elog(E_TASK, "TaskScheduler stopping."); return NULL; @@ -145,6 +143,16 @@ void asyncDummy(ev::async & w, int revents) { Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up..."); } +void Balau::TaskMan::stopMe(int code) { + Task * t = Task::getCurrentTask(); + if (t->getTaskMan() == this) { + m_stopped = true; + m_stopCode = code; + } else { + addToPending(new Stopper(code)); + } +} + Balau::TaskMan::TaskMan() { #ifndef _WIN32 coro_create(&m_returnContext, 0, 0, 0, 0); @@ -168,6 +176,8 @@ Balau::TaskMan::TaskMan() { } #ifdef _WIN32 +namespace { + class WinSocketStartup : public Balau::AtStart { public: WinSocketStartup() : AtStart(5) { } @@ -179,6 +189,8 @@ class WinSocketStartup : public Balau::AtStart { }; static WinSocketStartup wsa; + +}; #endif Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); } @@ -191,14 +203,16 @@ Balau::TaskMan::~TaskMan() { } s_scheduler.unregisterTaskMan(this); // probably way more work to do here in order to clean up tasks from that thread + m_evt.stop(); ev_loop_destroy(m_loop); } void * Balau::TaskMan::getStack() { + if (!Task::needsStacks()) + return NULL; void * r = NULL; if (m_nStacks == 0) { - if (Task::needsStacks()) - r = malloc(Task::stackSize()); + r = malloc(Task::stackSize()); } else { r = m_stacks.front(); m_stacks.pop(); @@ -230,30 +244,23 @@ int Balau::TaskMan::mainLoop() { s_async.setIdleReadyCallback(asyncIdleReady, this); do { - bool noWait = false; - Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size()); - // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean. + // checking "STARTING" tasks, and running them once while ((iH = starting.begin()) != starting.end()) { Task * t = *iH; IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t); t->switchTo(); IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t); starting.erase(iH); - if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { - noWait = true; + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) stopped.insert(t); - } - if (t->getStatus() == Task::YIELDED) { - noWait = true; + if (t->getStatus() == Task::YIELDED) yielded.insert(t); - } } // if we begin that loop with any pending task, just don't loop, so we can add them immediately. - if (!m_pendingAdd.isEmpty() || !yielded.empty()) - noWait = true; + bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty(); // libev's event "loop". We always runs it once though. m_allowedToSignal = true; @@ -261,7 +268,7 @@ int Balau::TaskMan::mainLoop() { ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this); - // calling async's idle loop here + // calling async's idle s_async.idle(); // let's check what task got stopped, and signal them @@ -276,7 +283,7 @@ int Balau::TaskMan::mainLoop() { // let's check who got signaled, and call them for (Task * t : m_signaledTasks) { Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str()); - IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus()); + IAssert(t->getStatus() == Task::SLEEPING || t->getStatus() == Task::YIELDED, "We're switching to a non-sleeping/yielded task at %p... ? status = %i", t, t->getStatus()); bool wasYielded = t->getStatus() == Task::YIELDED; t->switchTo(); if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { @@ -313,7 +320,7 @@ int Balau::TaskMan::mainLoop() { Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t); IAssert(m_tasks.find(t) == m_tasks.end(), "TaskMan got task %p twice... ?", t); ev_now_update(m_loop); - t->setup(this, getStack()); + t->setup(this, t->isStackless() ? NULL : getStack()); m_tasks.insert(t); starting.insert(t); } @@ -345,12 +352,14 @@ int Balau::TaskMan::mainLoop() { return m_stopCode; } -void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) { +void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) { if (stick) { + IAssert(!event, "inconsistent"); TaskMan * tm = stick->getTaskMan(); tm->addToPending(t); - tm->m_evt.send(); } else { + if (event) + event->attachToTask(t); s_scheduler.registerTask(t); } } @@ -361,6 +370,7 @@ void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) { void Balau::TaskMan::addToPending(Balau::Task * t) { m_pendingAdd.push(t); + m_evt.send(); } void Balau::TaskMan::signalTask(Task * t) { @@ -373,17 +383,51 @@ void Balau::TaskMan::stop(int code) { s_scheduler.stopAll(code); } -class ThreadedTaskMan : public Balau::Thread { - virtual void * proc() { +void * Balau::TaskMan::TaskManThread::proc() { + bool success = false; + m_taskMan = NULL; + try { m_taskMan = new Balau::TaskMan(); m_taskMan->mainLoop(); - return NULL; + success = true; } - Balau::TaskMan * m_taskMan; -}; + catch (Exit e) { + Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway"); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); + } + catch (RessourceException e) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread got a ressource problem: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (GeneralException e) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an exception: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (...) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an unknown exception"); + } + if (!success) { + if (m_taskMan) + delete m_taskMan; + m_taskMan = NULL; + TaskMan::stop(-1); + } + return NULL; +} -Balau::Thread * Balau::TaskMan::createThreadedTaskMan() { - Thread * r = new ThreadedTaskMan(); - r->threadStart(); - return r; +Balau::TaskMan::TaskManThread::~TaskManThread() { + if (m_taskMan) + delete m_taskMan; } diff --git a/src/Threads.cc b/src/Threads.cc index 9ecfff5..7b1b7bc 100644 --- a/src/Threads.cc +++ b/src/Threads.cc @@ -2,6 +2,7 @@ #include "Threads.h" #include "Local.h" #include "Atomic.h" +#include "TaskMan.h" namespace Balau { @@ -39,11 +40,47 @@ Balau::RWLock::RWLock() { } void * Balau::ThreadHelper::threadProc(void * arg) { - void * tls = Local::createTLS(); - g_tlsManager->setTLS(tls); - Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg); - void * r = thread->proc(); - free(tls); + void * r = NULL; + bool success = false; + try { + void * tls = Local::createTLS(); + g_tlsManager->setTLS(tls); + Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg); + r = thread->proc(); + free(tls); + success = true; + } + catch (Exit e) { + Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway"); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); + } + catch (RessourceException e) { + Printer::log(M_ERROR | M_ALERT, "The Thread got a ressource problem: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (GeneralException e) { + Printer::log(M_ERROR | M_ALERT, "The Thread caused an exception: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (...) { + Printer::log(M_ERROR | M_ALERT, "The Thread caused an unknown exception"); + } + + if (!success) + TaskMan::stop(-1); + return r; } diff --git a/src/ZHandle.cc b/src/ZHandle.cc index e2f22f0..4f216c7 100644 --- a/src/ZHandle.cc +++ b/src/ZHandle.cc @@ -79,7 +79,7 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) if (r <= 0) return readTotal; } - Task::yield(nullptr); + Task::operationYield(); int r = inflate(&m_zin, Z_SYNC_FLUSH); size_t didRead = count - m_zin.avail_out; readTotal += didRead; @@ -108,7 +108,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce m_zout.avail_out = BLOCK_SIZE; int r = deflate(&m_zout, Z_NO_FLUSH); EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r); - Task::yield(nullptr); + Task::operationYield(); size_t compressed = BLOCK_SIZE - m_zout.avail_out; if (compressed) { size_t w = m_h->forceWrite(obuf, compressed); diff --git a/tests/test-Http.cc b/tests/test-Http.cc index af78e43..88d7a8d 100644 --- a/tests/test-Http.cc +++ b/tests/test-Http.cc @@ -1,41 +1,92 @@ #include <Main.h> #include <HttpServer.h> #include <TaskMan.h> - -#define DAEMON_NAME "Balau/1.0" +#include <Socket.h> +#include <SimpleMustache.h> using namespace Balau; -class TestAction : public HttpServer::Action { - public: - TestAction() : Action(Regexes::any) { } - virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException); -}; - -bool TestAction::Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException) { - HttpServer::Response response(server, req, out); - response->writeString( +const char htmlTemplateStr[] = "<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n" "\"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">\n" "<html xmlns=\"http://www.w3.org/1999/xhtml\">\n" " <head>\n" -" <title>Test</title>\n" +" <meta http-equiv=\"content-type\" content=\"text/html; charset=utf-8\" />\n" +" <title>{{title}}</title>\n" +" <style type=\"text/css\">\n" +" body { font-family: arial, helvetica, sans-serif; }\n" +" </style>\n" " </head>\n" "\n" " <body>\n" -" This is a test document.\n" +" <h1>{{title}}</h1>\n" +" <h2>{{msg}}</h2>\n" " </body>\n" -"</html>\n"); +"</html>\n" +; + +class TestHtmlTemplate : public AtStart { + public: + TestHtmlTemplate() : AtStart(10), htmlTemplate(m_template) { } + virtual void doStart() { m_template.setTemplate(htmlTemplateStr); } + + const SimpleMustache & htmlTemplate; + private: + SimpleMustache m_template; +}; + +static TestHtmlTemplate testHtmlTemplate; + +static Regex stopURL("/stop$"); + +class StopAction : public HttpServer::Action { + public: + StopAction(Events::Async & event, bool & stop) : Action(stopURL), m_event(event), m_stop(stop) { } + private: + virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException); + Events::Async & m_event; + bool & m_stop; +}; + +bool StopAction::Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException) { + m_stop = true; + m_event.trigger(); + SimpleMustache::Context ctx; + HttpServer::Response response(server, req, out); + + ctx["title"] = "Stop"; + ctx["msg"] = "Server stopping"; + testHtmlTemplate.htmlTemplate.render(response.get(), &ctx); response.Flush(); return true; } -Balau::Regex testFailureURL("^/failure.html$"); +class TestAction : public HttpServer::Action { + public: + TestAction() : Action(Regexes::any) { } + private: + virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException); +}; + +bool TestAction::Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException) { + SimpleMustache::Context ctx; + HttpServer::Response response(server, req, out); + + ctx["title"] = "Test"; + ctx["msg"] = "This is a test document."; + + testHtmlTemplate.htmlTemplate.render(response.get(), &ctx); + response.Flush(); + return true; +} + +static Regex testFailureURL("^/failure.html$"); class TestFailure : public HttpServer::Action { public: TestFailure() : Action(testFailureURL) { } + private: virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException); }; @@ -43,32 +94,75 @@ bool TestFailure::Do(HttpServer * server, Http::Request & req, HttpServer::Actio throw GeneralException("Test..."); } -#define NTHREADS 4 +class Stopper : public Task { + virtual const char * getName() const { return "ServerStopper"; } + virtual void Do(); +}; + +void Stopper::Do() { + IO<Socket> s(new Socket()); + bool c = s->connect("localhost", 8080); + TAssert(c); + s->writeString("GET /stop HTTP/1.0\r\n\r\n"); +} + +static const int NTHREADS = 4; void MainTask::Do() { Printer::enable(M_DEBUG); Printer::log(M_STATUS, "Test::Http running."); - Thread * tms[NTHREADS]; + TaskMan::TaskManThread * tms[NTHREADS]; for (int i = 0; i < NTHREADS; i++) tms[i] = TaskMan::createThreadedTaskMan(); + Events::Async event; + bool stop = false; + + waitFor(&event); + HttpServer * s = new HttpServer(); - HttpServer::Action * a = new TestAction(); - HttpServer::Action * f = new TestFailure(); - a->registerMe(s); - f->registerMe(s); + s->registerAction(new TestAction()); + s->registerAction(new TestFailure()); + s->registerAction(new StopAction(event, stop)); s->setPort(8080); s->setLocal("localhost"); s->start(); + Events::Timeout timeout(1); + waitFor(&timeout); yield(); + Events::TaskEvent stopperEvent; + Task * stopper = TaskMan::registerTask(new Stopper, &stopperEvent); + waitFor(&stopperEvent); + + bool gotEvent = false, gotStopperEvent = false; + int count = 0; + + while (!gotEvent || !gotStopperEvent) { + count++; + yield(); + if (event.gotSignal()) { + TAssert(!gotEvent); + gotEvent = true; + } + if (stopperEvent.gotSignal()) { + TAssert(!gotStopperEvent); + gotStopperEvent = true; + stopperEvent.ack(); + } + } + TAssert(count <= 2); + + TAssert(stop); + Printer::log(M_STATUS, "Test::Http is stopping."); + s->stop(); for (int i = 0; i < NTHREADS; i++) - tms[i]->join(); + TaskMan::stopThreadedTaskMan(tms[i]); Printer::log(M_STATUS, "Test::Http passed."); } diff --git a/tests/test-Sockets.cc b/tests/test-Sockets.cc index a3e2755..4ef0a78 100644 --- a/tests/test-Sockets.cc +++ b/tests/test-Sockets.cc @@ -62,11 +62,16 @@ void MainTask::Do() { Printer::enable(M_ALL); Printer::log(M_STATUS, "Test::Sockets running."); - Events::TaskEvent evtSvr(listener = Balau::createTask(new Listener<Worker>(1234))); - Events::TaskEvent evtCln(Balau::createTask(new Client)); - Printer::log(M_STATUS, "Created %s", listener->getName()); + Events::TaskEvent evtSvr; + Events::TaskEvent evtCln; + + listener = TaskMan::registerTask(new Listener<Worker>(1234), &evtSvr); + TaskMan::registerTask(new Client, &evtCln); + waitFor(&evtSvr); waitFor(&evtCln); + + Printer::log(M_STATUS, "Created %s", listener->getName()); bool svrDone = false, clnDone = false; while (!svrDone || !clnDone) { yield(); diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc index 272e61d..6bbb503 100644 --- a/tests/test-Tasks.cc +++ b/tests/test-Tasks.cc @@ -1,6 +1,7 @@ #include <Main.h> #include <Task.h> #include <TaskMan.h> +#include <StacklessTask.h> using namespace Balau; @@ -20,9 +21,42 @@ class TestTask : public Task { } }; +class TestOperation { + public: + TestOperation() : m_count(0), m_completed(false) { } + void Do() { + if (m_count++ == 0) { + m_timeout.set(0.2); + Task::operationYield(&m_timeout, Task::STACKLESS); + } + TAssert(m_timeout.gotSignal()); + m_completed = true; + } + bool completed() { return m_completed; } + private: + int m_count; + bool m_completed; + Events::Timeout m_timeout; +}; + +class TestStackless : public StacklessTask { + public: + virtual const char * getName() const { return "TestStackless"; } + private: + virtual void Do() { + StacklessBegin(); + m_operation = new TestOperation(); + StacklessOperation(m_operation->Do()); + TAssert(m_operation->completed()); + delete m_operation; + StacklessEnd(); + } + TestOperation * m_operation; +}; + static void yieldingFunction() { Events::Timeout timeout(0.2); - Task::yield(&timeout); + Task::operationYield(&timeout); TAssert(timeout.gotSignal()); } @@ -30,8 +64,15 @@ void MainTask::Do() { customPrinter = new CustomPrinter(); Printer::log(M_STATUS, "Test::Tasks running."); - Task * testTask = Balau::createTask(new TestTask()); - Events::TaskEvent taskEvt(testTask); + Events::TaskEvent taskEvt; + Task * testTask = TaskMan::registerTask(new TestTask(), &taskEvt); + waitFor(&taskEvt); + TAssert(!taskEvt.gotSignal()); + yield(); + TAssert(taskEvt.gotSignal()); + taskEvt.ack(); + + Task * testStackless = TaskMan::registerTask(new TestStackless(), &taskEvt); waitFor(&taskEvt); TAssert(!taskEvt.gotSignal()); yield(); |