diff options
-rw-r--r-- | includes/Task.h | 3 | ||||
-rw-r--r-- | src/Task.cc | 22 | ||||
-rw-r--r-- | src/TaskMan.cc | 76 |
3 files changed, 79 insertions, 22 deletions
diff --git a/includes/Task.h b/includes/Task.h index 03233f8..4617521 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -122,6 +122,7 @@ class Task { IDLE, STOPPED, FAULTED, + YIELDED, }; Task(); virtual ~Task(); @@ -136,7 +137,7 @@ class Task { TaskMan * getTaskMan() { return m_taskMan; } struct ev_loop * getLoop(); protected: - void yield(); + void yield(bool changeStatus = false); virtual void Do() = 0; void waitFor(Events::BaseEvent * event); bool setOkayToEAgain(bool enable) { diff --git a/src/Task.cc b/src/Task.cc index cc6fa94..8f30cd4 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -53,8 +53,8 @@ void Balau::Task::coroutineTrampoline(void * arg) { } void Balau::Task::coroutine() { - IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status); try { + IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status); m_status = RUNNING; Do(); m_status = STOPPED; @@ -99,9 +99,11 @@ void Balau::Task::coroutine() { void Balau::Task::switchTo() { Printer::elog(E_TASK, "Switching to task %p - %s", this, getName()); - IAssert(m_status == IDLE || m_status == STARTING, "The task at %p isn't either idle or starting... ? m_status = %i", this, m_status); + IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status); void * oldTLS = g_tlsManager->getTLS(); g_tlsManager->setTLS(m_tls); + if (m_status == YIELDED || m_status == IDLE) + m_status = RUNNING; #ifndef _WIN32 coro_transfer(&m_taskMan->m_returnContext, &m_ctx); #else @@ -112,8 +114,10 @@ void Balau::Task::switchTo() { m_status = IDLE; } -void Balau::Task::yield() { +void Balau::Task::yield(bool changeStatus) { Printer::elog(E_TASK, "Task %p - %s yielding", this, getName()); + if (changeStatus) + m_status = YIELDED; #ifndef _WIN32 coro_transfer(&m_ctx, &m_taskMan->m_returnContext); #else @@ -210,12 +214,18 @@ void Balau::Events::Custom::gotOwner(Task * task) { void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) { Task * t = getCurrentTask(); - t->waitFor(evt); + if (evt) + t->waitFor(evt); + bool gotSignal; do { - t->yield(); + t->yield(evt == NULL); Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false"); - } while ((!interruptible || !t->m_okayToEAgain) && !evt->gotSignal()); + gotSignal = evt ? evt->gotSignal() : true; + } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal); + + if (!evt) + return; if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) { Printer::elog(E_TASK, "operation is throwing an exception."); diff --git a/src/TaskMan.cc b/src/TaskMan.cc index a1d8255..843423f 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -202,24 +202,39 @@ void Balau::TaskMan::freeStack(void * stack) { } int Balau::TaskMan::mainLoop() { + taskHash_t starting, stopped, yielded, yielded2; + taskHash_t::iterator iH; + Task * t; + + // we start by pushing all of the 'STARTING' tasks into the appropriate queue. + for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) + if (t->getStatus() == Task::STARTING) + starting.insert(*iH); + do { - taskHash_t::iterator iH; - Task * t; bool noWait = false; Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size()); // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean. - for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) { + while ((iH = starting.begin()) != starting.end()) { t = *iH; - if (t->getStatus() == Task::STARTING) - t->switchTo(); - if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) + IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t); + t->switchTo(); + IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t); + starting.erase(iH); + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { + noWait = true; + stopped.insert(t); + } + if (t->getStatus() == Task::YIELDED) { noWait = true; + yielded.insert(t); + } } // if we begin that loop with any pending task, just don't loop, so we can add them immediately. - if (!m_pendingAdd.isEmpty()) + if (!m_pendingAdd.isEmpty() || !yielded.empty()) noWait = true; // libev's event "loop". We always runs it once though. @@ -229,10 +244,10 @@ int Balau::TaskMan::mainLoop() { Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this); // let's check what task got stopped, and signal them - for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) { + for (iH = stopped.begin(); iH != stopped.end(); iH++) { t = *iH; - if (((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) && - (t->m_waitedBy.size() != 0)) { + IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t); + if (t->m_waitedBy.size() != 0) { Task::waitedByList_t::iterator i; for (i = t->m_waitedBy.begin(); i != t->m_waitedBy.end(); i++) { Events::TaskEvent * e = *i; @@ -246,11 +261,37 @@ int Balau::TaskMan::mainLoop() { for (iH = m_signaledTasks.begin(); iH != m_signaledTasks.end(); iH++) { t = *iH; Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str()); - IAssert(t->getStatus() == Task::IDLE, "We're switching to a non-idle task... ? status = %i", t->getStatus()); + IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus()); + bool wasYielded = t->getStatus() == Task::YIELDED; t->switchTo(); + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { + stopped.insert(t); + if (wasYielded) { + taskHash_t::iterator i = yielded.find(t); + IAssert(i != yielded.end(), "Task at %p was yielded, but not in yielded list... ?", t); + yielded.erase(i); + } + } else if (t->getStatus() == Task::YIELDED) { + yielded.insert(t); + } } m_signaledTasks.clear(); + // now let's make a round of yielded tasks + for (iH = yielded.begin(); iH != yielded.end(); iH++) { + t = *iH; + Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that was yielded.", this, t, t->getName(), ClassName(t).c_str()); + IAssert(t->getStatus() == Task::YIELDED, "Task %p was in yielded list, but wasn't yielded ?", t); + t->switchTo(); + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { + stopped.insert(t); + } else if (t->getStatus() == Task::YIELDED) { + yielded2.insert(t); + } + } + yielded = yielded2; + yielded2.clear(); + // Adding tasks that were added, maybe from other threads while (!m_pendingAdd.isEmpty()) { Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this); @@ -260,19 +301,24 @@ int Balau::TaskMan::mainLoop() { ev_now_update(m_loop); t->setup(this, getStack()); m_tasks.insert(t); + starting.insert(t); } // Finally, let's destroy tasks that no longer are necessary. bool didDelete; do { didDelete = false; - for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) { + for (iH = stopped.begin(); iH != stopped.end(); iH++) { t = *iH; - if (((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) && - (t->m_waitedBy.size() == 0)) { + IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t); + if (t->m_waitedBy.size() == 0) { freeStack(t->m_stack); - delete t; + stopped.erase(iH); + iH = m_tasks.find(t); + IAssert(iH != m_tasks.end(), "Task %p in stopped list but not in m_tasks...", t); m_tasks.erase(iH); + IAssert(yielded.find(t) == yielded.end(), "Task %p is deleted but is in yielded list... ?", t); + delete t; didDelete = true; break; } |