summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/Async.h4
-rw-r--r--includes/BString.h2
-rw-r--r--includes/Exceptions.h22
-rw-r--r--includes/Handle.h2
-rw-r--r--includes/LuaTask.h20
-rw-r--r--includes/SimpleMustache.h14
-rw-r--r--includes/Socket.h2
-rw-r--r--includes/StacklessTask.h81
-rw-r--r--includes/Task.h94
-rw-r--r--includes/TaskMan.h42
-rw-r--r--includes/Threads.h58
-rw-r--r--src/Async.cc6
-rw-r--r--src/Handle.cc6
-rw-r--r--src/HttpServer.cc12
-rw-r--r--src/Input.cc8
-rw-r--r--src/LuaTask.cc58
-rw-r--r--src/Main.cc2
-rw-r--r--src/Output.cc8
-rw-r--r--src/SimpleMustache.cc12
-rw-r--r--src/Socket.cc45
-rw-r--r--src/Task.cc160
-rw-r--r--src/TaskMan.cc102
-rw-r--r--src/Threads.cc47
-rw-r--r--src/ZHandle.cc4
-rw-r--r--tests/test-Http.cc138
-rw-r--r--tests/test-Sockets.cc11
-rw-r--r--tests/test-Tasks.cc47
27 files changed, 773 insertions, 234 deletions
diff --git a/includes/Async.h b/includes/Async.h
index bd6ce7e..15e0ac7 100644
--- a/includes/Async.h
+++ b/includes/Async.h
@@ -23,7 +23,7 @@ class AsyncOperation {
protected:
virtual ~AsyncOperation() { }
private:
- Queue<AsyncOperation> * m_idleQueue = NULL;
+ CQueue<AsyncOperation> * m_idleQueue = NULL;
IdleReadyCallback_t m_idleReadyCallback = NULL;
void * m_idleReadyParam = NULL;
void finalize();
@@ -65,7 +65,7 @@ class AsyncManager : public Thread {
void stopAllWorkers();
virtual void * proc();
struct TLS {
- Queue<AsyncOperation> idleQueue;
+ CQueue<AsyncOperation> idleQueue;
IdleReadyCallback_t idleReadyCallback;
void * idleReadyParam;
};
diff --git a/includes/BString.h b/includes/BString.h
index 29f3d22..09e0d40 100644
--- a/includes/BString.h
+++ b/includes/BString.h
@@ -16,6 +16,8 @@ class String : private std::string {
public:
String() : std::string() { }
String(const char * str) : std::string(str ? str : "") { }
+ template<size_t L>
+ String(const char (&str)[L]) : std::string(str, L - 1) { }
String(const char * str, size_t n) : std::string(str ? str : "", str ? n : 0) { }
String(char c) { set("%c", c); }
String(int32_t i) { set("%i", i); }
diff --git a/includes/Exceptions.h b/includes/Exceptions.h
index 829a5d1..d7856fe 100644
--- a/includes/Exceptions.h
+++ b/includes/Exceptions.h
@@ -74,19 +74,19 @@ static inline void AssertHelperInner(const String & msg, const char * details =
throw GeneralException(msg, details);
}
-static inline void AssertHelper(const String & msg, const char * fmt = NULL, ...) __attribute__((format(printf, 2, 3)));
+static inline void AssertHelper(const String & msg, const char * fmt, ...) __attribute__((format(printf, 2, 3)));
static inline void AssertHelper(const String & msg, const char * fmt, ...) {
- if (fmt) {
- String details;
- va_list ap;
- va_start(ap, fmt);
- details.set(fmt, ap);
- va_end(ap);
- AssertHelperInner(msg, details.to_charp());
- } else {
- AssertHelperInner(msg);
- }
+ String details;
+ va_list ap;
+ va_start(ap, fmt);
+ details.set(fmt, ap);
+ va_end(ap);
+ AssertHelperInner(msg, details.to_charp());
+}
+
+static inline void AssertHelper(const String & msg) {
+ AssertHelperInner(msg);
}
class TestException : public GeneralException {
diff --git a/includes/Handle.h b/includes/Handle.h
index 5634583..7c502ea 100644
--- a/includes/Handle.h
+++ b/includes/Handle.h
@@ -40,7 +40,7 @@ class Handle {
virtual const char * getName() = 0;
virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
virtual ssize_t write(const void * buf, size_t count) throw (GeneralException);
- template <ssize_t L> void writeString(const char str[L]) { writeString(str, L - 1); }
+ template <size_t L> void writeString(const char (&str)[L]) { writeString(str, L - 1); }
void writeString(const char * str, ssize_t len) { if (len < 0) len = strlen(str); forceWrite(str, len); }
void writeString(const String & str) { forceWrite(str.to_charp(), str.strlen()); }
void seek(off_t offset, int whence = SEEK_SET) { rseek(offset, whence); }
diff --git a/includes/LuaTask.h b/includes/LuaTask.h
index 48cef97..37d64fa 100644
--- a/includes/LuaTask.h
+++ b/includes/LuaTask.h
@@ -11,7 +11,7 @@ class LuaMainTask;
class LuaExecCell {
public:
- LuaExecCell() : m_detached(false) { }
+ LuaExecCell();
virtual ~LuaExecCell() { }
void detach() { m_detached = true; }
void exec(LuaMainTask * mainTask);
@@ -19,7 +19,7 @@ class LuaExecCell {
virtual void run(Lua &) = 0;
private:
Events::Async m_event;
- bool m_detached;
+ bool m_detached = false;
friend class LuaTask;
};
@@ -31,6 +31,14 @@ class LuaExecString : public LuaExecCell {
String m_str;
};
+class LuaExecFile : public LuaExecCell {
+ public:
+ LuaExecFile(IO<Handle> file) : m_file(file) { }
+ private:
+ virtual void run(Lua &);
+ IO<Handle> m_file;
+};
+
class LuaTask : public Task {
public:
~LuaTask() { L.weaken(); }
@@ -45,17 +53,15 @@ class LuaTask : public Task {
class LuaMainTask : public Task {
public:
- LuaMainTask() : m_stopping(false) { }
- ~LuaMainTask() { L.close(); }
+ LuaMainTask();
+ ~LuaMainTask();
void stop();
virtual const char * getName() const { return "LuaMainTask"; }
private:
void exec(LuaExecCell * cell);
virtual void Do();
Lua L;
- Events::Async m_queueEvent;
- Queue<LuaExecCell> m_queue;
- bool m_stopping;
+ TQueue<LuaExecCell> m_queue;
friend class LuaExecCell;
};
diff --git a/includes/SimpleMustache.h b/includes/SimpleMustache.h
index 9d16377..28cb477 100644
--- a/includes/SimpleMustache.h
+++ b/includes/SimpleMustache.h
@@ -80,11 +80,13 @@ class SimpleMustache {
IO<Buffer> b(new Buffer(str, s));
setTemplate(b);
}
- void setTemplate(const char * str, ssize_t s = -1) { setTemplate((const uint8_t *) str, s); }
+ template<size_t S>
+ void setTemplate(const char (&str)[S]) { setTemplate((const uint8_t *) str, S - 1); }
+ void setTemplate(const char * str, ssize_t s) { setTemplate((const uint8_t *) str, s); }
void setTemplate(const String & str) { setTemplate((const uint8_t *) str.to_charp(), str.strlen()); }
- void render(IO<Handle> h, Context * ctx) { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); }
+ void render(IO<Handle> h, Context * ctx) const { AAssert(ctx, "Please pass on a context to render"); render_r(h, ctx, "", m_fragments.begin(), false, -1); }
void empty() { while (!m_fragments.empty()) { delete m_fragments.front(); m_fragments.pop_front(); } }
- void checkTemplate() { Fragments::iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); }
+ void checkTemplate() { Fragments::const_iterator end = checkTemplate_r(m_fragments.begin()); AAssert(end == m_fragments.end(), "The template wasn't fully checked; possibly mismatched sections"); }
~SimpleMustache() { empty(); }
private:
struct Fragment {
@@ -102,10 +104,10 @@ class SimpleMustache {
typedef std::list<Fragment *> Fragments;
Fragments m_fragments;
- Fragments::iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx);
- String escape(const String & s);
+ Fragments::const_iterator render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const;
+ static String escape(const String & s);
- Fragments::iterator checkTemplate_r(Fragments::iterator begin, const String & endSection = "");
+ Fragments::const_iterator checkTemplate_r(Fragments::const_iterator begin, const String & endSection = "") const;
};
};
diff --git a/includes/Socket.h b/includes/Socket.h
index 981a385..e5a9f21 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -81,7 +81,7 @@ class Listener : public ListenerBase {
public:
Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { }
protected:
- virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); }
+ virtual void factory(IO<Socket> & io, void * opaque) { TaskMan::registerTask(new Worker(io, opaque)); }
virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); }
};
diff --git a/includes/StacklessTask.h b/includes/StacklessTask.h
new file mode 100644
index 0000000..bb18a7a
--- /dev/null
+++ b/includes/StacklessTask.h
@@ -0,0 +1,81 @@
+#pragma once
+
+#include <Task.h>
+
+namespace Balau {
+
+class StacklessTask : public Task {
+ public:
+ StacklessTask() : m_state(0) { setStackless(); }
+ protected:
+ void taskSwitch() throw (GeneralException) { throw TaskSwitch(); }
+ unsigned int m_state;
+};
+
+};
+
+#define StacklessBegin() \
+ switch(m_state) { \
+ case 0: { \
+
+
+#define StacklessOperation(operation) \
+ m_state = __LINE__; \
+ } \
+ case __LINE__: { \
+ try { \
+ operation; \
+ } \
+ catch (Balau::EAgain & e) { \
+ taskSwitch(); \
+ } \
+
+
+#define StacklessOperationOrCond(operation, cond) \
+ m_state = __LINE__; \
+ } \
+ case __LINE__: { \
+ try { \
+ if (!(cond)) { \
+ operation; \
+ } \
+ } \
+ catch (Balau::EAgain & e) { \
+ taskSwitch(); \
+ } \
+
+
+#define StacklessWaitFor(evt) \
+ m_state = __LINE__; \
+ waitFor(evt); \
+ taskSwitch(); \
+ } \
+ case __LINE__: { \
+
+
+#define StacklessWaitCond(cond) \
+ m_state = __LINE__; \
+ } \
+ case __LINE__: { \
+ if (!(cond)) \
+ taskSwitch(); \
+
+
+#define StacklessYield() \
+ m_state = __LINE__; \
+ try { \
+ yield(true); \
+ } \
+ catch (Balau::EAgain & e) { \
+ taskSwitch(); \
+ } \
+ } \
+ case __LINE__: { \
+
+
+#define StacklessEnd() \
+ break; \
+ } \
+ default: \
+ AssertHelper("unknown state", "State %i is out of range in task %s at %p", m_state, getName(), this); \
+ }
diff --git a/includes/Task.h b/includes/Task.h
index b3f395a..86bbce1 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -21,6 +21,11 @@ class EAgain : public GeneralException {
Events::BaseEvent * m_evt;
};
+class TaskSwitch : public GeneralException {
+ public:
+ TaskSwitch() : GeneralException("Task Switch") { }
+};
+
class TaskMan;
class Task;
@@ -64,6 +69,7 @@ class BaseEvent {
class Timeout : public BaseEvent {
public:
+ Timeout() { }
Timeout(ev_tstamp tstamp) { set(tstamp); }
virtual ~Timeout() { m_evt.stop(); }
void evt_cb(ev::timer & w, int revents) { doSignal(); }
@@ -75,11 +81,12 @@ class Timeout : public BaseEvent {
class TaskEvent : public BaseEvent {
public:
- TaskEvent(Task * taskWaited);
+ TaskEvent(Task * taskWaited = NULL);
virtual ~TaskEvent();
void ack();
void signal();
Task * taskWaited() { return m_taskWaited; }
+ void attachToTask(Task * taskWaited);
void evt_cb(ev::async & w, int revents) { doSignal(); }
protected:
virtual void gotOwner(Task * task);
@@ -121,11 +128,22 @@ class Task {
enum Status {
STARTING,
RUNNING,
- IDLE,
+ SLEEPING,
STOPPED,
FAULTED,
YIELDED,
};
+ static const char * StatusToString(enum Status status) {
+ static const char * strs[] = {
+ "STARTING",
+ "RUNNING",
+ "SLEEPING",
+ "STOPPED",
+ "FAULTED",
+ "YIELDED",
+ };
+ return strs[status];
+ };
Task();
virtual ~Task();
virtual const char * getName() const = 0;
@@ -135,18 +153,37 @@ class Task {
Task * t = getCurrentTask();
t->waitFor(evt);
}
- static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException);
+ enum OperationYieldType {
+ SIMPLE,
+ INTERRUPTIBLE,
+ STACKLESS,
+ };
+ static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException);
TaskMan * getTaskMan() const { return m_taskMan; }
struct ev_loop * getLoop();
+ bool isStackless() { return m_stackless; }
protected:
- void yield(bool changeStatus = false);
+ void yield(bool stillRunning = false) throw (GeneralException);
+ void yield(Events::BaseEvent * evt) {
+ waitFor(evt);
+ yield();
+ }
virtual void Do() = 0;
void waitFor(Events::BaseEvent * event);
bool setOkayToEAgain(bool enable) {
+ if (m_stackless) {
+ AAssert(enable, "You can't make a task go not-okay-to-eagain if it's stackless.");
+ }
bool oldValue = m_okayToEAgain;
m_okayToEAgain = enable;
return oldValue;
}
+ void setStackless() {
+ AAssert(!m_stackless, "Can't set a task to be stackless twice");
+ AAssert(m_status == STARTING, "Can't set a task to be stackless after it started. status = %s", StatusToString(m_status));
+ m_stackless = true;
+ m_okayToEAgain = true;
+ }
private:
static size_t stackSize() { return 64 * 1024; }
void setup(TaskMan * taskMan, void * stack);
@@ -168,7 +205,54 @@ class Task {
Lock m_eventLock;
typedef std::list<Events::TaskEvent *> waitedByList_t;
waitedByList_t m_waitedBy;
- bool m_okayToEAgain;
+ bool m_okayToEAgain, m_stackless;
+};
+
+class QueueBase {
+ public:
+ bool isEmpty() { ScopeLock sl(m_lock); return !m_front; }
+ protected:
+ QueueBase() { pthread_cond_init(&m_cond, NULL); }
+ ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); }
+ void iPush(void * t, Events::Async * event);
+ void * iPop(Events::Async * event, bool wait);
+
+ private:
+ QueueBase(const QueueBase &) = delete;
+ QueueBase & operator=(const QueueBase &) = delete;
+ Lock m_lock;
+ struct Cell {
+ Cell(void * elem) : m_elem(elem) { }
+ Cell(const Cell &) = delete;
+ Cell & operator=(const Cell &) = delete;
+ Cell * m_next = NULL, * m_prev = NULL;
+ void * m_elem;
+ };
+ Cell * m_front = NULL, * m_back = NULL;
+ pthread_cond_t m_cond;
+};
+
+template<class T>
+class Queue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, true); }
+};
+
+template<class T>
+class TQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, &m_event); }
+ T * pop() { return (T *) iPop(&m_event, true); }
+ private:
+ Events::Async m_event;
+};
+
+template<class T>
+class CQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, false); }
};
};
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index d39598f..2b2742d 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -10,16 +10,31 @@
#include <Async.h>
#include <Threads.h>
#include <Exceptions.h>
+#include <Task.h>
namespace gnu = __gnu_cxx;
namespace Balau {
-class Task;
class TaskScheduler;
+namespace Events {
+
+class TaskEvent;
+
+};
+
class TaskMan {
public:
+ class TaskManThread : public Thread {
+ public:
+ virtual ~TaskManThread();
+ virtual void * proc();
+ void stopMe(int code = 0) { m_taskMan->stopMe(code); }
+ private:
+ TaskMan * m_taskMan;
+ };
+
TaskMan();
~TaskMan();
int mainLoop();
@@ -27,11 +42,25 @@ class TaskMan {
struct ev_loop * getLoop() { return m_loop; }
void signalTask(Task * t);
static void stop(int code);
- void stopMe(int code) { m_stopped = true; m_stopCode = code; }
- static Thread * createThreadedTaskMan();
+ void stopMe(int code = 0);
+ static TaskManThread * createThreadedTaskMan() {
+ TaskManThread * r = new TaskManThread();
+ r->threadStart();
+ return r;
+ }
+ static void stopThreadedTaskMan(TaskManThread * tmt) {
+ tmt->stopMe(0);
+ tmt->join();
+ delete tmt;
+ }
bool stopped() { return m_stopped; }
+ template<class T>
+ static T * registerTask(T * t, Task * stick = NULL) { TaskMan::iRegisterTask(t, stick, NULL); return t; }
+ template<class T>
+ static T * registerTask(T * t, Events::TaskEvent * event) { TaskMan::iRegisterTask(t, NULL, event); return t; }
+
private:
- static void registerTask(Task * t, Task * stick);
+ static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event);
static void registerAsyncOp(AsyncOperation * op);
void * getStack();
void freeStack(void * stack);
@@ -51,8 +80,6 @@ class TaskMan {
friend class Task;
friend class TaskScheduler;
template<class T>
- friend T * createTask(T * t, Task * stick = NULL);
- template<class T>
friend T * createAsyncOp(T * op);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
@@ -68,9 +95,6 @@ class TaskMan {
};
template<class T>
-T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; }
-
-template<class T>
T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; }
};
diff --git a/includes/Threads.h b/includes/Threads.h
index 13527d5..0dfce1a 100644
--- a/includes/Threads.h
+++ b/includes/Threads.h
@@ -5,8 +5,7 @@
namespace Balau {
-template<class T>
-class Queue;
+class QueueBase;
class Lock {
public:
@@ -18,8 +17,7 @@ class Lock {
Lock(const Lock &) = delete;
Lock & operator=(const Lock &) = delete;
pthread_mutex_t m_lock;
- template<class T>
- friend class Queue;
+ friend class QueueBase;
};
class ScopeLock {
@@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit {
virtual void doExit() { join(); }
};
-template<class T>
-class Queue {
- public:
- Queue() { pthread_cond_init(&m_cond, NULL); }
- ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); }
- void push(T * t) {
- ScopeLock sl(m_lock);
- Cell * c = new Cell(t);
- c->m_prev = m_back;
- if (m_back)
- m_back->m_next = c;
- else
- m_front = c;
- m_back = c;
- pthread_cond_signal(&m_cond);
- }
- T * pop(bool wait = true) {
- ScopeLock sl(m_lock);
- while (!m_front && wait)
- pthread_cond_wait(&m_cond, &m_lock.m_lock);
- Cell * c = m_front;
- if (!c)
- return NULL;
- m_front = c->m_next;
- if (m_front)
- m_front->m_prev = NULL;
- else
- m_back = NULL;
- T * t = c->m_elem;
- delete c;
- return t;
- }
- bool isEmpty() {
- ScopeLock sl(m_lock);
- return !m_front;
- }
- private:
- Queue(const Queue &) = delete;
- Queue & operator=(const Queue &) = delete;
- Lock m_lock;
- struct Cell {
- Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
- Cell(const Cell &) = delete;
- Cell & operator=(const Cell &) = delete;
- Cell * m_next, * m_prev;
- T * m_elem;
- };
- Cell * volatile m_front = NULL;
- Cell * volatile m_back = NULL;
- pthread_cond_t m_cond;
-};
-
};
diff --git a/src/Async.cc b/src/Async.cc
index 829c677..18c8ef1 100644
--- a/src/Async.cc
+++ b/src/Async.cc
@@ -28,7 +28,7 @@ void Balau::AsyncManager::queueOp(AsyncOperation * op) {
TLS * tls = getTLS();
Printer::elog(E_ASYNC, "Queuing operation at %p", op);
if (op->needsSynchronousCallback()) {
- Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam);
+ Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", op, &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam);
op->m_idleQueue = &tls->idleQueue;
op->m_idleReadyCallback = tls->idleReadyCallback;
op->m_idleReadyParam = tls->idleReadyParam;
@@ -98,11 +98,13 @@ void * Balau::AsyncManager::proc() {
}
stopAllWorkers();
+ Printer::elog(E_ASYNC, "Async thread waits for all idle queues to empty");
while (Atomic::Prefetch::Decrement(&m_numTLSes)) {
TLS * tls = m_TLSes.pop();
while (!tls->idleQueue.isEmpty());
}
+ Printer::elog(E_ASYNC, "Async thread stopping");
return NULL;
}
@@ -150,7 +152,7 @@ void Balau::AsyncManager::idle() {
while (!m_ready);
AsyncOperation * op;
TLS * tls = getTLS();
- while ((op = tls->idleQueue.pop(false))) {
+ while ((op = tls->idleQueue.pop())) {
Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op);
op->done();
}
diff --git a/src/Handle.cc b/src/Handle.cc
index dacd30d..9bcb419 100644
--- a/src/Handle.cc
+++ b/src/Handle.cc
@@ -97,7 +97,7 @@ ssize_t Balau::Handle::forceRead(void * _buf, size_t count, Events::BaseEvent *
catch (EAgain e) {
if (evt && evt->gotSignal())
return total;
- Task::yield(e.getEvent());
+ Task::operationYield(e.getEvent());
continue;
}
if (r < 0)
@@ -124,7 +124,7 @@ ssize_t Balau::Handle::forceWrite(const void * _buf, size_t count, Events::BaseE
catch (EAgain e) {
if (evt && evt->gotSignal())
return total;
- Task::yield(e.getEvent());
+ Task::operationYield(e.getEvent());
continue;
}
if (r < 0)
@@ -242,7 +242,7 @@ int Balau::FileSystem::mkdir(const char * path) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_mkdir(path, 0755, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_mkdir returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
char str[4096];
if (cbResults.result < 0)
diff --git a/src/HttpServer.cc b/src/HttpServer.cc
index c9cb0ee..23af486 100644
--- a/src/HttpServer.cc
+++ b/src/HttpServer.cc
@@ -35,6 +35,8 @@ class HttpWorker : public Task {
HttpWorker(IO<Handle> io, void * server);
~HttpWorker();
static void buildErrorTemplate(IO<Handle> h) { m_errorTemplate.setTemplate(h); }
+ template<size_t S>
+ static void buildErrorTemplate(const char (&str)[S]) { m_errorTemplate.setTemplate(str, S - 1); }
static void buildErrorTemplate(const char * str, ssize_t s) { m_errorTemplate.setTemplate(str, s); }
static void buildErrorTemplate(const String & str) { m_errorTemplate.setTemplate(str); }
private:
@@ -594,14 +596,20 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener;
void Balau::HttpServer::start() {
AAssert(!m_started, "Don't start an HttpServer twice");
- m_listenerPtr = createTask(new HttpListener(m_port, m_local.to_charp(), this));
+ m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this));
m_started = true;
}
void Balau::HttpServer::stop() {
AAssert(m_started, "Don't stop an HttpServer that hasn't been started");
- reinterpret_cast<HttpListener *>(m_listenerPtr)->stop();
+ HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr);
+ Events::TaskEvent event(listener);
+ Task::prepare(&event);
+ listener->stop();
m_started = false;
+ Task::operationYield(&event);
+ IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop");
+ event.ack();
}
void Balau::HttpServer::registerAction(Action * action) {
diff --git a/src/Input.cc b/src/Input.cc
index 8168231..3608cd0 100644
--- a/src/Input.cc
+++ b/src/Input.cc
@@ -56,7 +56,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_open returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
if (cbResults.errorno == ENOENT) {
@@ -71,7 +71,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) {
cbStatsResults_t cbStatsResults;
r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults);
EAssert(r != NULL, "eio_fstat returned a NULL eio_req");
- Task::yield(&cbStatsResults.evt);
+ Task::operationYield(&cbStatsResults.evt);
if (cbStatsResults.result == 0) {
m_size = cbStatsResults.statdata.st_size;
m_mtime = cbStatsResults.statdata.st_mtime;
@@ -85,7 +85,7 @@ void Balau::Input::close() throw (GeneralException) {
eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_close returned a NULL eio_req");
m_fd = -1;
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
strerror_r(cbResults.errorno, str, sizeof(str));
@@ -99,7 +99,7 @@ ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_read(m_fd, buf, count, getROffset(), 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_read returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result > 0) {
rseek(cbResults.result, SEEK_CUR);
} else {
diff --git a/src/LuaTask.cc b/src/LuaTask.cc
index d722847..7e0b256 100644
--- a/src/LuaTask.cc
+++ b/src/LuaTask.cc
@@ -1,26 +1,58 @@
#include "LuaTask.h"
#include "Main.h"
#include "TaskMan.h"
+#include "Printer.h"
+
+namespace {
+
+class LuaTaskStopper : public Balau::LuaExecCell {
+ virtual void run(Balau::Lua &) { }
+};
+
+};
+
+Balau::LuaExecCell::LuaExecCell() {
+ Printer::elog(E_TASK, "LuaExecCell created at %p", this);
+}
+
+Balau::LuaMainTask::LuaMainTask() {
+ Printer::elog(E_TASK, "LuaMainTask created at %p", this);
+ L.open_base();
+ L.open_table();
+ L.open_string();
+ L.open_math();
+ L.open_debug();
+ L.open_bit();
+ L.open_jit();
+}
+
+Balau::LuaMainTask::~LuaMainTask() {
+ L.close();
+}
void Balau::LuaMainTask::exec(LuaExecCell * cell) {
+ Printer::elog(E_TASK, "LuaMainTask at %p is asked to queue Cell %p", this, cell);
m_queue.push(cell);
- m_queueEvent.trigger();
}
void Balau::LuaMainTask::stop() {
- Atomic::CmpXChgVal(&m_stopping, true, false);
- m_queueEvent.trigger();
+ Printer::elog(E_TASK, "LuaMainTask at %p asked to stop", this);
+ exec(new LuaTaskStopper());
}
void Balau::LuaMainTask::Do() {
- while (!m_stopping) {
- waitFor(&m_queueEvent);
-
+ for (;;) {
LuaExecCell * cell;
- while ((cell = m_queue.pop(false)))
- createTask(new LuaTask(L.thread(), cell), this);
-
- yield();
+ Printer::elog(E_TASK, "LuaMainTask at %p tries to pop an ExecCell", this);
+ while ((cell = m_queue.pop())) {
+ Printer::elog(E_TASK, "LuaMainTask at %p popped %p", this, cell);
+ if (dynamic_cast<LuaTaskStopper *>(cell)) {
+ Printer::elog(E_TASK, "LuaMainTask at %p is stopping", this);
+ delete cell;
+ return;
+ }
+ TaskMan::registerTask(new LuaTask(L.thread(), cell), this);
+ }
}
}
@@ -37,9 +69,13 @@ void Balau::LuaExecCell::exec(LuaMainTask * mainTask) {
Task::prepare(&m_event);
mainTask->exec(this);
if (!m_detached)
- Task::yield(&m_event);
+ Task::operationYield(&m_event);
}
void Balau::LuaExecString::run(Lua & L) {
L.load(m_str);
}
+
+void Balau::LuaExecFile::run(Lua & L) {
+ L.load(m_file);
+}
diff --git a/src/Main.cc b/src/Main.cc
index fd4021f..32b7cf8 100644
--- a/src/Main.cc
+++ b/src/Main.cc
@@ -60,7 +60,7 @@ int Balau::Main::bootstrap(int _argc, char ** _argv) {
try {
m_status = RUNNING;
- createTask(new MainTask());
+ TaskMan::registerTask(new MainTask());
r = TaskMan::getDefaultTaskMan()->mainLoop();
m_status = STOPPING;
}
diff --git a/src/Output.cc b/src/Output.cc
index a4f6275..dd0a588 100644
--- a/src/Output.cc
+++ b/src/Output.cc
@@ -56,7 +56,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException
cbResults_t cbResults;
eio_req * r = eio_open(fname, O_WRONLY | O_CREAT | (truncate ? O_TRUNC : 0), 0755, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_open returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
if (cbResults.errorno == ENOENT) {
@@ -71,7 +71,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException
cbStatsResults_t cbStatsResults;
r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults);
EAssert(r != NULL, "eio_fstat returned a NULL eio_req");
- Task::yield(&cbStatsResults.evt);
+ Task::operationYield(&cbStatsResults.evt);
if (cbStatsResults.result == 0) {
m_size = cbStatsResults.statdata.st_size;
m_mtime = cbStatsResults.statdata.st_mtime;
@@ -85,7 +85,7 @@ void Balau::Output::close() throw (GeneralException) {
eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_close returned a NULL eio_req");
m_fd = -1;
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
strerror_r(cbResults.errorno, str, sizeof(str));
@@ -99,7 +99,7 @@ ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralExcep
cbResults_t cbResults;
eio_req * r = eio_write(m_fd, const_cast<void *>(buf), count, getWOffset(), 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_write returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result > 0) {
wseek(cbResults.result, SEEK_CUR);
} else {
diff --git a/src/SimpleMustache.cc b/src/SimpleMustache.cc
index 92fefeb..d2db99f 100644
--- a/src/SimpleMustache.cc
+++ b/src/SimpleMustache.cc
@@ -328,9 +328,9 @@ void Balau::SimpleMustache::setTemplate(IO<Handle> _h) {
m_fragments.push_back(curFragment);
}
-Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_r(Fragments::iterator begin, const String & endSection) {
- Fragments::iterator cur;
- Fragments::iterator end = m_fragments.end();
+Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::checkTemplate_r(Fragments::const_iterator begin, const String & endSection) const {
+ Fragments::const_iterator cur;
+ Fragments::const_iterator end = m_fragments.end();
for (cur = begin; cur != end; cur++) {
Fragment * fr = *cur;
@@ -346,9 +346,9 @@ Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_
return end;
}
-Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx) {
- Fragments::iterator cur;
- Fragments::iterator end = m_fragments.end();
+Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const {
+ Fragments::const_iterator cur;
+ Fragments::const_iterator end = m_fragments.end();
if (endSection.strlen() != 0) {
int depth = 0;
diff --git a/src/Socket.cc b/src/Socket.cc
index 148d0a4..7511bde 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -14,6 +14,26 @@
#include "Main.h"
#include "Atomic.h"
+#ifndef _WIN32
+namespace {
+
+class SigpipeBlocker : public Balau::AtStart {
+ public:
+ SigpipeBlocker() : AtStart(5) { }
+ virtual void doStart() {
+ struct sigaction new_actn, old_actn;
+ new_actn.sa_handler = SIG_IGN;
+ sigemptyset(&new_actn.sa_mask);
+ new_actn.sa_flags = 0;
+ sigaction(SIGPIPE, &new_actn, &old_actn);
+ }
+};
+
+static SigpipeBlocker sigpipeBlocker;
+
+};
+#endif
+
void Balau::Socket::SocketEvent::gotOwner(Task * task) {
Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this);
if (!m_task) {
@@ -172,9 +192,8 @@ static const char * inet_ntop(int af, const void * src, char * dst, socklen_t si
#endif
-#if 0
-// TODO: use getaddrinfo_a, if available.
-#else
+namespace {
+
class ResolverThread : public Balau::GlobalThread {
public:
ResolverThread() : GlobalThread(8), m_stopping(false) { }
@@ -186,6 +205,8 @@ class ResolverThread : public Balau::GlobalThread {
volatile bool m_stopping;
};
+};
+
void ResolverThread::threadExit() {
m_stopping = true;
DNSRequest req;
@@ -222,13 +243,12 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st
Balau::Printer::elog(Balau::E_SOCKET, "Sending a request to the resolver thread");
Balau::Task::prepare(&evt);
resolverThread.pushRequest(&req);
- Balau::Task::yield(&evt);
+ Balau::Task::operationYield(&evt);
Balau::Atomic::MemoryFence();
return req;
}
-#endif
Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) {
m_name = "Socket(unconnected)";
@@ -457,7 +477,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
IAssert(spins == 0, "We shouldn't have spinned...");
}
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
// if we're still here, it means the parent task doesn't want to be thrown an exception
IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled");
@@ -516,7 +536,7 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
if (s < 0) {
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
String msg = getErrorMessage();
throw GeneralException(String("Unexpected error accepting a connection: #") + errno + "(" + msg + ")");
@@ -548,7 +568,7 @@ ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) {
}
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
m_evtR->stop();
return r;
@@ -575,8 +595,15 @@ ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralExcep
if (r > 0)
return r;
+#ifndef _WIN32
+ if (errno == EPIPE) {
+ close();
+ return 0;
+ }
+#endif
+
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
} else {
m_evtW->stop();
return r;
diff --git a/src/Task.cc b/src/Task.cc
index aba29b8..4465203 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -6,10 +6,7 @@
static Balau::LocalTmpl<Balau::Task> localTask;
-Balau::Task::Task() {
- m_status = STARTING;
- m_okayToEAgain = false;
-
+Balau::Task::Task() : m_status(STARTING), m_okayToEAgain(false), m_stackless(false) {
Printer::elog(E_TASK, "Created a Task at %p", this);
}
@@ -22,16 +19,21 @@ bool Balau::Task::needsStacks() {
}
void Balau::Task::setup(TaskMan * taskMan, void * stack) {
- size_t size = stackSize();
+ if (m_stackless) {
+ IAssert(!stack, "Since we're stackless, no stack should've been allocated.");
+ m_stack = NULL;
+ } else {
+ size_t size = stackSize();
#ifndef _WIN32
- IAssert(stack, "Can't setup a coroutine without a stack");
- m_stack = stack;
- coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size);
+ IAssert(stack, "Can't setup a coroutine without a stack");
+ m_stack = stack;
+ coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size);
#else
- Assert(!stack, "We shouldn't allocate stacks with Fibers");
- m_stack = NULL;
- m_fiber = CreateFiber(size, coroutineTrampoline, this);
+ Assert(!stack, "We shouldn't allocate stacks with Fibers");
+ m_stack = NULL;
+ m_fiber = CreateFiber(size, coroutineTrampoline, this);
#endif
+ }
m_taskMan = taskMan;
@@ -54,7 +56,7 @@ void Balau::Task::coroutineTrampoline(void * arg) {
void Balau::Task::coroutine() {
try {
- IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status);
+ IAssert((m_status == STARTING) || (m_stackless && (m_status == RUNNING)), "The Task at %p has a bad status ? m_status = %s, stackless = %s", this, StatusToString(m_status), m_stackless ? "true" : "false");
m_status = RUNNING;
Do();
m_status = STOPPED;
@@ -66,6 +68,12 @@ void Balau::Task::coroutine() {
catch (TestException & e) {
m_status = STOPPED;
Printer::log(M_ERROR, "Unit test failed: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
TaskMan::stop(-1);
}
catch (RessourceException & e) {
@@ -74,8 +82,25 @@ void Balau::Task::coroutine() {
const char * details = e.getDetails();
if (details)
Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
TaskMan::stop(-1);
}
+ catch (TaskSwitch & e) {
+ if (!m_stackless) {
+ Printer::log(M_ERROR, "Task %s at %p isn't stackless, but still caused a task switch.", getName(), this);
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ m_status = FAULTED;
+ } else {
+ Printer::elog(E_TASK, "Stackless task %s at %p is task-switching.", getName(), this);
+ }
+ }
catch (GeneralException & e) {
Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", getName(), this, e.getMsg());
const char * details = e.getDetails();
@@ -90,39 +115,50 @@ void Balau::Task::coroutine() {
Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", getName(), this);
m_status = FAULTED;
}
+ if (!m_stackless) {
#ifndef _WIN32
- coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
+ coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
- SwitchToFiber(m_taskMan->m_fiber);
+ SwitchToFiber(m_taskMan->m_fiber);
#endif
+ }
}
void Balau::Task::switchTo() {
Printer::elog(E_TASK, "Switching to task %p - %s", this, getName());
- IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status);
+ IAssert(m_status == YIELDED || m_status == SLEEPING || m_status == STARTING, "The task at %p isn't either yielded, sleeping or starting... ? m_status = %s", this, StatusToString(m_status));
void * oldTLS = g_tlsManager->getTLS();
g_tlsManager->setTLS(m_tls);
- if (m_status == YIELDED || m_status == IDLE)
+ if (m_status == YIELDED || m_status == SLEEPING)
m_status = RUNNING;
+ if (m_stackless) {
+ coroutine();
+ } else {
#ifndef _WIN32
- coro_transfer(&m_taskMan->m_returnContext, &m_ctx);
+ coro_transfer(&m_taskMan->m_returnContext, &m_ctx);
#else
- SwitchToFiber(m_fiber);
+ SwitchToFiber(m_fiber);
#endif
+ }
g_tlsManager->setTLS(oldTLS);
- if (m_status == RUNNING)
- m_status = IDLE;
+ IAssert(m_status != RUNNING, "Task %s at %p is still running... ?", getName(), this);
}
-void Balau::Task::yield(bool changeStatus) {
+void Balau::Task::yield(bool stillRunning) throw (GeneralException) {
Printer::elog(E_TASK, "Task %p - %s yielding", this, getName());
- if (changeStatus)
+ if (stillRunning)
m_status = YIELDED;
+ else
+ m_status = SLEEPING;
+ if (m_stackless) {
+ throw EAgain(NULL);
+ } else {
#ifndef _WIN32
- coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
+ coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
- SwitchToFiber(m_taskMan->m_fiber);
+ SwitchToFiber(m_taskMan->m_fiber);
#endif
+ }
}
Balau::Task * Balau::Task::getCurrentTask() {
@@ -149,7 +185,15 @@ void Balau::Events::BaseEvent::doSignal() {
}
}
-Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited), m_ack(false), m_distant(false) {
+Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(NULL), m_ack(false), m_distant(false) {
+ if (taskWaited)
+ attachToTask(taskWaited);
+}
+
+void Balau::Events::TaskEvent::attachToTask(Task * taskWaited) {
+ AAssert(!m_taskWaited, "You can't attach a TaskEvent twice.");
+ m_ack = false;
+ m_taskWaited = taskWaited;
ScopeLock lock(m_taskWaited->m_eventLock);
m_taskWaited->m_waitedBy.push_back(this);
}
@@ -157,7 +201,8 @@ Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited
void Balau::Events::TaskEvent::signal() {
if (m_distant)
m_evt.send();
- doSignal();
+ else
+ doSignal();
}
void Balau::Events::TaskEvent::gotOwner(Task * task) {
@@ -198,6 +243,7 @@ void Balau::Events::TaskEvent::ack() {
IAssert(deleted, "We didn't find task %p in the waitedBy lists... ?", this);
m_ack = true;
reset();
+ m_taskWaited = NULL;
}
void Balau::Events::Timeout::gotOwner(Task * task) {
@@ -214,23 +260,77 @@ void Balau::Events::Custom::gotOwner(Task * task) {
m_loop = task->getLoop();
}
-void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) {
+void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) {
Task * t = getCurrentTask();
if (evt)
t->waitFor(evt);
+
+ if (t->m_stackless) {
+ AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task.");
+ }
+
+ if (yieldType == STACKLESS) {
+ AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task.");
+ }
+
bool gotSignal;
do {
t->yield(evt == NULL);
- Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false");
+ static const char * YieldTypeToString[] = {
+ "SIMPLE",
+ "INTERRUPTIBLE",
+ "STACKLESS",
+ };
+ Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false");
gotSignal = evt ? evt->gotSignal() : true;
- } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal);
+ } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal);
if (!evt)
return;
- if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) {
+ if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) {
Printer::elog(E_TASK, "operation is throwing an exception.");
throw EAgain(evt);
}
}
+
+void Balau::QueueBase::iPush(void * t, Events::Async * event) {
+ ScopeLock sl(m_lock);
+ Cell * c = new Cell(t);
+ c->m_prev = m_back;
+ if (m_back)
+ m_back->m_next = c;
+ else
+ m_front = c;
+ m_back = c;
+ if (event)
+ event->trigger();
+ else
+ pthread_cond_signal(&m_cond);
+}
+
+void * Balau::QueueBase::iPop(Events::Async * event, bool wait) {
+ ScopeLock sl(m_lock);
+ while (!m_front && wait) {
+ if (event) {
+ Task::prepare(event);
+ m_lock.leave();
+ Task::operationYield(event, Task::INTERRUPTIBLE);
+ m_lock.enter();
+ } else {
+ pthread_cond_wait(&m_cond, &m_lock.m_lock);
+ }
+ }
+ Cell * c = m_front;
+ if (!c)
+ return NULL;
+ m_front = c->m_next;
+ if (m_front)
+ m_front->m_prev = NULL;
+ else
+ m_back = NULL;
+ void * t = c->m_elem;
+ delete c;
+ return t;
+}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 49c326a..177cec1 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -101,7 +101,6 @@ void Balau::TaskScheduler::stopAll(int code) {
m_taskManagers.pop();
altQueue.push(tm);
tm->addToPending(new Stopper(code));
- tm->m_evt.send();
}
while (!altQueue.empty()) {
tm = altQueue.front();
@@ -130,7 +129,6 @@ void * Balau::TaskScheduler::proc() {
m_lock.leave();
Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm);
tm->addToPending(t);
- tm->m_evt.send();
}
Printer::elog(E_TASK, "TaskScheduler stopping.");
return NULL;
@@ -145,6 +143,16 @@ void asyncDummy(ev::async & w, int revents) {
Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up...");
}
+void Balau::TaskMan::stopMe(int code) {
+ Task * t = Task::getCurrentTask();
+ if (t->getTaskMan() == this) {
+ m_stopped = true;
+ m_stopCode = code;
+ } else {
+ addToPending(new Stopper(code));
+ }
+}
+
Balau::TaskMan::TaskMan() {
#ifndef _WIN32
coro_create(&m_returnContext, 0, 0, 0, 0);
@@ -168,6 +176,8 @@ Balau::TaskMan::TaskMan() {
}
#ifdef _WIN32
+namespace {
+
class WinSocketStartup : public Balau::AtStart {
public:
WinSocketStartup() : AtStart(5) { }
@@ -179,6 +189,8 @@ class WinSocketStartup : public Balau::AtStart {
};
static WinSocketStartup wsa;
+
+};
#endif
Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); }
@@ -191,14 +203,16 @@ Balau::TaskMan::~TaskMan() {
}
s_scheduler.unregisterTaskMan(this);
// probably way more work to do here in order to clean up tasks from that thread
+ m_evt.stop();
ev_loop_destroy(m_loop);
}
void * Balau::TaskMan::getStack() {
+ if (!Task::needsStacks())
+ return NULL;
void * r = NULL;
if (m_nStacks == 0) {
- if (Task::needsStacks())
- r = malloc(Task::stackSize());
+ r = malloc(Task::stackSize());
} else {
r = m_stacks.front();
m_stacks.pop();
@@ -230,30 +244,23 @@ int Balau::TaskMan::mainLoop() {
s_async.setIdleReadyCallback(asyncIdleReady, this);
do {
- bool noWait = false;
-
Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size());
- // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean.
+ // checking "STARTING" tasks, and running them once
while ((iH = starting.begin()) != starting.end()) {
Task * t = *iH;
IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t);
t->switchTo();
IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t);
starting.erase(iH);
- if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
- noWait = true;
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED))
stopped.insert(t);
- }
- if (t->getStatus() == Task::YIELDED) {
- noWait = true;
+ if (t->getStatus() == Task::YIELDED)
yielded.insert(t);
- }
}
// if we begin that loop with any pending task, just don't loop, so we can add them immediately.
- if (!m_pendingAdd.isEmpty() || !yielded.empty())
- noWait = true;
+ bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty();
// libev's event "loop". We always runs it once though.
m_allowedToSignal = true;
@@ -261,7 +268,7 @@ int Balau::TaskMan::mainLoop() {
ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE);
Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this);
- // calling async's idle loop here
+ // calling async's idle
s_async.idle();
// let's check what task got stopped, and signal them
@@ -276,7 +283,7 @@ int Balau::TaskMan::mainLoop() {
// let's check who got signaled, and call them
for (Task * t : m_signaledTasks) {
Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str());
- IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus());
+ IAssert(t->getStatus() == Task::SLEEPING || t->getStatus() == Task::YIELDED, "We're switching to a non-sleeping/yielded task at %p... ? status = %i", t, t->getStatus());
bool wasYielded = t->getStatus() == Task::YIELDED;
t->switchTo();
if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
@@ -313,7 +320,7 @@ int Balau::TaskMan::mainLoop() {
Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t);
IAssert(m_tasks.find(t) == m_tasks.end(), "TaskMan got task %p twice... ?", t);
ev_now_update(m_loop);
- t->setup(this, getStack());
+ t->setup(this, t->isStackless() ? NULL : getStack());
m_tasks.insert(t);
starting.insert(t);
}
@@ -345,12 +352,14 @@ int Balau::TaskMan::mainLoop() {
return m_stopCode;
}
-void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) {
+void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) {
if (stick) {
+ IAssert(!event, "inconsistent");
TaskMan * tm = stick->getTaskMan();
tm->addToPending(t);
- tm->m_evt.send();
} else {
+ if (event)
+ event->attachToTask(t);
s_scheduler.registerTask(t);
}
}
@@ -361,6 +370,7 @@ void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) {
void Balau::TaskMan::addToPending(Balau::Task * t) {
m_pendingAdd.push(t);
+ m_evt.send();
}
void Balau::TaskMan::signalTask(Task * t) {
@@ -373,17 +383,51 @@ void Balau::TaskMan::stop(int code) {
s_scheduler.stopAll(code);
}
-class ThreadedTaskMan : public Balau::Thread {
- virtual void * proc() {
+void * Balau::TaskMan::TaskManThread::proc() {
+ bool success = false;
+ m_taskMan = NULL;
+ try {
m_taskMan = new Balau::TaskMan();
m_taskMan->mainLoop();
- return NULL;
+ success = true;
}
- Balau::TaskMan * m_taskMan;
-};
+ catch (Exit e) {
+ Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway");
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
+ }
+ catch (RessourceException e) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread got a ressource problem: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (GeneralException e) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an exception: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (...) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an unknown exception");
+ }
+ if (!success) {
+ if (m_taskMan)
+ delete m_taskMan;
+ m_taskMan = NULL;
+ TaskMan::stop(-1);
+ }
+ return NULL;
+}
-Balau::Thread * Balau::TaskMan::createThreadedTaskMan() {
- Thread * r = new ThreadedTaskMan();
- r->threadStart();
- return r;
+Balau::TaskMan::TaskManThread::~TaskManThread() {
+ if (m_taskMan)
+ delete m_taskMan;
}
diff --git a/src/Threads.cc b/src/Threads.cc
index 9ecfff5..7b1b7bc 100644
--- a/src/Threads.cc
+++ b/src/Threads.cc
@@ -2,6 +2,7 @@
#include "Threads.h"
#include "Local.h"
#include "Atomic.h"
+#include "TaskMan.h"
namespace Balau {
@@ -39,11 +40,47 @@ Balau::RWLock::RWLock() {
}
void * Balau::ThreadHelper::threadProc(void * arg) {
- void * tls = Local::createTLS();
- g_tlsManager->setTLS(tls);
- Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg);
- void * r = thread->proc();
- free(tls);
+ void * r = NULL;
+ bool success = false;
+ try {
+ void * tls = Local::createTLS();
+ g_tlsManager->setTLS(tls);
+ Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg);
+ r = thread->proc();
+ free(tls);
+ success = true;
+ }
+ catch (Exit e) {
+ Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway");
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
+ }
+ catch (RessourceException e) {
+ Printer::log(M_ERROR | M_ALERT, "The Thread got a ressource problem: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (GeneralException e) {
+ Printer::log(M_ERROR | M_ALERT, "The Thread caused an exception: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (...) {
+ Printer::log(M_ERROR | M_ALERT, "The Thread caused an unknown exception");
+ }
+
+ if (!success)
+ TaskMan::stop(-1);
+
return r;
}
diff --git a/src/ZHandle.cc b/src/ZHandle.cc
index e2f22f0..4f216c7 100644
--- a/src/ZHandle.cc
+++ b/src/ZHandle.cc
@@ -79,7 +79,7 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
if (r <= 0)
return readTotal;
}
- Task::yield(nullptr);
+ Task::operationYield();
int r = inflate(&m_zin, Z_SYNC_FLUSH);
size_t didRead = count - m_zin.avail_out;
readTotal += didRead;
@@ -108,7 +108,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
m_zout.avail_out = BLOCK_SIZE;
int r = deflate(&m_zout, Z_NO_FLUSH);
EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r);
- Task::yield(nullptr);
+ Task::operationYield();
size_t compressed = BLOCK_SIZE - m_zout.avail_out;
if (compressed) {
size_t w = m_h->forceWrite(obuf, compressed);
diff --git a/tests/test-Http.cc b/tests/test-Http.cc
index af78e43..88d7a8d 100644
--- a/tests/test-Http.cc
+++ b/tests/test-Http.cc
@@ -1,41 +1,92 @@
#include <Main.h>
#include <HttpServer.h>
#include <TaskMan.h>
-
-#define DAEMON_NAME "Balau/1.0"
+#include <Socket.h>
+#include <SimpleMustache.h>
using namespace Balau;
-class TestAction : public HttpServer::Action {
- public:
- TestAction() : Action(Regexes::any) { }
- virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException);
-};
-
-bool TestAction::Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException) {
- HttpServer::Response response(server, req, out);
- response->writeString(
+const char htmlTemplateStr[] =
"<!DOCTYPE html PUBLIC \"-//W3C//DTD XHTML 1.0 Transitional//EN\"\n"
"\"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd\">\n"
"<html xmlns=\"http://www.w3.org/1999/xhtml\">\n"
" <head>\n"
-" <title>Test</title>\n"
+" <meta http-equiv=\"content-type\" content=\"text/html; charset=utf-8\" />\n"
+" <title>{{title}}</title>\n"
+" <style type=\"text/css\">\n"
+" body { font-family: arial, helvetica, sans-serif; }\n"
+" </style>\n"
" </head>\n"
"\n"
" <body>\n"
-" This is a test document.\n"
+" <h1>{{title}}</h1>\n"
+" <h2>{{msg}}</h2>\n"
" </body>\n"
-"</html>\n");
+"</html>\n"
+;
+
+class TestHtmlTemplate : public AtStart {
+ public:
+ TestHtmlTemplate() : AtStart(10), htmlTemplate(m_template) { }
+ virtual void doStart() { m_template.setTemplate(htmlTemplateStr); }
+
+ const SimpleMustache & htmlTemplate;
+ private:
+ SimpleMustache m_template;
+};
+
+static TestHtmlTemplate testHtmlTemplate;
+
+static Regex stopURL("/stop$");
+
+class StopAction : public HttpServer::Action {
+ public:
+ StopAction(Events::Async & event, bool & stop) : Action(stopURL), m_event(event), m_stop(stop) { }
+ private:
+ virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException);
+ Events::Async & m_event;
+ bool & m_stop;
+};
+
+bool StopAction::Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException) {
+ m_stop = true;
+ m_event.trigger();
+ SimpleMustache::Context ctx;
+ HttpServer::Response response(server, req, out);
+
+ ctx["title"] = "Stop";
+ ctx["msg"] = "Server stopping";
+ testHtmlTemplate.htmlTemplate.render(response.get(), &ctx);
response.Flush();
return true;
}
-Balau::Regex testFailureURL("^/failure.html$");
+class TestAction : public HttpServer::Action {
+ public:
+ TestAction() : Action(Regexes::any) { }
+ private:
+ virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException);
+};
+
+bool TestAction::Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException) {
+ SimpleMustache::Context ctx;
+ HttpServer::Response response(server, req, out);
+
+ ctx["title"] = "Test";
+ ctx["msg"] = "This is a test document.";
+
+ testHtmlTemplate.htmlTemplate.render(response.get(), &ctx);
+ response.Flush();
+ return true;
+}
+
+static Regex testFailureURL("^/failure.html$");
class TestFailure : public HttpServer::Action {
public:
TestFailure() : Action(testFailureURL) { }
+ private:
virtual bool Do(HttpServer * server, Http::Request & req, HttpServer::Action::ActionMatch & match, IO<Handle> out) throw (GeneralException);
};
@@ -43,32 +94,75 @@ bool TestFailure::Do(HttpServer * server, Http::Request & req, HttpServer::Actio
throw GeneralException("Test...");
}
-#define NTHREADS 4
+class Stopper : public Task {
+ virtual const char * getName() const { return "ServerStopper"; }
+ virtual void Do();
+};
+
+void Stopper::Do() {
+ IO<Socket> s(new Socket());
+ bool c = s->connect("localhost", 8080);
+ TAssert(c);
+ s->writeString("GET /stop HTTP/1.0\r\n\r\n");
+}
+
+static const int NTHREADS = 4;
void MainTask::Do() {
Printer::enable(M_DEBUG);
Printer::log(M_STATUS, "Test::Http running.");
- Thread * tms[NTHREADS];
+ TaskMan::TaskManThread * tms[NTHREADS];
for (int i = 0; i < NTHREADS; i++)
tms[i] = TaskMan::createThreadedTaskMan();
+ Events::Async event;
+ bool stop = false;
+
+ waitFor(&event);
+
HttpServer * s = new HttpServer();
- HttpServer::Action * a = new TestAction();
- HttpServer::Action * f = new TestFailure();
- a->registerMe(s);
- f->registerMe(s);
+ s->registerAction(new TestAction());
+ s->registerAction(new TestFailure());
+ s->registerAction(new StopAction(event, stop));
s->setPort(8080);
s->setLocal("localhost");
s->start();
+ Events::Timeout timeout(1);
+ waitFor(&timeout);
yield();
+ Events::TaskEvent stopperEvent;
+ Task * stopper = TaskMan::registerTask(new Stopper, &stopperEvent);
+ waitFor(&stopperEvent);
+
+ bool gotEvent = false, gotStopperEvent = false;
+ int count = 0;
+
+ while (!gotEvent || !gotStopperEvent) {
+ count++;
+ yield();
+ if (event.gotSignal()) {
+ TAssert(!gotEvent);
+ gotEvent = true;
+ }
+ if (stopperEvent.gotSignal()) {
+ TAssert(!gotStopperEvent);
+ gotStopperEvent = true;
+ stopperEvent.ack();
+ }
+ }
+ TAssert(count <= 2);
+
+ TAssert(stop);
+ Printer::log(M_STATUS, "Test::Http is stopping.");
+
s->stop();
for (int i = 0; i < NTHREADS; i++)
- tms[i]->join();
+ TaskMan::stopThreadedTaskMan(tms[i]);
Printer::log(M_STATUS, "Test::Http passed.");
}
diff --git a/tests/test-Sockets.cc b/tests/test-Sockets.cc
index a3e2755..4ef0a78 100644
--- a/tests/test-Sockets.cc
+++ b/tests/test-Sockets.cc
@@ -62,11 +62,16 @@ void MainTask::Do() {
Printer::enable(M_ALL);
Printer::log(M_STATUS, "Test::Sockets running.");
- Events::TaskEvent evtSvr(listener = Balau::createTask(new Listener<Worker>(1234)));
- Events::TaskEvent evtCln(Balau::createTask(new Client));
- Printer::log(M_STATUS, "Created %s", listener->getName());
+ Events::TaskEvent evtSvr;
+ Events::TaskEvent evtCln;
+
+ listener = TaskMan::registerTask(new Listener<Worker>(1234), &evtSvr);
+ TaskMan::registerTask(new Client, &evtCln);
+
waitFor(&evtSvr);
waitFor(&evtCln);
+
+ Printer::log(M_STATUS, "Created %s", listener->getName());
bool svrDone = false, clnDone = false;
while (!svrDone || !clnDone) {
yield();
diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc
index 272e61d..6bbb503 100644
--- a/tests/test-Tasks.cc
+++ b/tests/test-Tasks.cc
@@ -1,6 +1,7 @@
#include <Main.h>
#include <Task.h>
#include <TaskMan.h>
+#include <StacklessTask.h>
using namespace Balau;
@@ -20,9 +21,42 @@ class TestTask : public Task {
}
};
+class TestOperation {
+ public:
+ TestOperation() : m_count(0), m_completed(false) { }
+ void Do() {
+ if (m_count++ == 0) {
+ m_timeout.set(0.2);
+ Task::operationYield(&m_timeout, Task::STACKLESS);
+ }
+ TAssert(m_timeout.gotSignal());
+ m_completed = true;
+ }
+ bool completed() { return m_completed; }
+ private:
+ int m_count;
+ bool m_completed;
+ Events::Timeout m_timeout;
+};
+
+class TestStackless : public StacklessTask {
+ public:
+ virtual const char * getName() const { return "TestStackless"; }
+ private:
+ virtual void Do() {
+ StacklessBegin();
+ m_operation = new TestOperation();
+ StacklessOperation(m_operation->Do());
+ TAssert(m_operation->completed());
+ delete m_operation;
+ StacklessEnd();
+ }
+ TestOperation * m_operation;
+};
+
static void yieldingFunction() {
Events::Timeout timeout(0.2);
- Task::yield(&timeout);
+ Task::operationYield(&timeout);
TAssert(timeout.gotSignal());
}
@@ -30,8 +64,15 @@ void MainTask::Do() {
customPrinter = new CustomPrinter();
Printer::log(M_STATUS, "Test::Tasks running.");
- Task * testTask = Balau::createTask(new TestTask());
- Events::TaskEvent taskEvt(testTask);
+ Events::TaskEvent taskEvt;
+ Task * testTask = TaskMan::registerTask(new TestTask(), &taskEvt);
+ waitFor(&taskEvt);
+ TAssert(!taskEvt.gotSignal());
+ yield();
+ TAssert(taskEvt.gotSignal());
+ taskEvt.ack();
+
+ Task * testStackless = TaskMan::registerTask(new TestStackless(), &taskEvt);
waitFor(&taskEvt);
TAssert(!taskEvt.gotSignal());
yield();