summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/Task.h3
-rw-r--r--src/Task.cc22
-rw-r--r--src/TaskMan.cc76
3 files changed, 79 insertions, 22 deletions
diff --git a/includes/Task.h b/includes/Task.h
index 03233f8..4617521 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -122,6 +122,7 @@ class Task {
IDLE,
STOPPED,
FAULTED,
+ YIELDED,
};
Task();
virtual ~Task();
@@ -136,7 +137,7 @@ class Task {
TaskMan * getTaskMan() { return m_taskMan; }
struct ev_loop * getLoop();
protected:
- void yield();
+ void yield(bool changeStatus = false);
virtual void Do() = 0;
void waitFor(Events::BaseEvent * event);
bool setOkayToEAgain(bool enable) {
diff --git a/src/Task.cc b/src/Task.cc
index cc6fa94..8f30cd4 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -53,8 +53,8 @@ void Balau::Task::coroutineTrampoline(void * arg) {
}
void Balau::Task::coroutine() {
- IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status);
try {
+ IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status);
m_status = RUNNING;
Do();
m_status = STOPPED;
@@ -99,9 +99,11 @@ void Balau::Task::coroutine() {
void Balau::Task::switchTo() {
Printer::elog(E_TASK, "Switching to task %p - %s", this, getName());
- IAssert(m_status == IDLE || m_status == STARTING, "The task at %p isn't either idle or starting... ? m_status = %i", this, m_status);
+ IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status);
void * oldTLS = g_tlsManager->getTLS();
g_tlsManager->setTLS(m_tls);
+ if (m_status == YIELDED || m_status == IDLE)
+ m_status = RUNNING;
#ifndef _WIN32
coro_transfer(&m_taskMan->m_returnContext, &m_ctx);
#else
@@ -112,8 +114,10 @@ void Balau::Task::switchTo() {
m_status = IDLE;
}
-void Balau::Task::yield() {
+void Balau::Task::yield(bool changeStatus) {
Printer::elog(E_TASK, "Task %p - %s yielding", this, getName());
+ if (changeStatus)
+ m_status = YIELDED;
#ifndef _WIN32
coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
@@ -210,12 +214,18 @@ void Balau::Events::Custom::gotOwner(Task * task) {
void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) {
Task * t = getCurrentTask();
- t->waitFor(evt);
+ if (evt)
+ t->waitFor(evt);
+ bool gotSignal;
do {
- t->yield();
+ t->yield(evt == NULL);
Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false");
- } while ((!interruptible || !t->m_okayToEAgain) && !evt->gotSignal());
+ gotSignal = evt ? evt->gotSignal() : true;
+ } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal);
+
+ if (!evt)
+ return;
if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) {
Printer::elog(E_TASK, "operation is throwing an exception.");
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index a1d8255..843423f 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -202,24 +202,39 @@ void Balau::TaskMan::freeStack(void * stack) {
}
int Balau::TaskMan::mainLoop() {
+ taskHash_t starting, stopped, yielded, yielded2;
+ taskHash_t::iterator iH;
+ Task * t;
+
+ // we start by pushing all of the 'STARTING' tasks into the appropriate queue.
+ for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++)
+ if (t->getStatus() == Task::STARTING)
+ starting.insert(*iH);
+
do {
- taskHash_t::iterator iH;
- Task * t;
bool noWait = false;
Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size());
// checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean.
- for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
+ while ((iH = starting.begin()) != starting.end()) {
t = *iH;
- if (t->getStatus() == Task::STARTING)
- t->switchTo();
- if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED))
+ IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t);
+ t->switchTo();
+ IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t);
+ starting.erase(iH);
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
+ noWait = true;
+ stopped.insert(t);
+ }
+ if (t->getStatus() == Task::YIELDED) {
noWait = true;
+ yielded.insert(t);
+ }
}
// if we begin that loop with any pending task, just don't loop, so we can add them immediately.
- if (!m_pendingAdd.isEmpty())
+ if (!m_pendingAdd.isEmpty() || !yielded.empty())
noWait = true;
// libev's event "loop". We always runs it once though.
@@ -229,10 +244,10 @@ int Balau::TaskMan::mainLoop() {
Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this);
// let's check what task got stopped, and signal them
- for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
+ for (iH = stopped.begin(); iH != stopped.end(); iH++) {
t = *iH;
- if (((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) &&
- (t->m_waitedBy.size() != 0)) {
+ IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t);
+ if (t->m_waitedBy.size() != 0) {
Task::waitedByList_t::iterator i;
for (i = t->m_waitedBy.begin(); i != t->m_waitedBy.end(); i++) {
Events::TaskEvent * e = *i;
@@ -246,11 +261,37 @@ int Balau::TaskMan::mainLoop() {
for (iH = m_signaledTasks.begin(); iH != m_signaledTasks.end(); iH++) {
t = *iH;
Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str());
- IAssert(t->getStatus() == Task::IDLE, "We're switching to a non-idle task... ? status = %i", t->getStatus());
+ IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus());
+ bool wasYielded = t->getStatus() == Task::YIELDED;
t->switchTo();
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
+ stopped.insert(t);
+ if (wasYielded) {
+ taskHash_t::iterator i = yielded.find(t);
+ IAssert(i != yielded.end(), "Task at %p was yielded, but not in yielded list... ?", t);
+ yielded.erase(i);
+ }
+ } else if (t->getStatus() == Task::YIELDED) {
+ yielded.insert(t);
+ }
}
m_signaledTasks.clear();
+ // now let's make a round of yielded tasks
+ for (iH = yielded.begin(); iH != yielded.end(); iH++) {
+ t = *iH;
+ Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that was yielded.", this, t, t->getName(), ClassName(t).c_str());
+ IAssert(t->getStatus() == Task::YIELDED, "Task %p was in yielded list, but wasn't yielded ?", t);
+ t->switchTo();
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
+ stopped.insert(t);
+ } else if (t->getStatus() == Task::YIELDED) {
+ yielded2.insert(t);
+ }
+ }
+ yielded = yielded2;
+ yielded2.clear();
+
// Adding tasks that were added, maybe from other threads
while (!m_pendingAdd.isEmpty()) {
Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this);
@@ -260,19 +301,24 @@ int Balau::TaskMan::mainLoop() {
ev_now_update(m_loop);
t->setup(this, getStack());
m_tasks.insert(t);
+ starting.insert(t);
}
// Finally, let's destroy tasks that no longer are necessary.
bool didDelete;
do {
didDelete = false;
- for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
+ for (iH = stopped.begin(); iH != stopped.end(); iH++) {
t = *iH;
- if (((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) &&
- (t->m_waitedBy.size() == 0)) {
+ IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t);
+ if (t->m_waitedBy.size() == 0) {
freeStack(t->m_stack);
- delete t;
+ stopped.erase(iH);
+ iH = m_tasks.find(t);
+ IAssert(iH != m_tasks.end(), "Task %p in stopped list but not in m_tasks...", t);
m_tasks.erase(iH);
+ IAssert(yielded.find(t) == yielded.end(), "Task %p is deleted but is in yielded list... ?", t);
+ delete t;
didDelete = true;
break;
}