summaryrefslogtreecommitdiff
path: root/includes
diff options
context:
space:
mode:
Diffstat (limited to 'includes')
-rw-r--r--includes/LuaTask.h5
-rw-r--r--includes/Socket.h2
-rw-r--r--includes/Task.h32
-rw-r--r--includes/TaskMan.h8
-rw-r--r--includes/Threads.h58
5 files changed, 41 insertions, 64 deletions
diff --git a/includes/LuaTask.h b/includes/LuaTask.h
index bdde9b7..ee9d7a7 100644
--- a/includes/LuaTask.h
+++ b/includes/LuaTask.h
@@ -53,16 +53,15 @@ class LuaTask : public Task {
class LuaMainTask : public Task {
public:
LuaMainTask();
- ~LuaMainTask() { L.close(); }
+ ~LuaMainTask();
void stop();
virtual const char * getName() const { return "LuaMainTask"; }
private:
void exec(LuaExecCell * cell);
virtual void Do();
Lua L;
- Events::Async m_queueEvent;
Queue<LuaExecCell> m_queue;
- bool m_stopping;
+ volatile bool m_stopping;
friend class LuaExecCell;
};
diff --git a/includes/Socket.h b/includes/Socket.h
index 4a73277..f63edf0 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -81,7 +81,7 @@ class Listener : public ListenerBase {
public:
Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { }
protected:
- virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); }
+ virtual void factory(IO<Socket> & io, void * opaque) { TaskMan::createTask(new Worker(io, opaque)); }
virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); }
};
diff --git a/includes/Task.h b/includes/Task.h
index 592d5c3..01d5e3f 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -169,4 +169,36 @@ class Task {
bool m_okayToEAgain;
};
+class QueueBase {
+ public:
+ bool isEmpty() { ScopeLock sl(m_lock); return !m_front; }
+ protected:
+ QueueBase() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); }
+ ~QueueBase() { while (!isEmpty()) iPop(false); pthread_cond_destroy(&m_cond); }
+ void iPush(void * t);
+ void * iPop(bool wait);
+
+ private:
+ QueueBase(const QueueBase &) = delete;
+ QueueBase & operator=(const QueueBase &) = delete;
+ Lock m_lock;
+ struct Cell {
+ Cell(void * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
+ Cell(const Cell &) = delete;
+ Cell & operator=(const Cell &) = delete;
+ Cell * m_next, * m_prev;
+ void * m_elem;
+ };
+ Cell * m_front, * m_back;
+ pthread_cond_t m_cond;
+ Events::Async m_event;
+};
+
+template<class T>
+class Queue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t); }
+ T * pop(bool wait = true) { return (T *) iPop(wait); }
+};
+
};
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index 6a50491..088d5d7 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -9,12 +9,12 @@
#include <queue>
#include <Threads.h>
#include <Exceptions.h>
+#include <Task.h>
namespace gnu = __gnu_cxx;
namespace Balau {
-class Task;
class TaskScheduler;
namespace Events {
@@ -35,6 +35,9 @@ class TaskMan {
void stopMe(int code) { m_stopped = true; m_stopCode = code; }
static Thread * createThreadedTaskMan();
bool stopped() { return m_stopped; }
+ template<class T>
+ static T * createTask(T * t, Task * stick = NULL) { TaskMan::registerTask(t, stick); return t; }
+
private:
static void registerTask(Task * t, Task * stick);
void * getStack();
@@ -62,7 +65,4 @@ class TaskMan {
int m_stopCode;
};
-template<class T>
-T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; }
-
};
diff --git a/includes/Threads.h b/includes/Threads.h
index 46ea365..0dfce1a 100644
--- a/includes/Threads.h
+++ b/includes/Threads.h
@@ -5,8 +5,7 @@
namespace Balau {
-template<class T>
-class Queue;
+class QueueBase;
class Lock {
public:
@@ -18,8 +17,7 @@ class Lock {
Lock(const Lock &) = delete;
Lock & operator=(const Lock &) = delete;
pthread_mutex_t m_lock;
- template<class T>
- friend class Queue;
+ friend class QueueBase;
};
class ScopeLock {
@@ -95,56 +93,4 @@ class GlobalThread : public Thread, public AtStart, public AtExit {
virtual void doExit() { join(); }
};
-template<class T>
-class Queue {
- public:
- Queue() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); }
- ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); }
- void push(T * t) {
- ScopeLock sl(m_lock);
- Cell * c = new Cell(t);
- c->m_prev = m_back;
- if (m_back)
- m_back->m_next = c;
- else
- m_front = c;
- m_back = c;
- pthread_cond_signal(&m_cond);
- }
- T * pop(bool wait = true) {
- ScopeLock sl(m_lock);
- while (!m_front && wait)
- pthread_cond_wait(&m_cond, &m_lock.m_lock);
- Cell * c = m_front;
- if (!c)
- return NULL;
- m_front = c->m_next;
- if (m_front)
- m_front->m_prev = NULL;
- else
- m_back = NULL;
- T * t = c->m_elem;
- delete c;
- return t;
- }
- bool isEmpty() {
- ScopeLock sl(m_lock);
- return !m_front;
- }
- private:
- Queue(const Queue &) = delete;
- Queue & operator=(const Queue &) = delete;
- Lock m_lock;
- struct Cell {
- Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
- Cell(const Cell &) = delete;
- Cell & operator=(const Cell &) = delete;
- Cell * m_next, * m_prev;
- T * m_elem;
- };
- Cell * volatile m_front;
- Cell * volatile m_back;
- pthread_cond_t m_cond;
-};
-
};