summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/LuaTask.h5
-rw-r--r--includes/Socket.h2
-rw-r--r--includes/Task.h32
-rw-r--r--includes/TaskMan.h8
-rw-r--r--includes/Threads.h58
-rw-r--r--src/HttpServer.cc2
-rw-r--r--src/LuaTask.cc24
-rw-r--r--src/Main.cc2
-rw-r--r--src/Task.cc38
-rw-r--r--tests/test-Sockets.cc4
-rw-r--r--tests/test-Tasks.cc2
11 files changed, 100 insertions, 77 deletions
diff --git a/includes/LuaTask.h b/includes/LuaTask.h
index bdde9b7..ee9d7a7 100644
--- a/includes/LuaTask.h
+++ b/includes/LuaTask.h
@@ -53,16 +53,15 @@ class LuaTask : public Task {
class LuaMainTask : public Task {
public:
LuaMainTask();
- ~LuaMainTask() { L.close(); }
+ ~LuaMainTask();
void stop();
virtual const char * getName() const { return "LuaMainTask"; }
private:
void exec(LuaExecCell * cell);
virtual void Do();
Lua L;
- Events::Async m_queueEvent;
Queue<LuaExecCell> m_queue;
- bool m_stopping;
+ volatile bool m_stopping;
friend class LuaExecCell;
};
diff --git a/includes/Socket.h b/includes/Socket.h
index 4a73277..f63edf0 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -81,7 +81,7 @@ class Listener : public ListenerBase {
public:
Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { }
protected:
- virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); }
+ virtual void factory(IO<Socket> & io, void * opaque) { TaskMan::createTask(new Worker(io, opaque)); }
virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); }
};
diff --git a/includes/Task.h b/includes/Task.h
index 592d5c3..01d5e3f 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -169,4 +169,36 @@ class Task {
bool m_okayToEAgain;
};
+class QueueBase {
+ public:
+ bool isEmpty() { ScopeLock sl(m_lock); return !m_front; }
+ protected:
+ QueueBase() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); }
+ ~QueueBase() { while (!isEmpty()) iPop(false); pthread_cond_destroy(&m_cond); }
+ void iPush(void * t);
+ void * iPop(bool wait);
+
+ private:
+ QueueBase(const QueueBase &) = delete;
+ QueueBase & operator=(const QueueBase &) = delete;
+ Lock m_lock;
+ struct Cell {
+ Cell(void * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
+ Cell(const Cell &) = delete;
+ Cell & operator=(const Cell &) = delete;
+ Cell * m_next, * m_prev;
+ void * m_elem;
+ };
+ Cell * m_front, * m_back;
+ pthread_cond_t m_cond;
+ Events::Async m_event;
+};
+
+template<class T>
+class Queue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t); }
+ T * pop(bool wait = true) { return (T *) iPop(wait); }
+};
+
};
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index 6a50491..088d5d7 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -9,12 +9,12 @@
#include <queue>
#include <Threads.h>
#include <Exceptions.h>
+#include <Task.h>
namespace gnu = __gnu_cxx;
namespace Balau {
-class Task;
class TaskScheduler;
namespace Events {
@@ -35,6 +35,9 @@ class TaskMan {
void stopMe(int code) { m_stopped = true; m_stopCode = code; }
static Thread * createThreadedTaskMan();
bool stopped() { return m_stopped; }
+ template<class T>
+ static T * createTask(T * t, Task * stick = NULL) { TaskMan::registerTask(t, stick); return t; }
+
private:
static void registerTask(Task * t, Task * stick);
void * getStack();
@@ -62,7 +65,4 @@ class TaskMan {
int m_stopCode;
};
-template<class T>
-T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; }
-
};
diff --git a/includes/Threads.h b/includes/Threads.h
index 46ea365..0dfce1a 100644
--- a/includes/Threads.h
+++ b/includes/Threads.h
@@ -5,8 +5,7 @@
namespace Balau {
-template<class T>
-class Queue;
+class QueueBase;
class Lock {
public:
@@ -18,8 +17,7 @@ class Lock {
Lock(const Lock &) = delete;
Lock & operator=(const Lock &) = delete;
pthread_mutex_t m_lock;
- template<class T>
- friend class Queue;
+ friend class QueueBase;
};
class ScopeLock {
@@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit {
virtual void doExit() { join(); }
};
-template<class T>
-class Queue {
- public:
- Queue() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); }
- ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); }
- void push(T * t) {
- ScopeLock sl(m_lock);
- Cell * c = new Cell(t);
- c->m_prev = m_back;
- if (m_back)
- m_back->m_next = c;
- else
- m_front = c;
- m_back = c;
- pthread_cond_signal(&m_cond);
- }
- T * pop(bool wait = true) {
- ScopeLock sl(m_lock);
- while (!m_front && wait)
- pthread_cond_wait(&m_cond, &m_lock.m_lock);
- Cell * c = m_front;
- if (!c)
- return NULL;
- m_front = c->m_next;
- if (m_front)
- m_front->m_prev = NULL;
- else
- m_back = NULL;
- T * t = c->m_elem;
- delete c;
- return t;
- }
- bool isEmpty() {
- ScopeLock sl(m_lock);
- return !m_front;
- }
- private:
- Queue(const Queue &) = delete;
- Queue & operator=(const Queue &) = delete;
- Lock m_lock;
- struct Cell {
- Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
- Cell(const Cell &) = delete;
- Cell & operator=(const Cell &) = delete;
- Cell * m_next, * m_prev;
- T * m_elem;
- };
- Cell * volatile m_front;
- Cell * volatile m_back;
- pthread_cond_t m_cond;
-};
-
};
diff --git a/src/HttpServer.cc b/src/HttpServer.cc
index c9cb0ee..9fc48c5 100644
--- a/src/HttpServer.cc
+++ b/src/HttpServer.cc
@@ -594,7 +594,7 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener;
void Balau::HttpServer::start() {
AAssert(!m_started, "Don't start an HttpServer twice");
- m_listenerPtr = createTask(new HttpListener(m_port, m_local.to_charp(), this));
+ m_listenerPtr = TaskMan::createTask(new HttpListener(m_port, m_local.to_charp(), this));
m_started = true;
}
diff --git a/src/LuaTask.cc b/src/LuaTask.cc
index 9a7e978..acf63b5 100644
--- a/src/LuaTask.cc
+++ b/src/LuaTask.cc
@@ -2,6 +2,10 @@
#include "Main.h"
#include "TaskMan.h"
+class LuaTaskDummy : public Balau::LuaExecCell {
+ virtual void run(Balau::Lua &) { }
+};
+
Balau::LuaMainTask::LuaMainTask() : m_stopping(false) {
L.open_base();
L.open_table();
@@ -12,25 +16,29 @@ Balau::LuaMainTask::LuaMainTask() : m_stopping(false) {
L.open_jit();
}
+Balau::LuaMainTask::~LuaMainTask() {
+ L.close();
+}
+
void Balau::LuaMainTask::exec(LuaExecCell * cell) {
m_queue.push(cell);
- m_queueEvent.trigger();
}
void Balau::LuaMainTask::stop() {
Atomic::CmpXChgVal(&m_stopping, true, false);
- m_queueEvent.trigger();
+ exec(new LuaTaskDummy());
}
void Balau::LuaMainTask::Do() {
while (!m_stopping) {
- waitFor(&m_queueEvent);
-
LuaExecCell * cell;
- while ((cell = m_queue.pop(false)))
- createTask(new LuaTask(L.thread(), cell), this);
-
- yield();
+ while ((cell = m_queue.pop(false))) {
+ if (dynamic_cast<LuaTaskDummy *>(cell)) {
+ delete cell;
+ break;
+ }
+ TaskMan::createTask(new LuaTask(L.thread(), cell), this);
+ }
}
}
diff --git a/src/Main.cc b/src/Main.cc
index fd4021f..e1eae66 100644
--- a/src/Main.cc
+++ b/src/Main.cc
@@ -60,7 +60,7 @@ int Balau::Main::bootstrap(int _argc, char ** _argv) {
try {
m_status = RUNNING;
- createTask(new MainTask());
+ TaskMan::createTask(new MainTask());
r = TaskMan::getDefaultTaskMan()->mainLoop();
m_status = STOPPING;
}
diff --git a/src/Task.cc b/src/Task.cc
index 6278fd5..1486540 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -234,3 +234,41 @@ void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (Gene
throw EAgain(evt);
}
}
+
+void Balau::QueueBase::iPush(void * t) {
+ ScopeLock sl(m_lock);
+ Cell * c = new Cell(t);
+ c->m_prev = m_back;
+ if (m_back)
+ m_back->m_next = c;
+ else
+ m_front = c;
+ m_back = c;
+ pthread_cond_signal(&m_cond);
+ m_event.trigger();
+}
+
+void * Balau::QueueBase::iPop(bool wait) {
+ ScopeLock sl(m_lock);
+ while (!m_front) {
+ if (wait) {
+ pthread_cond_wait(&m_cond, &m_lock.m_lock);
+ } else {
+ Task::prepare(&m_event);
+ m_lock.leave();
+ Task::yield(&m_event);
+ m_lock.enter();
+ }
+ }
+ Cell * c = m_front;
+ if (!c)
+ return NULL;
+ m_front = c->m_next;
+ if (m_front)
+ m_front->m_prev = NULL;
+ else
+ m_back = NULL;
+ void * t = c->m_elem;
+ delete c;
+ return t;
+}
diff --git a/tests/test-Sockets.cc b/tests/test-Sockets.cc
index a3e2755..c576c97 100644
--- a/tests/test-Sockets.cc
+++ b/tests/test-Sockets.cc
@@ -62,8 +62,8 @@ void MainTask::Do() {
Printer::enable(M_ALL);
Printer::log(M_STATUS, "Test::Sockets running.");
- Events::TaskEvent evtSvr(listener = Balau::createTask(new Listener<Worker>(1234)));
- Events::TaskEvent evtCln(Balau::createTask(new Client));
+ Events::TaskEvent evtSvr(listener = TaskMan::createTask(new Listener<Worker>(1234)));
+ Events::TaskEvent evtCln(TaskMan::createTask(new Client));
Printer::log(M_STATUS, "Created %s", listener->getName());
waitFor(&evtSvr);
waitFor(&evtCln);
diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc
index 272e61d..60d1dd3 100644
--- a/tests/test-Tasks.cc
+++ b/tests/test-Tasks.cc
@@ -30,7 +30,7 @@ void MainTask::Do() {
customPrinter = new CustomPrinter();
Printer::log(M_STATUS, "Test::Tasks running.");
- Task * testTask = Balau::createTask(new TestTask());
+ Task * testTask = TaskMan::createTask(new TestTask());
Events::TaskEvent taskEvt(testTask);
waitFor(&taskEvt);
TAssert(!taskEvt.gotSignal());