diff options
Diffstat (limited to 'includes')
-rw-r--r-- | includes/Async.h | 4 | ||||
-rw-r--r-- | includes/BString.h | 2 | ||||
-rw-r--r-- | includes/Exceptions.h | 22 | ||||
-rw-r--r-- | includes/Handle.h | 2 | ||||
-rw-r--r-- | includes/LuaTask.h | 20 | ||||
-rw-r--r-- | includes/SimpleMustache.h | 14 | ||||
-rw-r--r-- | includes/Socket.h | 2 | ||||
-rw-r--r-- | includes/StacklessTask.h | 81 | ||||
-rw-r--r-- | includes/Task.h | 94 | ||||
-rw-r--r-- | includes/TaskMan.h | 42 | ||||
-rw-r--r-- | includes/Threads.h | 58 |
11 files changed, 243 insertions, 98 deletions
diff --git a/includes/Async.h b/includes/Async.h index bd6ce7e..15e0ac7 100644 --- a/includes/Async.h +++ b/includes/Async.h @@ -23,7 +23,7 @@ class AsyncOperation { protected: virtual ~AsyncOperation() { } private: - Queue<AsyncOperation> * m_idleQueue = NULL; + CQueue<AsyncOperation> * m_idleQueue = NULL; IdleReadyCallback_t m_idleReadyCallback = NULL; void * m_idleReadyParam = NULL; void finalize(); @@ -65,7 +65,7 @@ class AsyncManager : public Thread { void stopAllWorkers(); virtual void * proc(); struct TLS { - Queue<AsyncOperation> idleQueue; + CQueue<AsyncOperation> idleQueue; IdleReadyCallback_t idleReadyCallback; void * idleReadyParam; }; diff --git a/includes/BString.h b/includes/BString.h index 29f3d22..09e0d40 100644 --- a/includes/BString.h +++ b/includes/BString.h @@ -16,6 +16,8 @@ class String : private std::string { public: String() : std::string() { } String(const char * str) : std::string(str ? str : "") { } + template<size_t L> + String(const char (&str)[L]) : std::string(str, L - 1) { } String(const char * str, size_t n) : std::string(str ? str : "", str ? n : 0) { } String(char c) { set("%c", c); } String(int32_t i) { set("%i", i); } diff --git a/includes/Exceptions.h b/includes/Exceptions.h index 829a5d1..d7856fe 100644 --- a/includes/Exceptions.h +++ b/includes/Exceptions.h @@ -74,19 +74,19 @@ static inline void AssertHelperInner(const String & msg, const char * details = throw GeneralException(msg, details); } -static inline void AssertHelper(const String & msg, const char * fmt = NULL, ...) __attribute__((format(printf, 2, 3))); +static inline void AssertHelper(const String & msg, const char * fmt, ...) __attribute__((format(printf, 2, 3))); static inline void AssertHelper(const String & msg, const char * fmt, ...) { - if (fmt) { - String details; - va_list ap; - va_start(ap, fmt); - details.set(fmt, ap); - va_end(ap); - AssertHelperInner(msg, details.to_charp()); - } else { - AssertHelperInner(msg); - } + String details; + va_list ap; + va_start(ap, fmt); + details.set(fmt, ap); + va_end(ap); + AssertHelperInner(msg, details.to_charp()); +} + +static inline void AssertHelper(const String & msg) { + AssertHelperInner(msg); } class TestException : public GeneralException { diff --git a/includes/Handle.h b/includes/Handle.h index 5634583..7c502ea 100644 --- a/includes/Handle.h +++ b/includes/Handle.h @@ -40,7 +40,7 @@ class Handle { virtual const char * getName() = 0; virtual ssize_t read(void * buf, size_t count) throw (GeneralException); virtual ssize_t write(const void * buf, size_t count) throw (GeneralException); - template <ssize_t L> void writeString(const char str[L]) { writeString(str, L - 1); } + template <size_t L> void writeString(const char (&str)[L]) { writeString(str, L - 1); } void writeString(const char * str, ssize_t len) { if (len < 0) len = strlen(str); forceWrite(str, len); } void writeString(const String & str) { forceWrite(str.to_charp(), str.strlen()); } void seek(off_t offset, int whence = SEEK_SET) { rseek(offset, whence); } diff --git a/includes/LuaTask.h b/includes/LuaTask.h index 48cef97..37d64fa 100644 --- a/includes/LuaTask.h +++ b/includes/LuaTask.h @@ -11,7 +11,7 @@ class LuaMainTask; class LuaExecCell { public: - LuaExecCell() : m_detached(false) { } + LuaExecCell(); virtual ~LuaExecCell() { } void detach() { m_detached = true; } void exec(LuaMainTask * mainTask); @@ -19,7 +19,7 @@ class LuaExecCell { virtual void run(Lua &) = 0; private: Events::Async m_event; - bool m_detached; + bool m_detached = false; friend class LuaTask; }; @@ -31,6 +31,14 @@ class LuaExecString : public LuaExecCell { String m_str; }; +class LuaExecFile : public LuaExecCell { + public: + LuaExecFile(IO<Handle> file) : m_file(file) { } + private: + virtual void run(Lua &); + IO<Handle> m_file; +}; + class LuaTask : public Task { public: ~LuaTask() { L.weaken(); } @@ -45,17 +53,15 @@ class LuaTask : public Task { class LuaMainTask : public Task { public: - LuaMainTask() : m_stopping(false) { } - ~LuaMainTask() { L.close(); } + LuaMainTask(); + ~LuaMainTask(); void stop(); virtual const char * getName() const { return "LuaMainTask"; } private: void exec(LuaExecCell * cell); virtual void Do(); Lua L; - Events::Async m_queueEvent; - Queue<LuaExecCell> m_queue; - bool m_stopping; + TQueue<LuaExecCell> m_queue; friend class LuaExecCell; }; diff --git a/includes/SimpleMustache.h b/includes/SimpleMustache.h index 9d16377..28cb477 100644 --- a/includes/SimpleMustache.h +++ b/includes/SimpleMustache.h @@ -80,11 +80,13 @@ class SimpleMustache { IO<Buffer> b(new Buffer(str, s)); setTemplate(b); } - void setTemplate(const char * str, ssize_t s = -1) { setTemplate((const uint8_t *) str, s); } + template<size_t S> + void setTemplate(const char (&str)[S]) { setTemplate((const uint8_t *) str, S - 1); } + void setTemplate(const char * str, ssize_t s) { setTemplate((const uint8_t *) str, s); } void setTemplate(const String & str) { setTemplate((const uint8_t *) str.to_charp(), str.strlen()); } - void render(IO<Handle> h, Context * ctx) { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); } + void render(IO<Handle> h, Context * ctx) const { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); } void empty() { while (!m_fragments.empty()) { delete m_fragments.front(); m_fragments.pop_front(); } } - void checkTemplate() { Fragments::iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); } + void checkTemplate() { Fragments::const_iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); } ~SimpleMustache() { empty(); } private: struct Fragment { @@ -102,10 +104,10 @@ class SimpleMustache { typedef std::list<Fragment *> Fragments; Fragments m_fragments; - Fragments::iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx); - String escape(const String & s); + Fragments::const_iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const; + static String escape(const String & s); - Fragments::iterator checkTemplate_r(Fragments::iterator begin, const String & endSection = ""); + Fragments::const_iterator checkTemplate_r(Fragments::const_iterator begin, const String & endSection = "") const; }; }; diff --git a/includes/Socket.h b/includes/Socket.h index 981a385..e5a9f21 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -81,7 +81,7 @@ class Listener : public ListenerBase { public: Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { } protected: - virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); } + virtual void factory(IO<Socket> & io, void * opaque) { TaskMan::registerTask(new Worker(io, opaque)); } virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); } }; diff --git a/includes/StacklessTask.h b/includes/StacklessTask.h new file mode 100644 index 0000000..bb18a7a --- /dev/null +++ b/includes/StacklessTask.h @@ -0,0 +1,81 @@ +#pragma once + +#include <Task.h> + +namespace Balau { + +class StacklessTask : public Task { + public: + StacklessTask() : m_state(0) { setStackless(); } + protected: + void taskSwitch() throw (GeneralException) { throw TaskSwitch(); } + unsigned int m_state; +}; + +}; + +#define StacklessBegin() \ + switch(m_state) { \ + case 0: { \ + + +#define StacklessOperation(operation) \ + m_state = __LINE__; \ + } \ + case __LINE__: { \ + try { \ + operation; \ + } \ + catch (Balau::EAgain & e) { \ + taskSwitch(); \ + } \ + + +#define StacklessOperationOrCond(operation, cond) \ + m_state = __LINE__; \ + } \ + case __LINE__: { \ + try { \ + if (!(cond)) { \ + operation; \ + } \ + } \ + catch (Balau::EAgain & e) { \ + taskSwitch(); \ + } \ + + +#define StacklessWaitFor(evt) \ + m_state = __LINE__; \ + waitFor(evt); \ + taskSwitch(); \ + } \ + case __LINE__: { \ + + +#define StacklessWaitCond(cond) \ + m_state = __LINE__; \ + } \ + case __LINE__: { \ + if (!(cond)) \ + taskSwitch(); \ + + +#define StacklessYield() \ + m_state = __LINE__; \ + try { \ + yield(true); \ + } \ + catch (Balau::EAgain & e) { \ + taskSwitch(); \ + } \ + } \ + case __LINE__: { \ + + +#define StacklessEnd() \ + break; \ + } \ + default: \ + AssertHelper("unknown state", "State %i is out of range in task %s at %p", m_state, getName(), this); \ + } diff --git a/includes/Task.h b/includes/Task.h index b3f395a..86bbce1 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -21,6 +21,11 @@ class EAgain : public GeneralException { Events::BaseEvent * m_evt; }; +class TaskSwitch : public GeneralException { + public: + TaskSwitch() : GeneralException("Task Switch") { } +}; + class TaskMan; class Task; @@ -64,6 +69,7 @@ class BaseEvent { class Timeout : public BaseEvent { public: + Timeout() { } Timeout(ev_tstamp tstamp) { set(tstamp); } virtual ~Timeout() { m_evt.stop(); } void evt_cb(ev::timer & w, int revents) { doSignal(); } @@ -75,11 +81,12 @@ class Timeout : public BaseEvent { class TaskEvent : public BaseEvent { public: - TaskEvent(Task * taskWaited); + TaskEvent(Task * taskWaited = NULL); virtual ~TaskEvent(); void ack(); void signal(); Task * taskWaited() { return m_taskWaited; } + void attachToTask(Task * taskWaited); void evt_cb(ev::async & w, int revents) { doSignal(); } protected: virtual void gotOwner(Task * task); @@ -121,11 +128,22 @@ class Task { enum Status { STARTING, RUNNING, - IDLE, + SLEEPING, STOPPED, FAULTED, YIELDED, }; + static const char * StatusToString(enum Status status) { + static const char * strs[] = { + "STARTING", + "RUNNING", + "SLEEPING", + "STOPPED", + "FAULTED", + "YIELDED", + }; + return strs[status]; + }; Task(); virtual ~Task(); virtual const char * getName() const = 0; @@ -135,18 +153,37 @@ class Task { Task * t = getCurrentTask(); t->waitFor(evt); } - static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException); + enum OperationYieldType { + SIMPLE, + INTERRUPTIBLE, + STACKLESS, + }; + static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException); TaskMan * getTaskMan() const { return m_taskMan; } struct ev_loop * getLoop(); + bool isStackless() { return m_stackless; } protected: - void yield(bool changeStatus = false); + void yield(bool stillRunning = false) throw (GeneralException); + void yield(Events::BaseEvent * evt) { + waitFor(evt); + yield(); + } virtual void Do() = 0; void waitFor(Events::BaseEvent * event); bool setOkayToEAgain(bool enable) { + if (m_stackless) { + AAssert(enable, "You can't make a task go not-okay-to-eagain if it's stackless."); + } bool oldValue = m_okayToEAgain; m_okayToEAgain = enable; return oldValue; } + void setStackless() { + AAssert(!m_stackless, "Can't set a task to be stackless twice"); + AAssert(m_status == STARTING, "Can't set a task to be stackless after it started. status = %s", StatusToString(m_status)); + m_stackless = true; + m_okayToEAgain = true; + } private: static size_t stackSize() { return 64 * 1024; } void setup(TaskMan * taskMan, void * stack); @@ -168,7 +205,54 @@ class Task { Lock m_eventLock; typedef std::list<Events::TaskEvent *> waitedByList_t; waitedByList_t m_waitedBy; - bool m_okayToEAgain; + bool m_okayToEAgain, m_stackless; +}; + +class QueueBase { + public: + bool isEmpty() { ScopeLock sl(m_lock); return !m_front; } + protected: + QueueBase() { pthread_cond_init(&m_cond, NULL); } + ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); } + void iPush(void * t, Events::Async * event); + void * iPop(Events::Async * event, bool wait); + + private: + QueueBase(const QueueBase &) = delete; + QueueBase & operator=(const QueueBase &) = delete; + Lock m_lock; + struct Cell { + Cell(void * elem) : m_elem(elem) { } + Cell(const Cell &) = delete; + Cell & operator=(const Cell &) = delete; + Cell * m_next = NULL, * m_prev = NULL; + void * m_elem; + }; + Cell * m_front = NULL, * m_back = NULL; + pthread_cond_t m_cond; +}; + +template<class T> +class Queue : public QueueBase { + public: + void push(T * t) { iPush(t, NULL); } + T * pop() { return (T *) iPop(NULL, true); } +}; + +template<class T> +class TQueue : public QueueBase { + public: + void push(T * t) { iPush(t, &m_event); } + T * pop() { return (T *) iPop(&m_event, true); } + private: + Events::Async m_event; +}; + +template<class T> +class CQueue : public QueueBase { + public: + void push(T * t) { iPush(t, NULL); } + T * pop() { return (T *) iPop(NULL, false); } }; }; diff --git a/includes/TaskMan.h b/includes/TaskMan.h index d39598f..2b2742d 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -10,16 +10,31 @@ #include <Async.h> #include <Threads.h> #include <Exceptions.h> +#include <Task.h> namespace gnu = __gnu_cxx; namespace Balau { -class Task; class TaskScheduler; +namespace Events { + +class TaskEvent; + +}; + class TaskMan { public: + class TaskManThread : public Thread { + public: + virtual ~TaskManThread(); + virtual void * proc(); + void stopMe(int code = 0) { m_taskMan->stopMe(code); } + private: + TaskMan * m_taskMan; + }; + TaskMan(); ~TaskMan(); int mainLoop(); @@ -27,11 +42,25 @@ class TaskMan { struct ev_loop * getLoop() { return m_loop; } void signalTask(Task * t); static void stop(int code); - void stopMe(int code) { m_stopped = true; m_stopCode = code; } - static Thread * createThreadedTaskMan(); + void stopMe(int code = 0); + static TaskManThread * createThreadedTaskMan() { + TaskManThread * r = new TaskManThread(); + r->threadStart(); + return r; + } + static void stopThreadedTaskMan(TaskManThread * tmt) { + tmt->stopMe(0); + tmt->join(); + delete tmt; + } bool stopped() { return m_stopped; } + template<class T> + static T * registerTask(T * t, Task * stick = NULL) { TaskMan::iRegisterTask(t, stick, NULL); return t; } + template<class T> + static T * registerTask(T * t, Events::TaskEvent * event) { TaskMan::iRegisterTask(t, NULL, event); return t; } + private: - static void registerTask(Task * t, Task * stick); + static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event); static void registerAsyncOp(AsyncOperation * op); void * getStack(); void freeStack(void * stack); @@ -51,8 +80,6 @@ class TaskMan { friend class Task; friend class TaskScheduler; template<class T> - friend T * createTask(T * t, Task * stick = NULL); - template<class T> friend T * createAsyncOp(T * op); struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } }; typedef gnu::hash_set<Task *, taskHasher> taskHash_t; @@ -68,9 +95,6 @@ class TaskMan { }; template<class T> -T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; } - -template<class T> T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; } }; diff --git a/includes/Threads.h b/includes/Threads.h index 13527d5..0dfce1a 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -5,8 +5,7 @@ namespace Balau { -template<class T> -class Queue; +class QueueBase; class Lock { public: @@ -18,8 +17,7 @@ class Lock { Lock(const Lock &) = delete; Lock & operator=(const Lock &) = delete; pthread_mutex_t m_lock; - template<class T> - friend class Queue; + friend class QueueBase; }; class ScopeLock { @@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit { virtual void doExit() { join(); } }; -template<class T> -class Queue { - public: - Queue() { pthread_cond_init(&m_cond, NULL); } - ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); } - void push(T * t) { - ScopeLock sl(m_lock); - Cell * c = new Cell(t); - c->m_prev = m_back; - if (m_back) - m_back->m_next = c; - else - m_front = c; - m_back = c; - pthread_cond_signal(&m_cond); - } - T * pop(bool wait = true) { - ScopeLock sl(m_lock); - while (!m_front && wait) - pthread_cond_wait(&m_cond, &m_lock.m_lock); - Cell * c = m_front; - if (!c) - return NULL; - m_front = c->m_next; - if (m_front) - m_front->m_prev = NULL; - else - m_back = NULL; - T * t = c->m_elem; - delete c; - return t; - } - bool isEmpty() { - ScopeLock sl(m_lock); - return !m_front; - } - private: - Queue(const Queue &) = delete; - Queue & operator=(const Queue &) = delete; - Lock m_lock; - struct Cell { - Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } - Cell(const Cell &) = delete; - Cell & operator=(const Cell &) = delete; - Cell * m_next, * m_prev; - T * m_elem; - }; - Cell * volatile m_front = NULL; - Cell * volatile m_back = NULL; - pthread_cond_t m_cond; -}; - }; |