diff options
Diffstat (limited to 'src/TaskMan.cc')
-rw-r--r-- | src/TaskMan.cc | 124 |
1 files changed, 109 insertions, 15 deletions
diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 0fc4668..8c99e52 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -3,9 +3,94 @@ #include "Main.h" #include "Local.h" +class Stopper : public Balau::Task { + virtual void Do(); + virtual const char * getName(); +}; + +void Stopper::Do() { + getMyTaskMan()->stopMe(); +} + +const char * Stopper::getName() { + return "Stopper"; +} + static Balau::DefaultTmpl<Balau::TaskMan> defaultTaskMan(50); static Balau::LocalTmpl<Balau::TaskMan> localTaskMan; +namespace Balau { + +class TaskScheduler : public Thread, public AtStart, public AtExit { + public: + TaskScheduler() : AtStart(100), m_stopping(false) { } + void registerTask(Task * t); + virtual void * proc(); + virtual void doStart(); + virtual void doExit(); + void registerTaskMan(TaskMan * t); + void unregisterTaskMan(TaskMan * t); + void stopAll(); + private: + Queue<Task *> m_queue; + volatile bool m_stopping; +}; + +}; + +static Balau::TaskScheduler s_scheduler; + +void Balau::TaskScheduler::registerTask(Task * t) { + Printer::elog(E_TASK, "TaskScheduler::registerTask with t = %p", t); + m_queue.push(t); +} + +void Balau::TaskScheduler::registerTaskMan(TaskMan * t) { + // meh. We need a round-robin queue system. +} + +void Balau::TaskScheduler::unregisterTaskMan(TaskMan * t) { + // and here, we need to remove that taskman from the round robin queue. +} + +void Balau::TaskScheduler::stopAll() { + m_stopping = true; + // and finally, we need to crawl the whole list and stop all of them. + TaskMan * tm = localTaskMan.getGlobal(); + tm->addToPending(new Stopper()); +} + +void * Balau::TaskScheduler::proc() { + while (true) { + Printer::elog(E_TASK, "TaskScheduler waiting for a task to pop"); + Task * t = m_queue.pop(); + if (!t) + break; + if (dynamic_cast<Stopper *>(t) || m_stopping) + break; + // pick up a task manager here... for now let's take the global one. + // but we need some sort of round robin across all of the threads, as described above. + TaskMan * tm = localTaskMan.getGlobal(); + Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm); + tm->addToPending(t); + tm->m_evt.send(); + } + Printer::elog(E_TASK, "TaskScheduler stopping."); + return NULL; +} + +void Balau::TaskScheduler::doStart() { + threadStart(); +} + +void Balau::TaskScheduler::doExit() { + Task * s = NULL; + m_queue.push(s); + join(); +} + +void asyncDummy(ev::async & w, int revents) { } + Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) { #ifndef _WIN32 coro_create(&m_returnContext, 0, 0, 0, 0); @@ -13,12 +98,17 @@ Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) { m_fiber = ConvertThreadToFiber(NULL); Assert(m_fiber); #endif - if (!localTaskMan.getGlobal()) { + TaskMan * global = localTaskMan.getGlobal(); + if (!global) { localTaskMan.setGlobal(this); m_loop = ev_default_loop(EVFLAG_AUTO); } else { m_loop = ev_loop_new(EVFLAG_AUTO); } + m_evt.set(m_loop); + m_evt.set<asyncDummy>(); + m_evt.start(); + s_scheduler.registerTaskMan(this); } #ifdef _WIN32 @@ -35,17 +125,18 @@ class WinSocketStartup : public Balau::AtStart { static WinSocketStartup wsa; #endif -Balau::TaskMan * Balau::TaskMan::getTaskMan() { return localTaskMan.get(); } +Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); } Balau::TaskMan::~TaskMan() { Assert(localTaskMan.getGlobal() != this); + s_scheduler.unregisterTaskMan(this); + // probably way more work to do here in order to clean up tasks from that thread ev_loop_destroy(m_loop); } void Balau::TaskMan::mainLoop() { // We need at least one round before bailing :) do { - taskList_t::iterator iL; taskHash_t::iterator iH; Task * t; bool noWait = false; @@ -65,15 +156,13 @@ void Balau::TaskMan::mainLoop() { if (m_tasks.size() == 0) noWait = true; - m_pendingLock.enter(); if (m_pendingAdd.size() != 0) noWait = true; - m_pendingLock.leave(); // libev's event "loop". We always runs it once though. m_allowedToSignal = true; Printer::elog(E_TASK, "Going to libev main loop"); - ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE); + ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "Getting out of libev main loop"); // let's check what task got stopped, and signal them @@ -99,15 +188,13 @@ void Balau::TaskMan::mainLoop() { } m_signaledTasks.clear(); - m_pendingLock.enter(); // Adding tasks that were added, maybe from other threads - for (iL = m_pendingAdd.begin(); iL != m_pendingAdd.end(); iL++) { - t = *iL; + while ((m_pendingAdd.size() != 0) || (m_tasks.size() == 0) && !m_stopped) { + t = m_pendingAdd.pop(); Assert(m_tasks.find(t) == m_tasks.end()); + t->setup(this); m_tasks.insert(t); } - m_pendingAdd.clear(); - m_pendingLock.leave(); // Finally, let's destroy tasks that no longer are necessary. bool didDelete; @@ -125,13 +212,16 @@ void Balau::TaskMan::mainLoop() { } } while (didDelete); - } while (!m_stopped && m_tasks.size() != 0); + } while (!m_stopped); + Printer::elog(E_TASK, "TaskManager stopping."); } void Balau::TaskMan::registerTask(Balau::Task * t) { - m_pendingLock.enter(); - m_pendingAdd.push_back(t); - m_pendingLock.leave(); + s_scheduler.registerTask(t); +} + +void Balau::TaskMan::addToPending(Balau::Task * t) { + m_pendingAdd.push(t); } void Balau::TaskMan::signalTask(Task * t) { @@ -139,3 +229,7 @@ void Balau::TaskMan::signalTask(Task * t) { Assert(m_allowedToSignal); m_signaledTasks.insert(t); } + +void Balau::TaskMan::stop() { + s_scheduler.stopAll(); +} |