summaryrefslogtreecommitdiff
path: root/includes
diff options
context:
space:
mode:
Diffstat (limited to 'includes')
-rw-r--r--includes/Main.h6
-rw-r--r--includes/Socket.h3
-rw-r--r--includes/Task.h5
-rw-r--r--includes/TaskMan.h30
-rw-r--r--includes/Threads.h7
5 files changed, 38 insertions, 13 deletions
diff --git a/includes/Main.h b/includes/Main.h
index 3f81137..5fa64c1 100644
--- a/includes/Main.h
+++ b/includes/Main.h
@@ -45,7 +45,7 @@ namespace Balau {
class MainTask : public Task {
public:
MainTask() : m_stopTaskManOnExit(true) { }
- virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::getTaskMan()->stop(); }
+ virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::stop(); }
virtual const char * getName() { return "Main Task"; }
virtual void Do();
void stopTaskManOnExit(bool v) { m_stopTaskManOnExit = v; }
@@ -77,8 +77,8 @@ class Main {
try {
m_status = RUNNING;
- MainTask * mainTask = new MainTask();
- TaskMan::getTaskMan()->mainLoop();
+ MainTask * mainTask = createTask(new MainTask());
+ TaskMan::getDefaultTaskMan()->mainLoop();
m_status = STOPPING;
}
catch (Exit e) {
diff --git a/includes/Socket.h b/includes/Socket.h
index 164090c..a19b1e2 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -8,6 +8,7 @@
#include <netdb.h>
#endif
#include <Handle.h>
+#include <TaskMan.h>
#include <Task.h>
#include <Printer.h>
@@ -78,7 +79,7 @@ class Listener : public ListenerBase {
public:
Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { }
protected:
- virtual void factory(IO<Socket> & io, void * opaque) { new Worker(io, opaque); }
+ virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); }
virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); }
};
diff --git a/includes/Task.h b/includes/Task.h
index 156add2..22c522e 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -128,10 +128,13 @@ class Task {
m_okayToEAgain = enable;
return oldValue;
}
+ TaskMan * getMyTaskMan() { return m_taskMan; }
private:
size_t stackSize() { return 128 * 1024; }
+ void setup(TaskMan * taskMan);
void switchTo();
- static void CALLBACK coroutine(void *);
+ static void CALLBACK coroutineTrampoline(void *);
+ void coroutine();
void * m_stack;
#ifndef _WIN32
coro_context m_ctx;
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index b4645fe..afc2b8a 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -8,40 +8,54 @@
#include <ext/hash_set>
#include <vector>
#include <Threads.h>
+#include <Exceptions.h>
namespace gnu = __gnu_cxx;
namespace Balau {
class Task;
+class TaskScheduler;
+
+namespace Events {
+
+class Async;
+
+};
class TaskMan {
public:
TaskMan();
~TaskMan();
void mainLoop();
- void stop() { m_stopped = true; }
- static TaskMan * getTaskMan();
+ static TaskMan * getDefaultTaskMan();
struct ev_loop * getLoop() { return m_loop; }
void signalTask(Task * t);
-
+ static void stop();
+ void stopMe() { m_stopped = true; }
private:
- void registerTask(Task * t);
+ static void registerTask(Task * t);
+ void addToPending(Task * t);
#ifndef _WIN32
coro_context m_returnContext;
#else
void * m_fiber;
#endif
friend class Task;
+ friend class TaskScheduler;
+ template<class T>
+ friend T * createTask(T * t);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
- typedef std::vector<Task *> taskList_t;
taskHash_t m_tasks, m_signaledTasks;
- taskList_t m_pendingAdd;
- Lock m_pendingLock;
- volatile bool m_stopped;
+ Queue<Task *> m_pendingAdd;
+ bool m_stopped;
struct ev_loop * m_loop;
bool m_allowedToSignal;
+ ev::async m_evt;
};
+template<class T>
+T * createTask(T * t) { TaskMan::registerTask(t); Assert(dynamic_cast<Task *>(t)); return t; }
+
};
diff --git a/includes/Threads.h b/includes/Threads.h
index 2347a84..a1f270e 100644
--- a/includes/Threads.h
+++ b/includes/Threads.h
@@ -57,6 +57,13 @@ class Queue {
m_lock.leave();
return t;
}
+ int size() {
+ int r;
+ m_lock.enter();
+ r = m_queue.size();
+ m_lock.leave();
+ return r;
+ }
private:
std::queue<T> m_queue;
Lock m_lock;