diff options
Diffstat (limited to 'includes')
-rw-r--r-- | includes/Main.h | 6 | ||||
-rw-r--r-- | includes/Socket.h | 3 | ||||
-rw-r--r-- | includes/Task.h | 5 | ||||
-rw-r--r-- | includes/TaskMan.h | 30 | ||||
-rw-r--r-- | includes/Threads.h | 7 |
5 files changed, 38 insertions, 13 deletions
diff --git a/includes/Main.h b/includes/Main.h index 3f81137..5fa64c1 100644 --- a/includes/Main.h +++ b/includes/Main.h @@ -45,7 +45,7 @@ namespace Balau { class MainTask : public Task { public: MainTask() : m_stopTaskManOnExit(true) { } - virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::getTaskMan()->stop(); } + virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::stop(); } virtual const char * getName() { return "Main Task"; } virtual void Do(); void stopTaskManOnExit(bool v) { m_stopTaskManOnExit = v; } @@ -77,8 +77,8 @@ class Main { try { m_status = RUNNING; - MainTask * mainTask = new MainTask(); - TaskMan::getTaskMan()->mainLoop(); + MainTask * mainTask = createTask(new MainTask()); + TaskMan::getDefaultTaskMan()->mainLoop(); m_status = STOPPING; } catch (Exit e) { diff --git a/includes/Socket.h b/includes/Socket.h index 164090c..a19b1e2 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -8,6 +8,7 @@ #include <netdb.h> #endif #include <Handle.h> +#include <TaskMan.h> #include <Task.h> #include <Printer.h> @@ -78,7 +79,7 @@ class Listener : public ListenerBase { public: Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { } protected: - virtual void factory(IO<Socket> & io, void * opaque) { new Worker(io, opaque); } + virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); } virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); } }; diff --git a/includes/Task.h b/includes/Task.h index 156add2..22c522e 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -128,10 +128,13 @@ class Task { m_okayToEAgain = enable; return oldValue; } + TaskMan * getMyTaskMan() { return m_taskMan; } private: size_t stackSize() { return 128 * 1024; } + void setup(TaskMan * taskMan); void switchTo(); - static void CALLBACK coroutine(void *); + static void CALLBACK coroutineTrampoline(void *); + void coroutine(); void * m_stack; #ifndef _WIN32 coro_context m_ctx; diff --git a/includes/TaskMan.h b/includes/TaskMan.h index b4645fe..afc2b8a 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -8,40 +8,54 @@ #include <ext/hash_set> #include <vector> #include <Threads.h> +#include <Exceptions.h> namespace gnu = __gnu_cxx; namespace Balau { class Task; +class TaskScheduler; + +namespace Events { + +class Async; + +}; class TaskMan { public: TaskMan(); ~TaskMan(); void mainLoop(); - void stop() { m_stopped = true; } - static TaskMan * getTaskMan(); + static TaskMan * getDefaultTaskMan(); struct ev_loop * getLoop() { return m_loop; } void signalTask(Task * t); - + static void stop(); + void stopMe() { m_stopped = true; } private: - void registerTask(Task * t); + static void registerTask(Task * t); + void addToPending(Task * t); #ifndef _WIN32 coro_context m_returnContext; #else void * m_fiber; #endif friend class Task; + friend class TaskScheduler; + template<class T> + friend T * createTask(T * t); struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } }; typedef gnu::hash_set<Task *, taskHasher> taskHash_t; - typedef std::vector<Task *> taskList_t; taskHash_t m_tasks, m_signaledTasks; - taskList_t m_pendingAdd; - Lock m_pendingLock; - volatile bool m_stopped; + Queue<Task *> m_pendingAdd; + bool m_stopped; struct ev_loop * m_loop; bool m_allowedToSignal; + ev::async m_evt; }; +template<class T> +T * createTask(T * t) { TaskMan::registerTask(t); Assert(dynamic_cast<Task *>(t)); return t; } + }; diff --git a/includes/Threads.h b/includes/Threads.h index 2347a84..a1f270e 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -57,6 +57,13 @@ class Queue { m_lock.leave(); return t; } + int size() { + int r; + m_lock.enter(); + r = m_queue.size(); + m_lock.leave(); + return r; + } private: std::queue<T> m_queue; Lock m_lock; |