diff options
Diffstat (limited to 'src/Task.cc')
-rw-r--r-- | src/Task.cc | 160 |
1 files changed, 130 insertions, 30 deletions
diff --git a/src/Task.cc b/src/Task.cc index aba29b8..4465203 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -6,10 +6,7 @@ static Balau::LocalTmpl<Balau::Task> localTask; -Balau::Task::Task() { - m_status = STARTING; - m_okayToEAgain = false; - +Balau::Task::Task() : m_status(STARTING), m_okayToEAgain(false), m_stackless(false) { Printer::elog(E_TASK, "Created a Task at %p", this); } @@ -22,16 +19,21 @@ bool Balau::Task::needsStacks() { } void Balau::Task::setup(TaskMan * taskMan, void * stack) { - size_t size = stackSize(); + if (m_stackless) { + IAssert(!stack, "Since we're stackless, no stack should've been allocated."); + m_stack = NULL; + } else { + size_t size = stackSize(); #ifndef _WIN32 - IAssert(stack, "Can't setup a coroutine without a stack"); - m_stack = stack; - coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size); + IAssert(stack, "Can't setup a coroutine without a stack"); + m_stack = stack; + coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size); #else - Assert(!stack, "We shouldn't allocate stacks with Fibers"); - m_stack = NULL; - m_fiber = CreateFiber(size, coroutineTrampoline, this); + Assert(!stack, "We shouldn't allocate stacks with Fibers"); + m_stack = NULL; + m_fiber = CreateFiber(size, coroutineTrampoline, this); #endif + } m_taskMan = taskMan; @@ -54,7 +56,7 @@ void Balau::Task::coroutineTrampoline(void * arg) { void Balau::Task::coroutine() { try { - IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status); + IAssert((m_status == STARTING) || (m_stackless && (m_status == RUNNING)), "The Task at %p has a bad status ? m_status = %s, stackless = %s", this, StatusToString(m_status), m_stackless ? "true" : "false"); m_status = RUNNING; Do(); m_status = STOPPED; @@ -66,6 +68,12 @@ void Balau::Task::coroutine() { catch (TestException & e) { m_status = STOPPED; Printer::log(M_ERROR, "Unit test failed: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); TaskMan::stop(-1); } catch (RessourceException & e) { @@ -74,8 +82,25 @@ void Balau::Task::coroutine() { const char * details = e.getDetails(); if (details) Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); TaskMan::stop(-1); } + catch (TaskSwitch & e) { + if (!m_stackless) { + Printer::log(M_ERROR, "Task %s at %p isn't stackless, but still caused a task switch.", getName(), this); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + m_status = FAULTED; + } else { + Printer::elog(E_TASK, "Stackless task %s at %p is task-switching.", getName(), this); + } + } catch (GeneralException & e) { Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", getName(), this, e.getMsg()); const char * details = e.getDetails(); @@ -90,39 +115,50 @@ void Balau::Task::coroutine() { Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", getName(), this); m_status = FAULTED; } + if (!m_stackless) { #ifndef _WIN32 - coro_transfer(&m_ctx, &m_taskMan->m_returnContext); + coro_transfer(&m_ctx, &m_taskMan->m_returnContext); #else - SwitchToFiber(m_taskMan->m_fiber); + SwitchToFiber(m_taskMan->m_fiber); #endif + } } void Balau::Task::switchTo() { Printer::elog(E_TASK, "Switching to task %p - %s", this, getName()); - IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status); + IAssert(m_status == YIELDED || m_status == SLEEPING || m_status == STARTING, "The task at %p isn't either yielded, sleeping or starting... ? m_status = %s", this, StatusToString(m_status)); void * oldTLS = g_tlsManager->getTLS(); g_tlsManager->setTLS(m_tls); - if (m_status == YIELDED || m_status == IDLE) + if (m_status == YIELDED || m_status == SLEEPING) m_status = RUNNING; + if (m_stackless) { + coroutine(); + } else { #ifndef _WIN32 - coro_transfer(&m_taskMan->m_returnContext, &m_ctx); + coro_transfer(&m_taskMan->m_returnContext, &m_ctx); #else - SwitchToFiber(m_fiber); + SwitchToFiber(m_fiber); #endif + } g_tlsManager->setTLS(oldTLS); - if (m_status == RUNNING) - m_status = IDLE; + IAssert(m_status != RUNNING, "Task %s at %p is still running... ?", getName(), this); } -void Balau::Task::yield(bool changeStatus) { +void Balau::Task::yield(bool stillRunning) throw (GeneralException) { Printer::elog(E_TASK, "Task %p - %s yielding", this, getName()); - if (changeStatus) + if (stillRunning) m_status = YIELDED; + else + m_status = SLEEPING; + if (m_stackless) { + throw EAgain(NULL); + } else { #ifndef _WIN32 - coro_transfer(&m_ctx, &m_taskMan->m_returnContext); + coro_transfer(&m_ctx, &m_taskMan->m_returnContext); #else - SwitchToFiber(m_taskMan->m_fiber); + SwitchToFiber(m_taskMan->m_fiber); #endif + } } Balau::Task * Balau::Task::getCurrentTask() { @@ -149,7 +185,15 @@ void Balau::Events::BaseEvent::doSignal() { } } -Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited), m_ack(false), m_distant(false) { +Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(NULL), m_ack(false), m_distant(false) { + if (taskWaited) + attachToTask(taskWaited); +} + +void Balau::Events::TaskEvent::attachToTask(Task * taskWaited) { + AAssert(!m_taskWaited, "You can't attach a TaskEvent twice."); + m_ack = false; + m_taskWaited = taskWaited; ScopeLock lock(m_taskWaited->m_eventLock); m_taskWaited->m_waitedBy.push_back(this); } @@ -157,7 +201,8 @@ Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited void Balau::Events::TaskEvent::signal() { if (m_distant) m_evt.send(); - doSignal(); + else + doSignal(); } void Balau::Events::TaskEvent::gotOwner(Task * task) { @@ -198,6 +243,7 @@ void Balau::Events::TaskEvent::ack() { IAssert(deleted, "We didn't find task %p in the waitedBy lists... ?", this); m_ack = true; reset(); + m_taskWaited = NULL; } void Balau::Events::Timeout::gotOwner(Task * task) { @@ -214,23 +260,77 @@ void Balau::Events::Custom::gotOwner(Task * task) { m_loop = task->getLoop(); } -void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) { +void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) { Task * t = getCurrentTask(); if (evt) t->waitFor(evt); + + if (t->m_stackless) { + AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task."); + } + + if (yieldType == STACKLESS) { + AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task."); + } + bool gotSignal; do { t->yield(evt == NULL); - Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false"); + static const char * YieldTypeToString[] = { + "SIMPLE", + "INTERRUPTIBLE", + "STACKLESS", + }; + Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false"); gotSignal = evt ? evt->gotSignal() : true; - } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal); + } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal); if (!evt) return; - if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) { + if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) { Printer::elog(E_TASK, "operation is throwing an exception."); throw EAgain(evt); } } + +void Balau::QueueBase::iPush(void * t, Events::Async * event) { + ScopeLock sl(m_lock); + Cell * c = new Cell(t); + c->m_prev = m_back; + if (m_back) + m_back->m_next = c; + else + m_front = c; + m_back = c; + if (event) + event->trigger(); + else + pthread_cond_signal(&m_cond); +} + +void * Balau::QueueBase::iPop(Events::Async * event, bool wait) { + ScopeLock sl(m_lock); + while (!m_front && wait) { + if (event) { + Task::prepare(event); + m_lock.leave(); + Task::operationYield(event, Task::INTERRUPTIBLE); + m_lock.enter(); + } else { + pthread_cond_wait(&m_cond, &m_lock.m_lock); + } + } + Cell * c = m_front; + if (!c) + return NULL; + m_front = c->m_next; + if (m_front) + m_front->m_prev = NULL; + else + m_back = NULL; + void * t = c->m_elem; + delete c; + return t; +} |