summaryrefslogtreecommitdiff
path: root/src/Task.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/Task.cc')
-rw-r--r--src/Task.cc160
1 files changed, 130 insertions, 30 deletions
diff --git a/src/Task.cc b/src/Task.cc
index aba29b8..4465203 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -6,10 +6,7 @@
static Balau::LocalTmpl<Balau::Task> localTask;
-Balau::Task::Task() {
- m_status = STARTING;
- m_okayToEAgain = false;
-
+Balau::Task::Task() : m_status(STARTING), m_okayToEAgain(false), m_stackless(false) {
Printer::elog(E_TASK, "Created a Task at %p", this);
}
@@ -22,16 +19,21 @@ bool Balau::Task::needsStacks() {
}
void Balau::Task::setup(TaskMan * taskMan, void * stack) {
- size_t size = stackSize();
+ if (m_stackless) {
+ IAssert(!stack, "Since we're stackless, no stack should've been allocated.");
+ m_stack = NULL;
+ } else {
+ size_t size = stackSize();
#ifndef _WIN32
- IAssert(stack, "Can't setup a coroutine without a stack");
- m_stack = stack;
- coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size);
+ IAssert(stack, "Can't setup a coroutine without a stack");
+ m_stack = stack;
+ coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size);
#else
- Assert(!stack, "We shouldn't allocate stacks with Fibers");
- m_stack = NULL;
- m_fiber = CreateFiber(size, coroutineTrampoline, this);
+ Assert(!stack, "We shouldn't allocate stacks with Fibers");
+ m_stack = NULL;
+ m_fiber = CreateFiber(size, coroutineTrampoline, this);
#endif
+ }
m_taskMan = taskMan;
@@ -54,7 +56,7 @@ void Balau::Task::coroutineTrampoline(void * arg) {
void Balau::Task::coroutine() {
try {
- IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status);
+ IAssert((m_status == STARTING) || (m_stackless && (m_status == RUNNING)), "The Task at %p has a bad status ? m_status = %s, stackless = %s", this, StatusToString(m_status), m_stackless ? "true" : "false");
m_status = RUNNING;
Do();
m_status = STOPPED;
@@ -66,6 +68,12 @@ void Balau::Task::coroutine() {
catch (TestException & e) {
m_status = STOPPED;
Printer::log(M_ERROR, "Unit test failed: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
TaskMan::stop(-1);
}
catch (RessourceException & e) {
@@ -74,8 +82,25 @@ void Balau::Task::coroutine() {
const char * details = e.getDetails();
if (details)
Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
TaskMan::stop(-1);
}
+ catch (TaskSwitch & e) {
+ if (!m_stackless) {
+ Printer::log(M_ERROR, "Task %s at %p isn't stackless, but still caused a task switch.", getName(), this);
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ m_status = FAULTED;
+ } else {
+ Printer::elog(E_TASK, "Stackless task %s at %p is task-switching.", getName(), this);
+ }
+ }
catch (GeneralException & e) {
Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", getName(), this, e.getMsg());
const char * details = e.getDetails();
@@ -90,39 +115,50 @@ void Balau::Task::coroutine() {
Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", getName(), this);
m_status = FAULTED;
}
+ if (!m_stackless) {
#ifndef _WIN32
- coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
+ coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
- SwitchToFiber(m_taskMan->m_fiber);
+ SwitchToFiber(m_taskMan->m_fiber);
#endif
+ }
}
void Balau::Task::switchTo() {
Printer::elog(E_TASK, "Switching to task %p - %s", this, getName());
- IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status);
+ IAssert(m_status == YIELDED || m_status == SLEEPING || m_status == STARTING, "The task at %p isn't either yielded, sleeping or starting... ? m_status = %s", this, StatusToString(m_status));
void * oldTLS = g_tlsManager->getTLS();
g_tlsManager->setTLS(m_tls);
- if (m_status == YIELDED || m_status == IDLE)
+ if (m_status == YIELDED || m_status == SLEEPING)
m_status = RUNNING;
+ if (m_stackless) {
+ coroutine();
+ } else {
#ifndef _WIN32
- coro_transfer(&m_taskMan->m_returnContext, &m_ctx);
+ coro_transfer(&m_taskMan->m_returnContext, &m_ctx);
#else
- SwitchToFiber(m_fiber);
+ SwitchToFiber(m_fiber);
#endif
+ }
g_tlsManager->setTLS(oldTLS);
- if (m_status == RUNNING)
- m_status = IDLE;
+ IAssert(m_status != RUNNING, "Task %s at %p is still running... ?", getName(), this);
}
-void Balau::Task::yield(bool changeStatus) {
+void Balau::Task::yield(bool stillRunning) throw (GeneralException) {
Printer::elog(E_TASK, "Task %p - %s yielding", this, getName());
- if (changeStatus)
+ if (stillRunning)
m_status = YIELDED;
+ else
+ m_status = SLEEPING;
+ if (m_stackless) {
+ throw EAgain(NULL);
+ } else {
#ifndef _WIN32
- coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
+ coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
- SwitchToFiber(m_taskMan->m_fiber);
+ SwitchToFiber(m_taskMan->m_fiber);
#endif
+ }
}
Balau::Task * Balau::Task::getCurrentTask() {
@@ -149,7 +185,15 @@ void Balau::Events::BaseEvent::doSignal() {
}
}
-Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited), m_ack(false), m_distant(false) {
+Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(NULL), m_ack(false), m_distant(false) {
+ if (taskWaited)
+ attachToTask(taskWaited);
+}
+
+void Balau::Events::TaskEvent::attachToTask(Task * taskWaited) {
+ AAssert(!m_taskWaited, "You can't attach a TaskEvent twice.");
+ m_ack = false;
+ m_taskWaited = taskWaited;
ScopeLock lock(m_taskWaited->m_eventLock);
m_taskWaited->m_waitedBy.push_back(this);
}
@@ -157,7 +201,8 @@ Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited
void Balau::Events::TaskEvent::signal() {
if (m_distant)
m_evt.send();
- doSignal();
+ else
+ doSignal();
}
void Balau::Events::TaskEvent::gotOwner(Task * task) {
@@ -198,6 +243,7 @@ void Balau::Events::TaskEvent::ack() {
IAssert(deleted, "We didn't find task %p in the waitedBy lists... ?", this);
m_ack = true;
reset();
+ m_taskWaited = NULL;
}
void Balau::Events::Timeout::gotOwner(Task * task) {
@@ -214,23 +260,77 @@ void Balau::Events::Custom::gotOwner(Task * task) {
m_loop = task->getLoop();
}
-void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) {
+void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) {
Task * t = getCurrentTask();
if (evt)
t->waitFor(evt);
+
+ if (t->m_stackless) {
+ AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task.");
+ }
+
+ if (yieldType == STACKLESS) {
+ AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task.");
+ }
+
bool gotSignal;
do {
t->yield(evt == NULL);
- Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false");
+ static const char * YieldTypeToString[] = {
+ "SIMPLE",
+ "INTERRUPTIBLE",
+ "STACKLESS",
+ };
+ Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false");
gotSignal = evt ? evt->gotSignal() : true;
- } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal);
+ } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal);
if (!evt)
return;
- if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) {
+ if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) {
Printer::elog(E_TASK, "operation is throwing an exception.");
throw EAgain(evt);
}
}
+
+void Balau::QueueBase::iPush(void * t, Events::Async * event) {
+ ScopeLock sl(m_lock);
+ Cell * c = new Cell(t);
+ c->m_prev = m_back;
+ if (m_back)
+ m_back->m_next = c;
+ else
+ m_front = c;
+ m_back = c;
+ if (event)
+ event->trigger();
+ else
+ pthread_cond_signal(&m_cond);
+}
+
+void * Balau::QueueBase::iPop(Events::Async * event, bool wait) {
+ ScopeLock sl(m_lock);
+ while (!m_front && wait) {
+ if (event) {
+ Task::prepare(event);
+ m_lock.leave();
+ Task::operationYield(event, Task::INTERRUPTIBLE);
+ m_lock.enter();
+ } else {
+ pthread_cond_wait(&m_cond, &m_lock.m_lock);
+ }
+ }
+ Cell * c = m_front;
+ if (!c)
+ return NULL;
+ m_front = c->m_next;
+ if (m_front)
+ m_front->m_prev = NULL;
+ else
+ m_back = NULL;
+ void * t = c->m_elem;
+ delete c;
+ return t;
+}