diff options
Diffstat (limited to 'includes')
-rw-r--r-- | includes/LuaTask.h | 5 | ||||
-rw-r--r-- | includes/Socket.h | 2 | ||||
-rw-r--r-- | includes/Task.h | 32 | ||||
-rw-r--r-- | includes/TaskMan.h | 8 | ||||
-rw-r--r-- | includes/Threads.h | 58 |
5 files changed, 41 insertions, 64 deletions
diff --git a/includes/LuaTask.h b/includes/LuaTask.h index bdde9b7..ee9d7a7 100644 --- a/includes/LuaTask.h +++ b/includes/LuaTask.h @@ -53,16 +53,15 @@ class LuaTask : public Task { class LuaMainTask : public Task { public: LuaMainTask(); - ~LuaMainTask() { L.close(); } + ~LuaMainTask(); void stop(); virtual const char * getName() const { return "LuaMainTask"; } private: void exec(LuaExecCell * cell); virtual void Do(); Lua L; - Events::Async m_queueEvent; Queue<LuaExecCell> m_queue; - bool m_stopping; + volatile bool m_stopping; friend class LuaExecCell; }; diff --git a/includes/Socket.h b/includes/Socket.h index 4a73277..f63edf0 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -81,7 +81,7 @@ class Listener : public ListenerBase { public: Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { } protected: - virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); } + virtual void factory(IO<Socket> & io, void * opaque) { TaskMan::createTask(new Worker(io, opaque)); } virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); } }; diff --git a/includes/Task.h b/includes/Task.h index 592d5c3..01d5e3f 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -169,4 +169,36 @@ class Task { bool m_okayToEAgain; }; +class QueueBase { + public: + bool isEmpty() { ScopeLock sl(m_lock); return !m_front; } + protected: + QueueBase() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); } + ~QueueBase() { while (!isEmpty()) iPop(false); pthread_cond_destroy(&m_cond); } + void iPush(void * t); + void * iPop(bool wait); + + private: + QueueBase(const QueueBase &) = delete; + QueueBase & operator=(const QueueBase &) = delete; + Lock m_lock; + struct Cell { + Cell(void * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } + Cell(const Cell &) = delete; + Cell & operator=(const Cell &) = delete; + Cell * m_next, * m_prev; + void * m_elem; + }; + Cell * m_front, * m_back; + pthread_cond_t m_cond; + Events::Async m_event; +}; + +template<class T> +class Queue : public QueueBase { + public: + void push(T * t) { iPush(t); } + T * pop(bool wait = true) { return (T *) iPop(wait); } +}; + }; diff --git a/includes/TaskMan.h b/includes/TaskMan.h index 6a50491..088d5d7 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -9,12 +9,12 @@ #include <queue> #include <Threads.h> #include <Exceptions.h> +#include <Task.h> namespace gnu = __gnu_cxx; namespace Balau { -class Task; class TaskScheduler; namespace Events { @@ -35,6 +35,9 @@ class TaskMan { void stopMe(int code) { m_stopped = true; m_stopCode = code; } static Thread * createThreadedTaskMan(); bool stopped() { return m_stopped; } + template<class T> + static T * createTask(T * t, Task * stick = NULL) { TaskMan::registerTask(t, stick); return t; } + private: static void registerTask(Task * t, Task * stick); void * getStack(); @@ -62,7 +65,4 @@ class TaskMan { int m_stopCode; }; -template<class T> -T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; } - }; diff --git a/includes/Threads.h b/includes/Threads.h index 46ea365..0dfce1a 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -5,8 +5,7 @@ namespace Balau { -template<class T> -class Queue; +class QueueBase; class Lock { public: @@ -18,8 +17,7 @@ class Lock { Lock(const Lock &) = delete; Lock & operator=(const Lock &) = delete; pthread_mutex_t m_lock; - template<class T> - friend class Queue; + friend class QueueBase; }; class ScopeLock { @@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit { virtual void doExit() { join(); } }; -template<class T> -class Queue { - public: - Queue() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); } - ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); } - void push(T * t) { - ScopeLock sl(m_lock); - Cell * c = new Cell(t); - c->m_prev = m_back; - if (m_back) - m_back->m_next = c; - else - m_front = c; - m_back = c; - pthread_cond_signal(&m_cond); - } - T * pop(bool wait = true) { - ScopeLock sl(m_lock); - while (!m_front && wait) - pthread_cond_wait(&m_cond, &m_lock.m_lock); - Cell * c = m_front; - if (!c) - return NULL; - m_front = c->m_next; - if (m_front) - m_front->m_prev = NULL; - else - m_back = NULL; - T * t = c->m_elem; - delete c; - return t; - } - bool isEmpty() { - ScopeLock sl(m_lock); - return !m_front; - } - private: - Queue(const Queue &) = delete; - Queue & operator=(const Queue &) = delete; - Lock m_lock; - struct Cell { - Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } - Cell(const Cell &) = delete; - Cell & operator=(const Cell &) = delete; - Cell * m_next, * m_prev; - T * m_elem; - }; - Cell * volatile m_front; - Cell * volatile m_back; - pthread_cond_t m_cond; -}; - }; |