From 1d796e6cf639354614f1152baab65d317271c357 Mon Sep 17 00:00:00 2001 From: Pixel Date: Wed, 16 Nov 2011 17:26:28 -0800 Subject: Kind of a big revamp of the TaskMan / Task model, in order to introduce a TaskScheduler. The idea is that we need to support multiple task managers from multiple threads. So that revamp means we now should be able to support that, except the TaskScheduler needs to implement a round robin system, to distribute tasks across multiple task managers. But at least, the fundamental redesign to permit this is here. --- src/TaskMan.cc | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 109 insertions(+), 15 deletions(-) (limited to 'src/TaskMan.cc') diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 0fc4668..8c99e52 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -3,9 +3,94 @@ #include "Main.h" #include "Local.h" +class Stopper : public Balau::Task { + virtual void Do(); + virtual const char * getName(); +}; + +void Stopper::Do() { + getMyTaskMan()->stopMe(); +} + +const char * Stopper::getName() { + return "Stopper"; +} + static Balau::DefaultTmpl defaultTaskMan(50); static Balau::LocalTmpl localTaskMan; +namespace Balau { + +class TaskScheduler : public Thread, public AtStart, public AtExit { + public: + TaskScheduler() : AtStart(100), m_stopping(false) { } + void registerTask(Task * t); + virtual void * proc(); + virtual void doStart(); + virtual void doExit(); + void registerTaskMan(TaskMan * t); + void unregisterTaskMan(TaskMan * t); + void stopAll(); + private: + Queue m_queue; + volatile bool m_stopping; +}; + +}; + +static Balau::TaskScheduler s_scheduler; + +void Balau::TaskScheduler::registerTask(Task * t) { + Printer::elog(E_TASK, "TaskScheduler::registerTask with t = %p", t); + m_queue.push(t); +} + +void Balau::TaskScheduler::registerTaskMan(TaskMan * t) { + // meh. We need a round-robin queue system. +} + +void Balau::TaskScheduler::unregisterTaskMan(TaskMan * t) { + // and here, we need to remove that taskman from the round robin queue. +} + +void Balau::TaskScheduler::stopAll() { + m_stopping = true; + // and finally, we need to crawl the whole list and stop all of them. + TaskMan * tm = localTaskMan.getGlobal(); + tm->addToPending(new Stopper()); +} + +void * Balau::TaskScheduler::proc() { + while (true) { + Printer::elog(E_TASK, "TaskScheduler waiting for a task to pop"); + Task * t = m_queue.pop(); + if (!t) + break; + if (dynamic_cast(t) || m_stopping) + break; + // pick up a task manager here... for now let's take the global one. + // but we need some sort of round robin across all of the threads, as described above. + TaskMan * tm = localTaskMan.getGlobal(); + Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm); + tm->addToPending(t); + tm->m_evt.send(); + } + Printer::elog(E_TASK, "TaskScheduler stopping."); + return NULL; +} + +void Balau::TaskScheduler::doStart() { + threadStart(); +} + +void Balau::TaskScheduler::doExit() { + Task * s = NULL; + m_queue.push(s); + join(); +} + +void asyncDummy(ev::async & w, int revents) { } + Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) { #ifndef _WIN32 coro_create(&m_returnContext, 0, 0, 0, 0); @@ -13,12 +98,17 @@ Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) { m_fiber = ConvertThreadToFiber(NULL); Assert(m_fiber); #endif - if (!localTaskMan.getGlobal()) { + TaskMan * global = localTaskMan.getGlobal(); + if (!global) { localTaskMan.setGlobal(this); m_loop = ev_default_loop(EVFLAG_AUTO); } else { m_loop = ev_loop_new(EVFLAG_AUTO); } + m_evt.set(m_loop); + m_evt.set(); + m_evt.start(); + s_scheduler.registerTaskMan(this); } #ifdef _WIN32 @@ -35,17 +125,18 @@ class WinSocketStartup : public Balau::AtStart { static WinSocketStartup wsa; #endif -Balau::TaskMan * Balau::TaskMan::getTaskMan() { return localTaskMan.get(); } +Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); } Balau::TaskMan::~TaskMan() { Assert(localTaskMan.getGlobal() != this); + s_scheduler.unregisterTaskMan(this); + // probably way more work to do here in order to clean up tasks from that thread ev_loop_destroy(m_loop); } void Balau::TaskMan::mainLoop() { // We need at least one round before bailing :) do { - taskList_t::iterator iL; taskHash_t::iterator iH; Task * t; bool noWait = false; @@ -65,15 +156,13 @@ void Balau::TaskMan::mainLoop() { if (m_tasks.size() == 0) noWait = true; - m_pendingLock.enter(); if (m_pendingAdd.size() != 0) noWait = true; - m_pendingLock.leave(); // libev's event "loop". We always runs it once though. m_allowedToSignal = true; Printer::elog(E_TASK, "Going to libev main loop"); - ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE); + ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "Getting out of libev main loop"); // let's check what task got stopped, and signal them @@ -99,15 +188,13 @@ void Balau::TaskMan::mainLoop() { } m_signaledTasks.clear(); - m_pendingLock.enter(); // Adding tasks that were added, maybe from other threads - for (iL = m_pendingAdd.begin(); iL != m_pendingAdd.end(); iL++) { - t = *iL; + while ((m_pendingAdd.size() != 0) || (m_tasks.size() == 0) && !m_stopped) { + t = m_pendingAdd.pop(); Assert(m_tasks.find(t) == m_tasks.end()); + t->setup(this); m_tasks.insert(t); } - m_pendingAdd.clear(); - m_pendingLock.leave(); // Finally, let's destroy tasks that no longer are necessary. bool didDelete; @@ -125,13 +212,16 @@ void Balau::TaskMan::mainLoop() { } } while (didDelete); - } while (!m_stopped && m_tasks.size() != 0); + } while (!m_stopped); + Printer::elog(E_TASK, "TaskManager stopping."); } void Balau::TaskMan::registerTask(Balau::Task * t) { - m_pendingLock.enter(); - m_pendingAdd.push_back(t); - m_pendingLock.leave(); + s_scheduler.registerTask(t); +} + +void Balau::TaskMan::addToPending(Balau::Task * t) { + m_pendingAdd.push(t); } void Balau::TaskMan::signalTask(Task * t) { @@ -139,3 +229,7 @@ void Balau::TaskMan::signalTask(Task * t) { Assert(m_allowedToSignal); m_signaledTasks.insert(t); } + +void Balau::TaskMan::stop() { + s_scheduler.stopAll(); +} -- cgit v1.2.3