From 37221dc091725c6fea09181b845308ab8f26c795 Mon Sep 17 00:00:00 2001 From: Pixel Date: Tue, 3 Apr 2012 08:46:21 -0700 Subject: Reworking a bit the way the queues are working, and thus, the way the LuaTMainTask queue works. --- includes/LuaTask.h | 5 ++--- includes/Socket.h | 2 +- includes/Task.h | 32 ++++++++++++++++++++++++++++ includes/TaskMan.h | 8 +++---- includes/Threads.h | 58 ++------------------------------------------------- src/HttpServer.cc | 2 +- src/LuaTask.cc | 24 ++++++++++++++------- src/Main.cc | 2 +- src/Task.cc | 38 +++++++++++++++++++++++++++++++++ tests/test-Sockets.cc | 4 ++-- tests/test-Tasks.cc | 2 +- 11 files changed, 100 insertions(+), 77 deletions(-) diff --git a/includes/LuaTask.h b/includes/LuaTask.h index bdde9b7..ee9d7a7 100644 --- a/includes/LuaTask.h +++ b/includes/LuaTask.h @@ -53,16 +53,15 @@ class LuaTask : public Task { class LuaMainTask : public Task { public: LuaMainTask(); - ~LuaMainTask() { L.close(); } + ~LuaMainTask(); void stop(); virtual const char * getName() const { return "LuaMainTask"; } private: void exec(LuaExecCell * cell); virtual void Do(); Lua L; - Events::Async m_queueEvent; Queue m_queue; - bool m_stopping; + volatile bool m_stopping; friend class LuaExecCell; }; diff --git a/includes/Socket.h b/includes/Socket.h index 4a73277..f63edf0 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -81,7 +81,7 @@ class Listener : public ListenerBase { public: Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { } protected: - virtual void factory(IO & io, void * opaque) { createTask(new Worker(io, opaque)); } + virtual void factory(IO & io, void * opaque) { TaskMan::createTask(new Worker(io, opaque)); } virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); } }; diff --git a/includes/Task.h b/includes/Task.h index 592d5c3..01d5e3f 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -169,4 +169,36 @@ class Task { bool m_okayToEAgain; }; +class QueueBase { + public: + bool isEmpty() { ScopeLock sl(m_lock); return !m_front; } + protected: + QueueBase() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); } + ~QueueBase() { while (!isEmpty()) iPop(false); pthread_cond_destroy(&m_cond); } + void iPush(void * t); + void * iPop(bool wait); + + private: + QueueBase(const QueueBase &) = delete; + QueueBase & operator=(const QueueBase &) = delete; + Lock m_lock; + struct Cell { + Cell(void * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } + Cell(const Cell &) = delete; + Cell & operator=(const Cell &) = delete; + Cell * m_next, * m_prev; + void * m_elem; + }; + Cell * m_front, * m_back; + pthread_cond_t m_cond; + Events::Async m_event; +}; + +template +class Queue : public QueueBase { + public: + void push(T * t) { iPush(t); } + T * pop(bool wait = true) { return (T *) iPop(wait); } +}; + }; diff --git a/includes/TaskMan.h b/includes/TaskMan.h index 6a50491..088d5d7 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -9,12 +9,12 @@ #include #include #include +#include namespace gnu = __gnu_cxx; namespace Balau { -class Task; class TaskScheduler; namespace Events { @@ -35,6 +35,9 @@ class TaskMan { void stopMe(int code) { m_stopped = true; m_stopCode = code; } static Thread * createThreadedTaskMan(); bool stopped() { return m_stopped; } + template + static T * createTask(T * t, Task * stick = NULL) { TaskMan::registerTask(t, stick); return t; } + private: static void registerTask(Task * t, Task * stick); void * getStack(); @@ -62,7 +65,4 @@ class TaskMan { int m_stopCode; }; -template -T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; } - }; diff --git a/includes/Threads.h b/includes/Threads.h index 46ea365..0dfce1a 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -5,8 +5,7 @@ namespace Balau { -template -class Queue; +class QueueBase; class Lock { public: @@ -18,8 +17,7 @@ class Lock { Lock(const Lock &) = delete; Lock & operator=(const Lock &) = delete; pthread_mutex_t m_lock; - template - friend class Queue; + friend class QueueBase; }; class ScopeLock { @@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit { virtual void doExit() { join(); } }; -template -class Queue { - public: - Queue() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); } - ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); } - void push(T * t) { - ScopeLock sl(m_lock); - Cell * c = new Cell(t); - c->m_prev = m_back; - if (m_back) - m_back->m_next = c; - else - m_front = c; - m_back = c; - pthread_cond_signal(&m_cond); - } - T * pop(bool wait = true) { - ScopeLock sl(m_lock); - while (!m_front && wait) - pthread_cond_wait(&m_cond, &m_lock.m_lock); - Cell * c = m_front; - if (!c) - return NULL; - m_front = c->m_next; - if (m_front) - m_front->m_prev = NULL; - else - m_back = NULL; - T * t = c->m_elem; - delete c; - return t; - } - bool isEmpty() { - ScopeLock sl(m_lock); - return !m_front; - } - private: - Queue(const Queue &) = delete; - Queue & operator=(const Queue &) = delete; - Lock m_lock; - struct Cell { - Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } - Cell(const Cell &) = delete; - Cell & operator=(const Cell &) = delete; - Cell * m_next, * m_prev; - T * m_elem; - }; - Cell * volatile m_front; - Cell * volatile m_back; - pthread_cond_t m_cond; -}; - }; diff --git a/src/HttpServer.cc b/src/HttpServer.cc index c9cb0ee..9fc48c5 100644 --- a/src/HttpServer.cc +++ b/src/HttpServer.cc @@ -594,7 +594,7 @@ typedef Balau::Listener HttpListener; void Balau::HttpServer::start() { AAssert(!m_started, "Don't start an HttpServer twice"); - m_listenerPtr = createTask(new HttpListener(m_port, m_local.to_charp(), this)); + m_listenerPtr = TaskMan::createTask(new HttpListener(m_port, m_local.to_charp(), this)); m_started = true; } diff --git a/src/LuaTask.cc b/src/LuaTask.cc index 9a7e978..acf63b5 100644 --- a/src/LuaTask.cc +++ b/src/LuaTask.cc @@ -2,6 +2,10 @@ #include "Main.h" #include "TaskMan.h" +class LuaTaskDummy : public Balau::LuaExecCell { + virtual void run(Balau::Lua &) { } +}; + Balau::LuaMainTask::LuaMainTask() : m_stopping(false) { L.open_base(); L.open_table(); @@ -12,25 +16,29 @@ Balau::LuaMainTask::LuaMainTask() : m_stopping(false) { L.open_jit(); } +Balau::LuaMainTask::~LuaMainTask() { + L.close(); +} + void Balau::LuaMainTask::exec(LuaExecCell * cell) { m_queue.push(cell); - m_queueEvent.trigger(); } void Balau::LuaMainTask::stop() { Atomic::CmpXChgVal(&m_stopping, true, false); - m_queueEvent.trigger(); + exec(new LuaTaskDummy()); } void Balau::LuaMainTask::Do() { while (!m_stopping) { - waitFor(&m_queueEvent); - LuaExecCell * cell; - while ((cell = m_queue.pop(false))) - createTask(new LuaTask(L.thread(), cell), this); - - yield(); + while ((cell = m_queue.pop(false))) { + if (dynamic_cast(cell)) { + delete cell; + break; + } + TaskMan::createTask(new LuaTask(L.thread(), cell), this); + } } } diff --git a/src/Main.cc b/src/Main.cc index fd4021f..e1eae66 100644 --- a/src/Main.cc +++ b/src/Main.cc @@ -60,7 +60,7 @@ int Balau::Main::bootstrap(int _argc, char ** _argv) { try { m_status = RUNNING; - createTask(new MainTask()); + TaskMan::createTask(new MainTask()); r = TaskMan::getDefaultTaskMan()->mainLoop(); m_status = STOPPING; } diff --git a/src/Task.cc b/src/Task.cc index 6278fd5..1486540 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -234,3 +234,41 @@ void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (Gene throw EAgain(evt); } } + +void Balau::QueueBase::iPush(void * t) { + ScopeLock sl(m_lock); + Cell * c = new Cell(t); + c->m_prev = m_back; + if (m_back) + m_back->m_next = c; + else + m_front = c; + m_back = c; + pthread_cond_signal(&m_cond); + m_event.trigger(); +} + +void * Balau::QueueBase::iPop(bool wait) { + ScopeLock sl(m_lock); + while (!m_front) { + if (wait) { + pthread_cond_wait(&m_cond, &m_lock.m_lock); + } else { + Task::prepare(&m_event); + m_lock.leave(); + Task::yield(&m_event); + m_lock.enter(); + } + } + Cell * c = m_front; + if (!c) + return NULL; + m_front = c->m_next; + if (m_front) + m_front->m_prev = NULL; + else + m_back = NULL; + void * t = c->m_elem; + delete c; + return t; +} diff --git a/tests/test-Sockets.cc b/tests/test-Sockets.cc index a3e2755..c576c97 100644 --- a/tests/test-Sockets.cc +++ b/tests/test-Sockets.cc @@ -62,8 +62,8 @@ void MainTask::Do() { Printer::enable(M_ALL); Printer::log(M_STATUS, "Test::Sockets running."); - Events::TaskEvent evtSvr(listener = Balau::createTask(new Listener(1234))); - Events::TaskEvent evtCln(Balau::createTask(new Client)); + Events::TaskEvent evtSvr(listener = TaskMan::createTask(new Listener(1234))); + Events::TaskEvent evtCln(TaskMan::createTask(new Client)); Printer::log(M_STATUS, "Created %s", listener->getName()); waitFor(&evtSvr); waitFor(&evtCln); diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc index 272e61d..60d1dd3 100644 --- a/tests/test-Tasks.cc +++ b/tests/test-Tasks.cc @@ -30,7 +30,7 @@ void MainTask::Do() { customPrinter = new CustomPrinter(); Printer::log(M_STATUS, "Test::Tasks running."); - Task * testTask = Balau::createTask(new TestTask()); + Task * testTask = TaskMan::createTask(new TestTask()); Events::TaskEvent taskEvt(testTask); waitFor(&taskEvt); TAssert(!taskEvt.gotSignal()); -- cgit v1.2.3