summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPixel <pixel@nobis-crew.org>2011-11-16 17:26:28 -0800
committerPixel <pixel@nobis-crew.org>2011-11-16 17:26:28 -0800
commit1d796e6cf639354614f1152baab65d317271c357 (patch)
tree5d1e073f9d3b3c045e1c65d8ac2b658ca95a03f9
parent2718578affdfb202450e66e6b159ff634d6b28ab (diff)
Kind of a big revamp of the TaskMan / Task model, in order to introduce a TaskScheduler.
The idea is that we need to support multiple task managers from multiple threads. So that revamp means we now should be able to support that, except the TaskScheduler needs to implement a round robin system, to distribute tasks across multiple task managers. But at least, the fundamental redesign to permit this is here.
-rw-r--r--includes/Main.h6
-rw-r--r--includes/Socket.h3
-rw-r--r--includes/Task.h5
-rw-r--r--includes/TaskMan.h30
-rw-r--r--includes/Threads.h7
-rw-r--r--src/Handle.cc2
-rw-r--r--src/HttpServer.cc2
-rw-r--r--src/Task.cc45
-rw-r--r--src/TaskMan.cc124
-rw-r--r--tests/test-Sockets.cc4
-rw-r--r--tests/test-Tasks.cc2
11 files changed, 177 insertions, 53 deletions
diff --git a/includes/Main.h b/includes/Main.h
index 3f81137..5fa64c1 100644
--- a/includes/Main.h
+++ b/includes/Main.h
@@ -45,7 +45,7 @@ namespace Balau {
class MainTask : public Task {
public:
MainTask() : m_stopTaskManOnExit(true) { }
- virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::getTaskMan()->stop(); }
+ virtual ~MainTask() { if (m_stopTaskManOnExit) TaskMan::stop(); }
virtual const char * getName() { return "Main Task"; }
virtual void Do();
void stopTaskManOnExit(bool v) { m_stopTaskManOnExit = v; }
@@ -77,8 +77,8 @@ class Main {
try {
m_status = RUNNING;
- MainTask * mainTask = new MainTask();
- TaskMan::getTaskMan()->mainLoop();
+ MainTask * mainTask = createTask(new MainTask());
+ TaskMan::getDefaultTaskMan()->mainLoop();
m_status = STOPPING;
}
catch (Exit e) {
diff --git a/includes/Socket.h b/includes/Socket.h
index 164090c..a19b1e2 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -8,6 +8,7 @@
#include <netdb.h>
#endif
#include <Handle.h>
+#include <TaskMan.h>
#include <Task.h>
#include <Printer.h>
@@ -78,7 +79,7 @@ class Listener : public ListenerBase {
public:
Listener(int port, const char * local = "", void * opaque = NULL) : ListenerBase(port, local, opaque) { }
protected:
- virtual void factory(IO<Socket> & io, void * opaque) { new Worker(io, opaque); }
+ virtual void factory(IO<Socket> & io, void * opaque) { createTask(new Worker(io, opaque)); }
virtual void setName() { m_name = String(ClassName(this).c_str()) + " - " + m_listener->getName(); }
};
diff --git a/includes/Task.h b/includes/Task.h
index 156add2..22c522e 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -128,10 +128,13 @@ class Task {
m_okayToEAgain = enable;
return oldValue;
}
+ TaskMan * getMyTaskMan() { return m_taskMan; }
private:
size_t stackSize() { return 128 * 1024; }
+ void setup(TaskMan * taskMan);
void switchTo();
- static void CALLBACK coroutine(void *);
+ static void CALLBACK coroutineTrampoline(void *);
+ void coroutine();
void * m_stack;
#ifndef _WIN32
coro_context m_ctx;
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index b4645fe..afc2b8a 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -8,40 +8,54 @@
#include <ext/hash_set>
#include <vector>
#include <Threads.h>
+#include <Exceptions.h>
namespace gnu = __gnu_cxx;
namespace Balau {
class Task;
+class TaskScheduler;
+
+namespace Events {
+
+class Async;
+
+};
class TaskMan {
public:
TaskMan();
~TaskMan();
void mainLoop();
- void stop() { m_stopped = true; }
- static TaskMan * getTaskMan();
+ static TaskMan * getDefaultTaskMan();
struct ev_loop * getLoop() { return m_loop; }
void signalTask(Task * t);
-
+ static void stop();
+ void stopMe() { m_stopped = true; }
private:
- void registerTask(Task * t);
+ static void registerTask(Task * t);
+ void addToPending(Task * t);
#ifndef _WIN32
coro_context m_returnContext;
#else
void * m_fiber;
#endif
friend class Task;
+ friend class TaskScheduler;
+ template<class T>
+ friend T * createTask(T * t);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
- typedef std::vector<Task *> taskList_t;
taskHash_t m_tasks, m_signaledTasks;
- taskList_t m_pendingAdd;
- Lock m_pendingLock;
- volatile bool m_stopped;
+ Queue<Task *> m_pendingAdd;
+ bool m_stopped;
struct ev_loop * m_loop;
bool m_allowedToSignal;
+ ev::async m_evt;
};
+template<class T>
+T * createTask(T * t) { TaskMan::registerTask(t); Assert(dynamic_cast<Task *>(t)); return t; }
+
};
diff --git a/includes/Threads.h b/includes/Threads.h
index 2347a84..a1f270e 100644
--- a/includes/Threads.h
+++ b/includes/Threads.h
@@ -57,6 +57,13 @@ class Queue {
m_lock.leave();
return t;
}
+ int size() {
+ int r;
+ m_lock.enter();
+ r = m_queue.size();
+ m_lock.leave();
+ return r;
+ }
private:
std::queue<T> m_queue;
Lock m_lock;
diff --git a/src/Handle.cc b/src/Handle.cc
index 839c58a..b8dd45e 100644
--- a/src/Handle.cc
+++ b/src/Handle.cc
@@ -43,7 +43,7 @@ void eioInterface::readyCB(ev::async & w, int revents) {
void eioInterface::doStart() {
Balau::Printer::elog(Balau::E_HANDLE, "Starting the eio interface");
- Balau::TaskMan * taskMan = Balau::TaskMan::getTaskMan();
+ Balau::TaskMan * taskMan = Balau::TaskMan::getDefaultTaskMan();
Assert(taskMan);
struct ev_loop * loop = taskMan->getLoop();
diff --git a/src/HttpServer.cc b/src/HttpServer.cc
index 608b8a8..b027824 100644
--- a/src/HttpServer.cc
+++ b/src/HttpServer.cc
@@ -354,7 +354,7 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener;
void Balau::HttpServer::start() {
Assert(!m_started);
- m_listenerPtr = new HttpListener(m_port, m_local.to_charp());
+ m_listenerPtr = createTask(new HttpListener(m_port, m_local.to_charp()));
m_started = true;
}
diff --git a/src/Task.cc b/src/Task.cc
index 3772dd5..3a6f8ad 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -7,28 +7,29 @@
static Balau::LocalTmpl<Balau::Task> localTask;
Balau::Task::Task() {
+ m_status = STARTING;
+ m_okayToEAgain = false;
+
+ Printer::elog(E_TASK, "Created a Task at %p");
+}
+
+void Balau::Task::setup(TaskMan * taskMan) {
size_t size = stackSize();
#ifndef _WIN32
m_stack = malloc(size);
- coro_create(&m_ctx, coroutine, this, m_stack, size);
+ coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size);
#else
m_stack = NULL;
- m_fiber = CreateFiber(size, coroutine, this);
+ m_fiber = CreateFiber(size, coroutineTrampoline, this);
#endif
- m_taskMan = TaskMan::getTaskMan();
- m_taskMan->registerTask(this);
+ m_taskMan = taskMan;
m_tls = g_tlsManager->createTLS();
void * oldTLS = g_tlsManager->getTLS();
g_tlsManager->setTLS(m_tls);
localTask.set(this);
g_tlsManager->setTLS(oldTLS);
-
- m_status = STARTING;
- m_okayToEAgain = false;
-
- Printer::elog(E_TASK, "Created a Task at %p");
}
Balau::Task::~Task() {
@@ -37,27 +38,31 @@ Balau::Task::~Task() {
free(m_tls);
}
-void Balau::Task::coroutine(void * arg) {
+void Balau::Task::coroutineTrampoline(void * arg) {
Task * task = reinterpret_cast<Task *>(arg);
Assert(task);
- Assert(task->m_status == STARTING);
+ task->coroutine();
+}
+
+void Balau::Task::coroutine() {
+ Assert(m_status == STARTING);
try {
- task->m_status = RUNNING;
- task->Do();
- task->m_status = STOPPED;
+ m_status = RUNNING;
+ Do();
+ m_status = STOPPED;
}
catch (GeneralException & e) {
- Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", task->getName(), task, e.getMsg());
- task->m_status = FAULTED;
+ Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", getName(), this, e.getMsg());
+ m_status = FAULTED;
}
catch (...) {
- Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", task->getName(), task);
- task->m_status = FAULTED;
+ Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", getName(), this);
+ m_status = FAULTED;
}
#ifndef _WIN32
- coro_transfer(&task->m_ctx, &task->m_taskMan->m_returnContext);
+ coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
- SwitchToFiber(task->m_taskMan->m_fiber);
+ SwitchToFiber(m_taskMan->m_fiber);
#endif
}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 0fc4668..8c99e52 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -3,9 +3,94 @@
#include "Main.h"
#include "Local.h"
+class Stopper : public Balau::Task {
+ virtual void Do();
+ virtual const char * getName();
+};
+
+void Stopper::Do() {
+ getMyTaskMan()->stopMe();
+}
+
+const char * Stopper::getName() {
+ return "Stopper";
+}
+
static Balau::DefaultTmpl<Balau::TaskMan> defaultTaskMan(50);
static Balau::LocalTmpl<Balau::TaskMan> localTaskMan;
+namespace Balau {
+
+class TaskScheduler : public Thread, public AtStart, public AtExit {
+ public:
+ TaskScheduler() : AtStart(100), m_stopping(false) { }
+ void registerTask(Task * t);
+ virtual void * proc();
+ virtual void doStart();
+ virtual void doExit();
+ void registerTaskMan(TaskMan * t);
+ void unregisterTaskMan(TaskMan * t);
+ void stopAll();
+ private:
+ Queue<Task *> m_queue;
+ volatile bool m_stopping;
+};
+
+};
+
+static Balau::TaskScheduler s_scheduler;
+
+void Balau::TaskScheduler::registerTask(Task * t) {
+ Printer::elog(E_TASK, "TaskScheduler::registerTask with t = %p", t);
+ m_queue.push(t);
+}
+
+void Balau::TaskScheduler::registerTaskMan(TaskMan * t) {
+ // meh. We need a round-robin queue system.
+}
+
+void Balau::TaskScheduler::unregisterTaskMan(TaskMan * t) {
+ // and here, we need to remove that taskman from the round robin queue.
+}
+
+void Balau::TaskScheduler::stopAll() {
+ m_stopping = true;
+ // and finally, we need to crawl the whole list and stop all of them.
+ TaskMan * tm = localTaskMan.getGlobal();
+ tm->addToPending(new Stopper());
+}
+
+void * Balau::TaskScheduler::proc() {
+ while (true) {
+ Printer::elog(E_TASK, "TaskScheduler waiting for a task to pop");
+ Task * t = m_queue.pop();
+ if (!t)
+ break;
+ if (dynamic_cast<Stopper *>(t) || m_stopping)
+ break;
+ // pick up a task manager here... for now let's take the global one.
+ // but we need some sort of round robin across all of the threads, as described above.
+ TaskMan * tm = localTaskMan.getGlobal();
+ Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm);
+ tm->addToPending(t);
+ tm->m_evt.send();
+ }
+ Printer::elog(E_TASK, "TaskScheduler stopping.");
+ return NULL;
+}
+
+void Balau::TaskScheduler::doStart() {
+ threadStart();
+}
+
+void Balau::TaskScheduler::doExit() {
+ Task * s = NULL;
+ m_queue.push(s);
+ join();
+}
+
+void asyncDummy(ev::async & w, int revents) { }
+
Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) {
#ifndef _WIN32
coro_create(&m_returnContext, 0, 0, 0, 0);
@@ -13,12 +98,17 @@ Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) {
m_fiber = ConvertThreadToFiber(NULL);
Assert(m_fiber);
#endif
- if (!localTaskMan.getGlobal()) {
+ TaskMan * global = localTaskMan.getGlobal();
+ if (!global) {
localTaskMan.setGlobal(this);
m_loop = ev_default_loop(EVFLAG_AUTO);
} else {
m_loop = ev_loop_new(EVFLAG_AUTO);
}
+ m_evt.set(m_loop);
+ m_evt.set<asyncDummy>();
+ m_evt.start();
+ s_scheduler.registerTaskMan(this);
}
#ifdef _WIN32
@@ -35,17 +125,18 @@ class WinSocketStartup : public Balau::AtStart {
static WinSocketStartup wsa;
#endif
-Balau::TaskMan * Balau::TaskMan::getTaskMan() { return localTaskMan.get(); }
+Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); }
Balau::TaskMan::~TaskMan() {
Assert(localTaskMan.getGlobal() != this);
+ s_scheduler.unregisterTaskMan(this);
+ // probably way more work to do here in order to clean up tasks from that thread
ev_loop_destroy(m_loop);
}
void Balau::TaskMan::mainLoop() {
// We need at least one round before bailing :)
do {
- taskList_t::iterator iL;
taskHash_t::iterator iH;
Task * t;
bool noWait = false;
@@ -65,15 +156,13 @@ void Balau::TaskMan::mainLoop() {
if (m_tasks.size() == 0)
noWait = true;
- m_pendingLock.enter();
if (m_pendingAdd.size() != 0)
noWait = true;
- m_pendingLock.leave();
// libev's event "loop". We always runs it once though.
m_allowedToSignal = true;
Printer::elog(E_TASK, "Going to libev main loop");
- ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE);
+ ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE);
Printer::elog(E_TASK, "Getting out of libev main loop");
// let's check what task got stopped, and signal them
@@ -99,15 +188,13 @@ void Balau::TaskMan::mainLoop() {
}
m_signaledTasks.clear();
- m_pendingLock.enter();
// Adding tasks that were added, maybe from other threads
- for (iL = m_pendingAdd.begin(); iL != m_pendingAdd.end(); iL++) {
- t = *iL;
+ while ((m_pendingAdd.size() != 0) || (m_tasks.size() == 0) && !m_stopped) {
+ t = m_pendingAdd.pop();
Assert(m_tasks.find(t) == m_tasks.end());
+ t->setup(this);
m_tasks.insert(t);
}
- m_pendingAdd.clear();
- m_pendingLock.leave();
// Finally, let's destroy tasks that no longer are necessary.
bool didDelete;
@@ -125,13 +212,16 @@ void Balau::TaskMan::mainLoop() {
}
} while (didDelete);
- } while (!m_stopped && m_tasks.size() != 0);
+ } while (!m_stopped);
+ Printer::elog(E_TASK, "TaskManager stopping.");
}
void Balau::TaskMan::registerTask(Balau::Task * t) {
- m_pendingLock.enter();
- m_pendingAdd.push_back(t);
- m_pendingLock.leave();
+ s_scheduler.registerTask(t);
+}
+
+void Balau::TaskMan::addToPending(Balau::Task * t) {
+ m_pendingAdd.push(t);
}
void Balau::TaskMan::signalTask(Task * t) {
@@ -139,3 +229,7 @@ void Balau::TaskMan::signalTask(Task * t) {
Assert(m_allowedToSignal);
m_signaledTasks.insert(t);
}
+
+void Balau::TaskMan::stop() {
+ s_scheduler.stopAll();
+}
diff --git a/tests/test-Sockets.cc b/tests/test-Sockets.cc
index 0afedb1..87d557b 100644
--- a/tests/test-Sockets.cc
+++ b/tests/test-Sockets.cc
@@ -64,8 +64,8 @@ void MainTask::Do() {
Printer::enable(M_ALL);
Printer::log(M_STATUS, "Test::Sockets running.");
- Events::TaskEvent evtSvr(listener = new Listener<Worker>(1234));
- Events::TaskEvent evtCln(new Client);
+ Events::TaskEvent evtSvr(listener = Balau::createTask(new Listener<Worker>(1234)));
+ Events::TaskEvent evtCln(Balau::createTask(new Client));
Printer::log(M_STATUS, "Created %s", listener->getName());
waitFor(&evtSvr);
waitFor(&evtCln);
diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc
index 2760e52..8677fb7 100644
--- a/tests/test-Tasks.cc
+++ b/tests/test-Tasks.cc
@@ -32,7 +32,7 @@ void MainTask::Do() {
customPrinter = new CustomPrinter();
Printer::log(M_STATUS, "Test::Tasks running.");
- Task * testTask = new TestTask();
+ Task * testTask = Balau::createTask(new TestTask());
Events::TaskEvent taskEvt(testTask);
waitFor(&taskEvt);
Assert(!taskEvt.gotSignal());