diff options
Diffstat (limited to 'src/TaskMan.cc')
-rw-r--r-- | src/TaskMan.cc | 102 |
1 files changed, 73 insertions, 29 deletions
diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 49c326a..177cec1 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -101,7 +101,6 @@ void Balau::TaskScheduler::stopAll(int code) { m_taskManagers.pop(); altQueue.push(tm); tm->addToPending(new Stopper(code)); - tm->m_evt.send(); } while (!altQueue.empty()) { tm = altQueue.front(); @@ -130,7 +129,6 @@ void * Balau::TaskScheduler::proc() { m_lock.leave(); Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm); tm->addToPending(t); - tm->m_evt.send(); } Printer::elog(E_TASK, "TaskScheduler stopping."); return NULL; @@ -145,6 +143,16 @@ void asyncDummy(ev::async & w, int revents) { Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up..."); } +void Balau::TaskMan::stopMe(int code) { + Task * t = Task::getCurrentTask(); + if (t->getTaskMan() == this) { + m_stopped = true; + m_stopCode = code; + } else { + addToPending(new Stopper(code)); + } +} + Balau::TaskMan::TaskMan() { #ifndef _WIN32 coro_create(&m_returnContext, 0, 0, 0, 0); @@ -168,6 +176,8 @@ Balau::TaskMan::TaskMan() { } #ifdef _WIN32 +namespace { + class WinSocketStartup : public Balau::AtStart { public: WinSocketStartup() : AtStart(5) { } @@ -179,6 +189,8 @@ class WinSocketStartup : public Balau::AtStart { }; static WinSocketStartup wsa; + +}; #endif Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); } @@ -191,14 +203,16 @@ Balau::TaskMan::~TaskMan() { } s_scheduler.unregisterTaskMan(this); // probably way more work to do here in order to clean up tasks from that thread + m_evt.stop(); ev_loop_destroy(m_loop); } void * Balau::TaskMan::getStack() { + if (!Task::needsStacks()) + return NULL; void * r = NULL; if (m_nStacks == 0) { - if (Task::needsStacks()) - r = malloc(Task::stackSize()); + r = malloc(Task::stackSize()); } else { r = m_stacks.front(); m_stacks.pop(); @@ -230,30 +244,23 @@ int Balau::TaskMan::mainLoop() { s_async.setIdleReadyCallback(asyncIdleReady, this); do { - bool noWait = false; - Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size()); - // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean. + // checking "STARTING" tasks, and running them once while ((iH = starting.begin()) != starting.end()) { Task * t = *iH; IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t); t->switchTo(); IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t); starting.erase(iH); - if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { - noWait = true; + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) stopped.insert(t); - } - if (t->getStatus() == Task::YIELDED) { - noWait = true; + if (t->getStatus() == Task::YIELDED) yielded.insert(t); - } } // if we begin that loop with any pending task, just don't loop, so we can add them immediately. - if (!m_pendingAdd.isEmpty() || !yielded.empty()) - noWait = true; + bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty(); // libev's event "loop". We always runs it once though. m_allowedToSignal = true; @@ -261,7 +268,7 @@ int Balau::TaskMan::mainLoop() { ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this); - // calling async's idle loop here + // calling async's idle s_async.idle(); // let's check what task got stopped, and signal them @@ -276,7 +283,7 @@ int Balau::TaskMan::mainLoop() { // let's check who got signaled, and call them for (Task * t : m_signaledTasks) { Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str()); - IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus()); + IAssert(t->getStatus() == Task::SLEEPING || t->getStatus() == Task::YIELDED, "We're switching to a non-sleeping/yielded task at %p... ? status = %i", t, t->getStatus()); bool wasYielded = t->getStatus() == Task::YIELDED; t->switchTo(); if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { @@ -313,7 +320,7 @@ int Balau::TaskMan::mainLoop() { Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t); IAssert(m_tasks.find(t) == m_tasks.end(), "TaskMan got task %p twice... ?", t); ev_now_update(m_loop); - t->setup(this, getStack()); + t->setup(this, t->isStackless() ? NULL : getStack()); m_tasks.insert(t); starting.insert(t); } @@ -345,12 +352,14 @@ int Balau::TaskMan::mainLoop() { return m_stopCode; } -void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) { +void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) { if (stick) { + IAssert(!event, "inconsistent"); TaskMan * tm = stick->getTaskMan(); tm->addToPending(t); - tm->m_evt.send(); } else { + if (event) + event->attachToTask(t); s_scheduler.registerTask(t); } } @@ -361,6 +370,7 @@ void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) { void Balau::TaskMan::addToPending(Balau::Task * t) { m_pendingAdd.push(t); + m_evt.send(); } void Balau::TaskMan::signalTask(Task * t) { @@ -373,17 +383,51 @@ void Balau::TaskMan::stop(int code) { s_scheduler.stopAll(code); } -class ThreadedTaskMan : public Balau::Thread { - virtual void * proc() { +void * Balau::TaskMan::TaskManThread::proc() { + bool success = false; + m_taskMan = NULL; + try { m_taskMan = new Balau::TaskMan(); m_taskMan->mainLoop(); - return NULL; + success = true; } - Balau::TaskMan * m_taskMan; -}; + catch (Exit e) { + Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway"); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); + } + catch (RessourceException e) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread got a ressource problem: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (GeneralException e) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an exception: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (...) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an unknown exception"); + } + if (!success) { + if (m_taskMan) + delete m_taskMan; + m_taskMan = NULL; + TaskMan::stop(-1); + } + return NULL; +} -Balau::Thread * Balau::TaskMan::createThreadedTaskMan() { - Thread * r = new ThreadedTaskMan(); - r->threadStart(); - return r; +Balau::TaskMan::TaskManThread::~TaskManThread() { + if (m_taskMan) + delete m_taskMan; } |