summaryrefslogtreecommitdiff
path: root/includes
diff options
context:
space:
mode:
Diffstat (limited to 'includes')
-rw-r--r--includes/Async.h4
-rw-r--r--includes/BString.h2
-rw-r--r--includes/Exceptions.h22
-rw-r--r--includes/Handle.h2
-rw-r--r--includes/LuaTask.h20
-rw-r--r--includes/SimpleMustache.h14
-rw-r--r--includes/Socket.h2
-rw-r--r--includes/StacklessTask.h81
-rw-r--r--includes/Task.h94
-rw-r--r--includes/TaskMan.h42
-rw-r--r--includes/Threads.h58
11 files changed, 243 insertions, 98 deletions
diff --git a/includes/Async.h b/includes/Async.h
index bd6ce7e..15e0ac7 100644
--- a/includes/Async.h
+++ b/includes/Async.h
@@ -23,7 +23,7 @@ class AsyncOperation {
protected:
virtual ~AsyncOperation() { }
private:
- Queue<AsyncOperation> * m_idleQueue = NULL;
+ CQueue<AsyncOperation> * m_idleQueue = NULL;
IdleReadyCallback_t m_idleReadyCallback = NULL;
void * m_idleReadyParam = NULL;
void finalize();
@@ -65,7 +65,7 @@ class AsyncManager : public Thread {
void stopAllWorkers();
virtual void * proc();
struct TLS {
- Queue<AsyncOperation> idleQueue;
+ CQueue<AsyncOperation> idleQueue;
IdleReadyCallback_t idleReadyCallback;
void * idleReadyParam;
};
diff --git a/includes/BString.h b/includes/BString.h
index 29f3d22..09e0d40 100644
--- a/includes/BString.h
+++ b/includes/BString.h
@@ -16,6 +16,8 @@ class String : private std::string {
public:
String() : std::string() { }
String(const char * str) : std::string(str ? str : "") { }
+ template<size_t L>
+ String(const char (&str)[L]) : std::string(str, L - 1) { }
String(const char * str, size_t n) : std::string(str ? str : "", str ? n : 0) { }
String(char c) { set("%c", c); }
String(int32_t i) { set("%i", i); }
diff --git a/includes/Exceptions.h b/includes/Exceptions.h
index 829a5d1..d7856fe 100644
--- a/includes/Exceptions.h
+++ b/includes/Exceptions.h
@@ -74,19 +74,19 @@ static inline void AssertHelperInner(const String & msg, const char * details =
throw GeneralException(msg, details);
}
-static inline void AssertHelper(const String & msg, const char * fmt = NULL, ...) __attribute__((format(printf, 2, 3)));
+static inline void AssertHelper(const String & msg, const char * fmt, ...) __attribute__((format(printf, 2, 3)));
static inline void AssertHelper(const String & msg, const char * fmt, ...) {
- if (fmt) {
- String details;
- va_list ap;
- va_start(ap, fmt);
- details.set(fmt, ap);
- va_end(ap);
- AssertHelperInner(msg, details.to_charp());
- } else {
- AssertHelperInner(msg);
- }
+ String details;
+ va_list ap;
+ va_start(ap, fmt);
+ details.set(fmt, ap);
+ va_end(ap);
+ AssertHelperInner(msg, details.to_charp());
+}
+
+static inline void AssertHelper(const String & msg) {
+ AssertHelperInner(msg);
}
class TestException : public GeneralException {
diff --git a/includes/Handle.h b/includes/Handle.h
index 5634583..7c502ea 100644
--- a/includes/Handle.h
+++ b/includes/Handle.h
@@ -40,7 +40,7 @@ class Handle {
virtual const char * getName() = 0;
virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
virtual ssize_t write(const void * buf, size_t count) throw (GeneralException);
- template <ssize_t L> void writeString(const char str[L]) { writeString(str, L - 1); }
+ template <size_t L> void writeString(const char (&str)[L]) { writeString(str, L - 1); }
void writeString(const char * str, ssize_t len) { if (len < 0) len = strlen(str); forceWrite(str, len); }
void writeString(const String & str) { forceWrite(str.to_charp(), str.strlen()); }
void seek(off_t offset, int whence = SEEK_SET) { rseek(offset, whence); }
diff --git a/includes/LuaTask.h b/includes/LuaTask.h
index 48cef97..37d64fa 100644
--- a/includes/LuaTask.h
+++ b/includes/LuaTask.h
@@ -11,7 +11,7 @@ class LuaMainTask;
class LuaExecCell {
public:
- LuaExecCell() : m_detached(false) { }
+ LuaExecCell();
virtual ~LuaExecCell() { }
void detach() { m_detached = true; }
void exec(LuaMainTask * mainTask);
@@ -19,7 +19,7 @@ class LuaExecCell {
virtual void run(Lua &) = 0;
private:
Events::Async m_event;
- bool m_detached;
+ bool m_detached = false;
friend class LuaTask;
};
@@ -31,6 +31,14 @@ class LuaExecString : public LuaExecCell {
String m_str;
};
+class LuaExecFile : public LuaExecCell {
+ public:
+ LuaExecFile(IO<Handle> file) : m_file(file) { }
+ private:
+ virtual void run(Lua &);
+ IO<Handle> m_file;
+};
+
class LuaTask : public Task {
public:
~LuaTask() { L.weaken(); }
@@ -45,17 +53,15 @@ class LuaTask : public Task {
class LuaMainTask : public Task {
public:
- LuaMainTask() : m_stopping(false) { }
- ~LuaMainTask() { L.close(); }
+ LuaMainTask();
+ ~LuaMainTask();
void stop();
virtual const char * getName() const { return "LuaMainTask"; }
private:
void exec(LuaExecCell * cell);
virtual void Do();
Lua L;
- Events::Async m_queueEvent;
- Queue<LuaExecCell> m_queue;
- bool m_stopping;
+ TQueue<LuaExecCell> m_queue;
friend class LuaExecCell;
};
diff --git a/includes/SimpleMustache.h b/includes/SimpleMustache.h
index 9d16377..28cb477 100644
--- a/includes/SimpleMustache.h
+++ b/includes/SimpleMustache.h
@@ -80,11 +80,13 @@ class SimpleMustache {
IO<Buffer> b(new Buffer(str, s));
setTemplate(b);
}
- void setTemplate(const char * str, ssize_t s = -1) { setTemplate((const uint8_t *) str, s); }
+ template<size_t S>
+ void setTemplate(const char (&str)[S]) { setTemplate((const uint8_t *) str, S - 1); }
+ void setTemplate(const char * str, ssize_t s) { setTemplate((const uint8_t *) str, s); }
void setTemplate(const String & str) { setTemplate((const uint8_t *) str.to_charp(), str.strlen()); }
- void render(IO<Handle> h, Context * ctx) { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); }
+ void render(IO<Handle> h, Context * ctx) const { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); }
void empty() { while (!m_fragments.empty()) { delete m_fragments.front(); m_fragments.pop_front(); } }
- void checkTemplate() { Fragments::iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); }
+ void checkTemplate() { Fragments::const_iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); }
~SimpleMustache() { empty(); }
private:
struct Fragment {
@@ -102,10 +104,10 @@ class SimpleMustache {
typedef std::list<Fragment *> Fragments;
Fragments m_fragments;
- Fragments::iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx);
- String escape(const String & s);
+ Fragments::const_iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const;
+ static String escape(const String & s);
- Fragments::iterator checkTemplate_r(Fragments::iterator begin, const String & endSection = "");
+ Fragments::const_iterator checkTemplate_r(Fragments::const_iterator begin, const String & endSection = "") const;
};
};
diff --git a/includes/Socket.h b/includes/Socket.h
index 981a385..e5a9f21 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -81,7 +81,7 @@ class Listener : public ListenerBase {
public:
Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { }
protected:
- virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); }
+ virtual void factory(IO<Socket> & io, void * opaque) { TaskMan::registerTask(new Worker(io, opaque)); }
virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); }
};
diff --git a/includes/StacklessTask.h b/includes/StacklessTask.h
new file mode 100644
index 0000000..bb18a7a
--- /dev/null
+++ b/includes/StacklessTask.h
@@ -0,0 +1,81 @@
+#pragma once
+
+#include <Task.h>
+
+namespace Balau {
+
+class StacklessTask : public Task {
+ public:
+ StacklessTask() : m_state(0) { setStackless(); }
+ protected:
+ void taskSwitch() throw (GeneralException) { throw TaskSwitch(); }
+ unsigned int m_state;
+};
+
+};
+
+#define StacklessBegin() \
+ switch(m_state) { \
+ case 0: { \
+
+
+#define StacklessOperation(operation) \
+ m_state = __LINE__; \
+ } \
+ case __LINE__: { \
+ try { \
+ operation; \
+ } \
+ catch (Balau::EAgain & e) { \
+ taskSwitch(); \
+ } \
+
+
+#define StacklessOperationOrCond(operation, cond) \
+ m_state = __LINE__; \
+ } \
+ case __LINE__: { \
+ try { \
+ if (!(cond)) { \
+ operation; \
+ } \
+ } \
+ catch (Balau::EAgain & e) { \
+ taskSwitch(); \
+ } \
+
+
+#define StacklessWaitFor(evt) \
+ m_state = __LINE__; \
+ waitFor(evt); \
+ taskSwitch(); \
+ } \
+ case __LINE__: { \
+
+
+#define StacklessWaitCond(cond) \
+ m_state = __LINE__; \
+ } \
+ case __LINE__: { \
+ if (!(cond)) \
+ taskSwitch(); \
+
+
+#define StacklessYield() \
+ m_state = __LINE__; \
+ try { \
+ yield(true); \
+ } \
+ catch (Balau::EAgain & e) { \
+ taskSwitch(); \
+ } \
+ } \
+ case __LINE__: { \
+
+
+#define StacklessEnd() \
+ break; \
+ } \
+ default: \
+ AssertHelper("unknown state", "State %i is out of range in task %s at %p", m_state, getName(), this); \
+ }
diff --git a/includes/Task.h b/includes/Task.h
index b3f395a..86bbce1 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -21,6 +21,11 @@ class EAgain : public GeneralException {
Events::BaseEvent * m_evt;
};
+class TaskSwitch : public GeneralException {
+ public:
+ TaskSwitch() : GeneralException("Task Switch") { }
+};
+
class TaskMan;
class Task;
@@ -64,6 +69,7 @@ class BaseEvent {
class Timeout : public BaseEvent {
public:
+ Timeout() { }
Timeout(ev_tstamp tstamp) { set(tstamp); }
virtual ~Timeout() { m_evt.stop(); }
void evt_cb(ev::timer & w, int revents) { doSignal(); }
@@ -75,11 +81,12 @@ class Timeout : public BaseEvent {
class TaskEvent : public BaseEvent {
public:
- TaskEvent(Task * taskWaited);
+ TaskEvent(Task * taskWaited = NULL);
virtual ~TaskEvent();
void ack();
void signal();
Task * taskWaited() { return m_taskWaited; }
+ void attachToTask(Task * taskWaited);
void evt_cb(ev::async & w, int revents) { doSignal(); }
protected:
virtual void gotOwner(Task * task);
@@ -121,11 +128,22 @@ class Task {
enum Status {
STARTING,
RUNNING,
- IDLE,
+ SLEEPING,
STOPPED,
FAULTED,
YIELDED,
};
+ static const char * StatusToString(enum Status status) {
+ static const char * strs[] = {
+ "STARTING",
+ "RUNNING",
+ "SLEEPING",
+ "STOPPED",
+ "FAULTED",
+ "YIELDED",
+ };
+ return strs[status];
+ };
Task();
virtual ~Task();
virtual const char * getName() const = 0;
@@ -135,18 +153,37 @@ class Task {
Task * t = getCurrentTask();
t->waitFor(evt);
}
- static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException);
+ enum OperationYieldType {
+ SIMPLE,
+ INTERRUPTIBLE,
+ STACKLESS,
+ };
+ static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException);
TaskMan * getTaskMan() const { return m_taskMan; }
struct ev_loop * getLoop();
+ bool isStackless() { return m_stackless; }
protected:
- void yield(bool changeStatus = false);
+ void yield(bool stillRunning = false) throw (GeneralException);
+ void yield(Events::BaseEvent * evt) {
+ waitFor(evt);
+ yield();
+ }
virtual void Do() = 0;
void waitFor(Events::BaseEvent * event);
bool setOkayToEAgain(bool enable) {
+ if (m_stackless) {
+ AAssert(enable, "You can't make a task go not-okay-to-eagain if it's stackless.");
+ }
bool oldValue = m_okayToEAgain;
m_okayToEAgain = enable;
return oldValue;
}
+ void setStackless() {
+ AAssert(!m_stackless, "Can't set a task to be stackless twice");
+ AAssert(m_status == STARTING, "Can't set a task to be stackless after it started. status = %s", StatusToString(m_status));
+ m_stackless = true;
+ m_okayToEAgain = true;
+ }
private:
static size_t stackSize() { return 64 * 1024; }
void setup(TaskMan * taskMan, void * stack);
@@ -168,7 +205,54 @@ class Task {
Lock m_eventLock;
typedef std::list<Events::TaskEvent *> waitedByList_t;
waitedByList_t m_waitedBy;
- bool m_okayToEAgain;
+ bool m_okayToEAgain, m_stackless;
+};
+
+class QueueBase {
+ public:
+ bool isEmpty() { ScopeLock sl(m_lock); return !m_front; }
+ protected:
+ QueueBase() { pthread_cond_init(&m_cond, NULL); }
+ ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); }
+ void iPush(void * t, Events::Async * event);
+ void * iPop(Events::Async * event, bool wait);
+
+ private:
+ QueueBase(const QueueBase &) = delete;
+ QueueBase & operator=(const QueueBase &) = delete;
+ Lock m_lock;
+ struct Cell {
+ Cell(void * elem) : m_elem(elem) { }
+ Cell(const Cell &) = delete;
+ Cell & operator=(const Cell &) = delete;
+ Cell * m_next = NULL, * m_prev = NULL;
+ void * m_elem;
+ };
+ Cell * m_front = NULL, * m_back = NULL;
+ pthread_cond_t m_cond;
+};
+
+template<class T>
+class Queue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, true); }
+};
+
+template<class T>
+class TQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, &m_event); }
+ T * pop() { return (T *) iPop(&m_event, true); }
+ private:
+ Events::Async m_event;
+};
+
+template<class T>
+class CQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, false); }
};
};
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index d39598f..2b2742d 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -10,16 +10,31 @@
#include <Async.h>
#include <Threads.h>
#include <Exceptions.h>
+#include <Task.h>
namespace gnu = __gnu_cxx;
namespace Balau {
-class Task;
class TaskScheduler;
+namespace Events {
+
+class TaskEvent;
+
+};
+
class TaskMan {
public:
+ class TaskManThread : public Thread {
+ public:
+ virtual ~TaskManThread();
+ virtual void * proc();
+ void stopMe(int code = 0) { m_taskMan->stopMe(code); }
+ private:
+ TaskMan * m_taskMan;
+ };
+
TaskMan();
~TaskMan();
int mainLoop();
@@ -27,11 +42,25 @@ class TaskMan {
struct ev_loop * getLoop() { return m_loop; }
void signalTask(Task * t);
static void stop(int code);
- void stopMe(int code) { m_stopped = true; m_stopCode = code; }
- static Thread * createThreadedTaskMan();
+ void stopMe(int code = 0);
+ static TaskManThread * createThreadedTaskMan() {
+ TaskManThread * r = new TaskManThread();
+ r->threadStart();
+ return r;
+ }
+ static void stopThreadedTaskMan(TaskManThread * tmt) {
+ tmt->stopMe(0);
+ tmt->join();
+ delete tmt;
+ }
bool stopped() { return m_stopped; }
+ template<class T>
+ static T * registerTask(T * t, Task * stick = NULL) { TaskMan::iRegisterTask(t, stick, NULL); return t; }
+ template<class T>
+ static T * registerTask(T * t, Events::TaskEvent * event) { TaskMan::iRegisterTask(t, NULL, event); return t; }
+
private:
- static void registerTask(Task * t, Task * stick);
+ static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event);
static void registerAsyncOp(AsyncOperation * op);
void * getStack();
void freeStack(void * stack);
@@ -51,8 +80,6 @@ class TaskMan {
friend class Task;
friend class TaskScheduler;
template<class T>
- friend T * createTask(T * t, Task * stick = NULL);
- template<class T>
friend T * createAsyncOp(T * op);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
@@ -68,9 +95,6 @@ class TaskMan {
};
template<class T>
-T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; }
-
-template<class T>
T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; }
};
diff --git a/includes/Threads.h b/includes/Threads.h
index 13527d5..0dfce1a 100644
--- a/includes/Threads.h
+++ b/includes/Threads.h
@@ -5,8 +5,7 @@
namespace Balau {
-template<class T>
-class Queue;
+class QueueBase;
class Lock {
public:
@@ -18,8 +17,7 @@ class Lock {
Lock(const Lock &) = delete;
Lock & operator=(const Lock &) = delete;
pthread_mutex_t m_lock;
- template<class T>
- friend class Queue;
+ friend class QueueBase;
};
class ScopeLock {
@@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit {
virtual void doExit() { join(); }
};
-template<class T>
-class Queue {
- public:
- Queue() { pthread_cond_init(&m_cond, NULL); }
- ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); }
- void push(T * t) {
- ScopeLock sl(m_lock);
- Cell * c = new Cell(t);
- c->m_prev = m_back;
- if (m_back)
- m_back->m_next = c;
- else
- m_front = c;
- m_back = c;
- pthread_cond_signal(&m_cond);
- }
- T * pop(bool wait = true) {
- ScopeLock sl(m_lock);
- while (!m_front && wait)
- pthread_cond_wait(&m_cond, &m_lock.m_lock);
- Cell * c = m_front;
- if (!c)
- return NULL;
- m_front = c->m_next;
- if (m_front)
- m_front->m_prev = NULL;
- else
- m_back = NULL;
- T * t = c->m_elem;
- delete c;
- return t;
- }
- bool isEmpty() {
- ScopeLock sl(m_lock);
- return !m_front;
- }
- private:
- Queue(const Queue &) = delete;
- Queue & operator=(const Queue &) = delete;
- Lock m_lock;
- struct Cell {
- Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
- Cell(const Cell &) = delete;
- Cell & operator=(const Cell &) = delete;
- Cell * m_next, * m_prev;
- T * m_elem;
- };
- Cell * volatile m_front = NULL;
- Cell * volatile m_back = NULL;
- pthread_cond_t m_cond;
-};
-
};