From 1d796e6cf639354614f1152baab65d317271c357 Mon Sep 17 00:00:00 2001 From: Pixel Date: Wed, 16 Nov 2011 17:26:28 -0800 Subject: Kind of a big revamp of the TaskMan / Task model, in order to introduce a TaskScheduler. The idea is that we need to support multiple task managers from multiple threads. So that revamp means we now should be able to support that, except the TaskScheduler needs to implement a round robin system, to distribute tasks across multiple task managers. But at least, the fundamental redesign to permit this is here. --- includes/Main.h | 6 +++--- includes/Socket.h | 3 ++- includes/Task.h | 5 ++++- includes/TaskMan.h | 30 ++++++++++++++++++++++-------- includes/Threads.h | 7 +++++++ 5 files changed, 38 insertions(+), 13 deletions(-) (limited to 'includes') diff --git a/includes/Main.h b/includes/Main.h index 3f81137..5fa64c1 100644 --- a/includes/Main.h +++ b/includes/Main.h @@ -45,7 +45,7 @@ namespace Balau { class MainTask : public Task { public: MainTask() : m_stopTaskManOnExit(true) { } - virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::getTaskMan()->stop(); } + virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::stop(); } virtual const char * getName() { return "Main Task"; } virtual void Do(); void stopTaskManOnExit(bool v) { m_stopTaskManOnExit = v; } @@ -77,8 +77,8 @@ class Main { try { m_status = RUNNING; - MainTask * mainTask = new MainTask(); - TaskMan::getTaskMan()->mainLoop(); + MainTask * mainTask = createTask(new MainTask()); + TaskMan::getDefaultTaskMan()->mainLoop(); m_status = STOPPING; } catch (Exit e) { diff --git a/includes/Socket.h b/includes/Socket.h index 164090c..a19b1e2 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -8,6 +8,7 @@ #include #endif #include +#include #include #include @@ -78,7 +79,7 @@ class Listener : public ListenerBase { public: Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { } protected: - virtual void factory(IO & io, void * opaque) { new Worker(io, opaque); } + virtual void factory(IO & io, void * opaque) { createTask(new Worker(io, opaque)); } virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); } }; diff --git a/includes/Task.h b/includes/Task.h index 156add2..22c522e 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -128,10 +128,13 @@ class Task { m_okayToEAgain = enable; return oldValue; } + TaskMan * getMyTaskMan() { return m_taskMan; } private: size_t stackSize() { return 128 * 1024; } + void setup(TaskMan * taskMan); void switchTo(); - static void CALLBACK coroutine(void *); + static void CALLBACK coroutineTrampoline(void *); + void coroutine(); void * m_stack; #ifndef _WIN32 coro_context m_ctx; diff --git a/includes/TaskMan.h b/includes/TaskMan.h index b4645fe..afc2b8a 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -8,40 +8,54 @@ #include #include #include +#include namespace gnu = __gnu_cxx; namespace Balau { class Task; +class TaskScheduler; + +namespace Events { + +class Async; + +}; class TaskMan { public: TaskMan(); ~TaskMan(); void mainLoop(); - void stop() { m_stopped = true; } - static TaskMan * getTaskMan(); + static TaskMan * getDefaultTaskMan(); struct ev_loop * getLoop() { return m_loop; } void signalTask(Task * t); - + static void stop(); + void stopMe() { m_stopped = true; } private: - void registerTask(Task * t); + static void registerTask(Task * t); + void addToPending(Task * t); #ifndef _WIN32 coro_context m_returnContext; #else void * m_fiber; #endif friend class Task; + friend class TaskScheduler; + template + friend T * createTask(T * t); struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast(t); } }; typedef gnu::hash_set taskHash_t; - typedef std::vector taskList_t; taskHash_t m_tasks, m_signaledTasks; - taskList_t m_pendingAdd; - Lock m_pendingLock; - volatile bool m_stopped; + Queue m_pendingAdd; + bool m_stopped; struct ev_loop * m_loop; bool m_allowedToSignal; + ev::async m_evt; }; +template +T * createTask(T * t) { TaskMan::registerTask(t); Assert(dynamic_cast(t)); return t; } + }; diff --git a/includes/Threads.h b/includes/Threads.h index 2347a84..a1f270e 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -57,6 +57,13 @@ class Queue { m_lock.leave(); return t; } + int size() { + int r; + m_lock.enter(); + r = m_queue.size(); + m_lock.leave(); + return r; + } private: std::queue m_queue; Lock m_lock; -- cgit v1.2.3