diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Async.cc | 6 | ||||
-rw-r--r-- | src/Handle.cc | 6 | ||||
-rw-r--r-- | src/HttpServer.cc | 12 | ||||
-rw-r--r-- | src/Input.cc | 8 | ||||
-rw-r--r-- | src/LuaTask.cc | 58 | ||||
-rw-r--r-- | src/Main.cc | 2 | ||||
-rw-r--r-- | src/Output.cc | 8 | ||||
-rw-r--r-- | src/SimpleMustache.cc | 12 | ||||
-rw-r--r-- | src/Socket.cc | 45 | ||||
-rw-r--r-- | src/Task.cc | 160 | ||||
-rw-r--r-- | src/TaskMan.cc | 102 | ||||
-rw-r--r-- | src/Threads.cc | 47 | ||||
-rw-r--r-- | src/ZHandle.cc | 4 |
13 files changed, 362 insertions, 108 deletions
diff --git a/src/Async.cc b/src/Async.cc index 829c677..18c8ef1 100644 --- a/src/Async.cc +++ b/src/Async.cc @@ -28,7 +28,7 @@ void Balau::AsyncManager::queueOp(AsyncOperation * op) { TLS * tls = getTLS(); Printer::elog(E_ASYNC, "Queuing operation at %p", op); if (op->needsSynchronousCallback()) { - Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam); + Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", op, &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam); op->m_idleQueue = &tls->idleQueue; op->m_idleReadyCallback = tls->idleReadyCallback; op->m_idleReadyParam = tls->idleReadyParam; @@ -98,11 +98,13 @@ void * Balau::AsyncManager::proc() { } stopAllWorkers(); + Printer::elog(E_ASYNC, "Async thread waits for all idle queues to empty"); while (Atomic::Prefetch::Decrement(&m_numTLSes)) { TLS * tls = m_TLSes.pop(); while (!tls->idleQueue.isEmpty()); } + Printer::elog(E_ASYNC, "Async thread stopping"); return NULL; } @@ -150,7 +152,7 @@ void Balau::AsyncManager::idle() { while (!m_ready); AsyncOperation * op; TLS * tls = getTLS(); - while ((op = tls->idleQueue.pop(false))) { + while ((op = tls->idleQueue.pop())) { Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op); op->done(); } diff --git a/src/Handle.cc b/src/Handle.cc index dacd30d..9bcb419 100644 --- a/src/Handle.cc +++ b/src/Handle.cc @@ -97,7 +97,7 @@ ssize_t Balau::Handle::forceRead(void * _buf, size_t count, Events::BaseEvent * catch (EAgain e) { if (evt && evt->gotSignal()) return total; - Task::yield(e.getEvent()); + Task::operationYield(e.getEvent()); continue; } if (r < 0) @@ -124,7 +124,7 @@ ssize_t Balau::Handle::forceWrite(const void * _buf, size_t count, Events::BaseE catch (EAgain e) { if (evt && evt->gotSignal()) return total; - Task::yield(e.getEvent()); + Task::operationYield(e.getEvent()); continue; } if (r < 0) @@ -242,7 +242,7 @@ int Balau::FileSystem::mkdir(const char * path) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_mkdir(path, 0755, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_mkdir returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); char str[4096]; if (cbResults.result < 0) diff --git a/src/HttpServer.cc b/src/HttpServer.cc index c9cb0ee..23af486 100644 --- a/src/HttpServer.cc +++ b/src/HttpServer.cc @@ -35,6 +35,8 @@ class HttpWorker : public Task { HttpWorker(IO<Handle> io, void * server); ~HttpWorker(); static void buildErrorTemplate(IO<Handle> h) { m_errorTemplate.setTemplate(h); } + template<size_t S> + static void buildErrorTemplate(const char (&str)[S]) { m_errorTemplate.setTemplate(str, S - 1); } static void buildErrorTemplate(const char * str, ssize_t s) { m_errorTemplate.setTemplate(str, s); } static void buildErrorTemplate(const String & str) { m_errorTemplate.setTemplate(str); } private: @@ -594,14 +596,20 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener; void Balau::HttpServer::start() { AAssert(!m_started, "Don't start an HttpServer twice"); - m_listenerPtr = createTask(new HttpListener(m_port, m_local.to_charp(), this)); + m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this)); m_started = true; } void Balau::HttpServer::stop() { AAssert(m_started, "Don't stop an HttpServer that hasn't been started"); - reinterpret_cast<HttpListener *>(m_listenerPtr)->stop(); + HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr); + Events::TaskEvent event(listener); + Task::prepare(&event); + listener->stop(); m_started = false; + Task::operationYield(&event); + IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop"); + event.ack(); } void Balau::HttpServer::registerAction(Action * action) { diff --git a/src/Input.cc b/src/Input.cc index 8168231..3608cd0 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -56,7 +56,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_open returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; if (cbResults.errorno == ENOENT) { @@ -71,7 +71,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) { cbStatsResults_t cbStatsResults; r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults); EAssert(r != NULL, "eio_fstat returned a NULL eio_req"); - Task::yield(&cbStatsResults.evt); + Task::operationYield(&cbStatsResults.evt); if (cbStatsResults.result == 0) { m_size = cbStatsResults.statdata.st_size; m_mtime = cbStatsResults.statdata.st_mtime; @@ -85,7 +85,7 @@ void Balau::Input::close() throw (GeneralException) { eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_close returned a NULL eio_req"); m_fd = -1; - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; strerror_r(cbResults.errorno, str, sizeof(str)); @@ -99,7 +99,7 @@ ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_read(m_fd, buf, count, getROffset(), 0, eioDone, &cbResults); EAssert(r != NULL, "eio_read returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result > 0) { rseek(cbResults.result, SEEK_CUR); } else { diff --git a/src/LuaTask.cc b/src/LuaTask.cc index d722847..7e0b256 100644 --- a/src/LuaTask.cc +++ b/src/LuaTask.cc @@ -1,26 +1,58 @@ #include "LuaTask.h" #include "Main.h" #include "TaskMan.h" +#include "Printer.h" + +namespace { + +class LuaTaskStopper : public Balau::LuaExecCell { + virtual void run(Balau::Lua &) { } +}; + +}; + +Balau::LuaExecCell::LuaExecCell() { + Printer::elog(E_TASK, "LuaExecCell created at %p", this); +} + +Balau::LuaMainTask::LuaMainTask() { + Printer::elog(E_TASK, "LuaMainTask created at %p", this); + L.open_base(); + L.open_table(); + L.open_string(); + L.open_math(); + L.open_debug(); + L.open_bit(); + L.open_jit(); +} + +Balau::LuaMainTask::~LuaMainTask() { + L.close(); +} void Balau::LuaMainTask::exec(LuaExecCell * cell) { + Printer::elog(E_TASK, "LuaMainTask at %p is asked to queue Cell %p", this, cell); m_queue.push(cell); - m_queueEvent.trigger(); } void Balau::LuaMainTask::stop() { - Atomic::CmpXChgVal(&m_stopping, true, false); - m_queueEvent.trigger(); + Printer::elog(E_TASK, "LuaMainTask at %p asked to stop", this); + exec(new LuaTaskStopper()); } void Balau::LuaMainTask::Do() { - while (!m_stopping) { - waitFor(&m_queueEvent); - + for (;;) { LuaExecCell * cell; - while ((cell = m_queue.pop(false))) - createTask(new LuaTask(L.thread(), cell), this); - - yield(); + Printer::elog(E_TASK, "LuaMainTask at %p tries to pop an ExecCell", this); + while ((cell = m_queue.pop())) { + Printer::elog(E_TASK, "LuaMainTask at %p popped %p", this, cell); + if (dynamic_cast<LuaTaskStopper *>(cell)) { + Printer::elog(E_TASK, "LuaMainTask at %p is stopping", this); + delete cell; + return; + } + TaskMan::registerTask(new LuaTask(L.thread(), cell), this); + } } } @@ -37,9 +69,13 @@ void Balau::LuaExecCell::exec(LuaMainTask * mainTask) { Task::prepare(&m_event); mainTask->exec(this); if (!m_detached) - Task::yield(&m_event); + Task::operationYield(&m_event); } void Balau::LuaExecString::run(Lua & L) { L.load(m_str); } + +void Balau::LuaExecFile::run(Lua & L) { + L.load(m_file); +} diff --git a/src/Main.cc b/src/Main.cc index fd4021f..32b7cf8 100644 --- a/src/Main.cc +++ b/src/Main.cc @@ -60,7 +60,7 @@ int Balau::Main::bootstrap(int _argc, char ** _argv) { try { m_status = RUNNING; - createTask(new MainTask()); + TaskMan::registerTask(new MainTask()); r = TaskMan::getDefaultTaskMan()->mainLoop(); m_status = STOPPING; } diff --git a/src/Output.cc b/src/Output.cc index a4f6275..dd0a588 100644 --- a/src/Output.cc +++ b/src/Output.cc @@ -56,7 +56,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException cbResults_t cbResults; eio_req * r = eio_open(fname, O_WRONLY | O_CREAT | (truncate ? O_TRUNC : 0), 0755, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_open returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; if (cbResults.errorno == ENOENT) { @@ -71,7 +71,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException cbStatsResults_t cbStatsResults; r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults); EAssert(r != NULL, "eio_fstat returned a NULL eio_req"); - Task::yield(&cbStatsResults.evt); + Task::operationYield(&cbStatsResults.evt); if (cbStatsResults.result == 0) { m_size = cbStatsResults.statdata.st_size; m_mtime = cbStatsResults.statdata.st_mtime; @@ -85,7 +85,7 @@ void Balau::Output::close() throw (GeneralException) { eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_close returned a NULL eio_req"); m_fd = -1; - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; strerror_r(cbResults.errorno, str, sizeof(str)); @@ -99,7 +99,7 @@ ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralExcep cbResults_t cbResults; eio_req * r = eio_write(m_fd, const_cast<void *>(buf), count, getWOffset(), 0, eioDone, &cbResults); EAssert(r != NULL, "eio_write returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result > 0) { wseek(cbResults.result, SEEK_CUR); } else { diff --git a/src/SimpleMustache.cc b/src/SimpleMustache.cc index 92fefeb..d2db99f 100644 --- a/src/SimpleMustache.cc +++ b/src/SimpleMustache.cc @@ -328,9 +328,9 @@ void Balau::SimpleMustache::setTemplate(IO<Handle> _h) { m_fragments.push_back(curFragment); } -Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_r(Fragments::iterator begin, const String & endSection) { - Fragments::iterator cur; - Fragments::iterator end = m_fragments.end(); +Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::checkTemplate_r(Fragments::const_iterator begin, const String & endSection) const { + Fragments::const_iterator cur; + Fragments::const_iterator end = m_fragments.end(); for (cur = begin; cur != end; cur++) { Fragment * fr = *cur; @@ -346,9 +346,9 @@ Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_ return end; } -Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx) { - Fragments::iterator cur; - Fragments::iterator end = m_fragments.end(); +Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const { + Fragments::const_iterator cur; + Fragments::const_iterator end = m_fragments.end(); if (endSection.strlen() != 0) { int depth = 0; diff --git a/src/Socket.cc b/src/Socket.cc index 148d0a4..7511bde 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -14,6 +14,26 @@ #include "Main.h" #include "Atomic.h" +#ifndef _WIN32 +namespace { + +class SigpipeBlocker : public Balau::AtStart { + public: + SigpipeBlocker() : AtStart(5) { } + virtual void doStart() { + struct sigaction new_actn, old_actn; + new_actn.sa_handler = SIG_IGN; + sigemptyset(&new_actn.sa_mask); + new_actn.sa_flags = 0; + sigaction(SIGPIPE, &new_actn, &old_actn); + } +}; + +static SigpipeBlocker sigpipeBlocker; + +}; +#endif + void Balau::Socket::SocketEvent::gotOwner(Task * task) { Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this); if (!m_task) { @@ -172,9 +192,8 @@ static const char * inet_ntop(int af, const void * src, char * dst, socklen_t si #endif -#if 0 -// TODO: use getaddrinfo_a, if available. -#else +namespace { + class ResolverThread : public Balau::GlobalThread { public: ResolverThread() : GlobalThread(8), m_stopping(false) { } @@ -186,6 +205,8 @@ class ResolverThread : public Balau::GlobalThread { volatile bool m_stopping; }; +}; + void ResolverThread::threadExit() { m_stopping = true; DNSRequest req; @@ -222,13 +243,12 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st Balau::Printer::elog(Balau::E_SOCKET, "Sending a request to the resolver thread"); Balau::Task::prepare(&evt); resolverThread.pushRequest(&req); - Balau::Task::yield(&evt); + Balau::Task::operationYield(&evt); Balau::Atomic::MemoryFence(); return req; } -#endif Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) { m_name = "Socket(unconnected)"; @@ -457,7 +477,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { IAssert(spins == 0, "We shouldn't have spinned..."); } - Task::yield(m_evtW, true); + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); // if we're still here, it means the parent task doesn't want to be thrown an exception IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled"); @@ -516,7 +536,7 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) { if (s < 0) { if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtR, true); + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); } else { String msg = getErrorMessage(); throw GeneralException(String("Unexpected error accepting a connection: #") + errno + "(" + msg + ")"); @@ -548,7 +568,7 @@ ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) { } if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtR, true); + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); } else { m_evtR->stop(); return r; @@ -575,8 +595,15 @@ ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralExcep if (r > 0) return r; +#ifndef _WIN32 + if (errno == EPIPE) { + close(); + return 0; + } +#endif + if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtW, true); + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); } else { m_evtW->stop(); return r; diff --git a/src/Task.cc b/src/Task.cc index aba29b8..4465203 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -6,10 +6,7 @@ static Balau::LocalTmpl<Balau::Task> localTask; -Balau::Task::Task() { - m_status = STARTING; - m_okayToEAgain = false; - +Balau::Task::Task() : m_status(STARTING), m_okayToEAgain(false), m_stackless(false) { Printer::elog(E_TASK, "Created a Task at %p", this); } @@ -22,16 +19,21 @@ bool Balau::Task::needsStacks() { } void Balau::Task::setup(TaskMan * taskMan, void * stack) { - size_t size = stackSize(); + if (m_stackless) { + IAssert(!stack, "Since we're stackless, no stack should've been allocated."); + m_stack = NULL; + } else { + size_t size = stackSize(); #ifndef _WIN32 - IAssert(stack, "Can't setup a coroutine without a stack"); - m_stack = stack; - coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size); + IAssert(stack, "Can't setup a coroutine without a stack"); + m_stack = stack; + coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size); #else - Assert(!stack, "We shouldn't allocate stacks with Fibers"); - m_stack = NULL; - m_fiber = CreateFiber(size, coroutineTrampoline, this); + Assert(!stack, "We shouldn't allocate stacks with Fibers"); + m_stack = NULL; + m_fiber = CreateFiber(size, coroutineTrampoline, this); #endif + } m_taskMan = taskMan; @@ -54,7 +56,7 @@ void Balau::Task::coroutineTrampoline(void * arg) { void Balau::Task::coroutine() { try { - IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status); + IAssert((m_status == STARTING) || (m_stackless && (m_status == RUNNING)), "The Task at %p has a bad status ? m_status = %s, stackless = %s", this, StatusToString(m_status), m_stackless ? "true" : "false"); m_status = RUNNING; Do(); m_status = STOPPED; @@ -66,6 +68,12 @@ void Balau::Task::coroutine() { catch (TestException & e) { m_status = STOPPED; Printer::log(M_ERROR, "Unit test failed: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); TaskMan::stop(-1); } catch (RessourceException & e) { @@ -74,8 +82,25 @@ void Balau::Task::coroutine() { const char * details = e.getDetails(); if (details) Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); TaskMan::stop(-1); } + catch (TaskSwitch & e) { + if (!m_stackless) { + Printer::log(M_ERROR, "Task %s at %p isn't stackless, but still caused a task switch.", getName(), this); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + m_status = FAULTED; + } else { + Printer::elog(E_TASK, "Stackless task %s at %p is task-switching.", getName(), this); + } + } catch (GeneralException & e) { Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", getName(), this, e.getMsg()); const char * details = e.getDetails(); @@ -90,39 +115,50 @@ void Balau::Task::coroutine() { Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", getName(), this); m_status = FAULTED; } + if (!m_stackless) { #ifndef _WIN32 - coro_transfer(&m_ctx, &m_taskMan->m_returnContext); + coro_transfer(&m_ctx, &m_taskMan->m_returnContext); #else - SwitchToFiber(m_taskMan->m_fiber); + SwitchToFiber(m_taskMan->m_fiber); #endif + } } void Balau::Task::switchTo() { Printer::elog(E_TASK, "Switching to task %p - %s", this, getName()); - IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status); + IAssert(m_status == YIELDED || m_status == SLEEPING || m_status == STARTING, "The task at %p isn't either yielded, sleeping or starting... ? m_status = %s", this, StatusToString(m_status)); void * oldTLS = g_tlsManager->getTLS(); g_tlsManager->setTLS(m_tls); - if (m_status == YIELDED || m_status == IDLE) + if (m_status == YIELDED || m_status == SLEEPING) m_status = RUNNING; + if (m_stackless) { + coroutine(); + } else { #ifndef _WIN32 - coro_transfer(&m_taskMan->m_returnContext, &m_ctx); + coro_transfer(&m_taskMan->m_returnContext, &m_ctx); #else - SwitchToFiber(m_fiber); + SwitchToFiber(m_fiber); #endif + } g_tlsManager->setTLS(oldTLS); - if (m_status == RUNNING) - m_status = IDLE; + IAssert(m_status != RUNNING, "Task %s at %p is still running... ?", getName(), this); } -void Balau::Task::yield(bool changeStatus) { +void Balau::Task::yield(bool stillRunning) throw (GeneralException) { Printer::elog(E_TASK, "Task %p - %s yielding", this, getName()); - if (changeStatus) + if (stillRunning) m_status = YIELDED; + else + m_status = SLEEPING; + if (m_stackless) { + throw EAgain(NULL); + } else { #ifndef _WIN32 - coro_transfer(&m_ctx, &m_taskMan->m_returnContext); + coro_transfer(&m_ctx, &m_taskMan->m_returnContext); #else - SwitchToFiber(m_taskMan->m_fiber); + SwitchToFiber(m_taskMan->m_fiber); #endif + } } Balau::Task * Balau::Task::getCurrentTask() { @@ -149,7 +185,15 @@ void Balau::Events::BaseEvent::doSignal() { } } -Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited), m_ack(false), m_distant(false) { +Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(NULL), m_ack(false), m_distant(false) { + if (taskWaited) + attachToTask(taskWaited); +} + +void Balau::Events::TaskEvent::attachToTask(Task * taskWaited) { + AAssert(!m_taskWaited, "You can't attach a TaskEvent twice."); + m_ack = false; + m_taskWaited = taskWaited; ScopeLock lock(m_taskWaited->m_eventLock); m_taskWaited->m_waitedBy.push_back(this); } @@ -157,7 +201,8 @@ Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited void Balau::Events::TaskEvent::signal() { if (m_distant) m_evt.send(); - doSignal(); + else + doSignal(); } void Balau::Events::TaskEvent::gotOwner(Task * task) { @@ -198,6 +243,7 @@ void Balau::Events::TaskEvent::ack() { IAssert(deleted, "We didn't find task %p in the waitedBy lists... ?", this); m_ack = true; reset(); + m_taskWaited = NULL; } void Balau::Events::Timeout::gotOwner(Task * task) { @@ -214,23 +260,77 @@ void Balau::Events::Custom::gotOwner(Task * task) { m_loop = task->getLoop(); } -void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) { +void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) { Task * t = getCurrentTask(); if (evt) t->waitFor(evt); + + if (t->m_stackless) { + AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task."); + } + + if (yieldType == STACKLESS) { + AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task."); + } + bool gotSignal; do { t->yield(evt == NULL); - Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false"); + static const char * YieldTypeToString[] = { + "SIMPLE", + "INTERRUPTIBLE", + "STACKLESS", + }; + Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false"); gotSignal = evt ? evt->gotSignal() : true; - } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal); + } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal); if (!evt) return; - if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) { + if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) { Printer::elog(E_TASK, "operation is throwing an exception."); throw EAgain(evt); } } + +void Balau::QueueBase::iPush(void * t, Events::Async * event) { + ScopeLock sl(m_lock); + Cell * c = new Cell(t); + c->m_prev = m_back; + if (m_back) + m_back->m_next = c; + else + m_front = c; + m_back = c; + if (event) + event->trigger(); + else + pthread_cond_signal(&m_cond); +} + +void * Balau::QueueBase::iPop(Events::Async * event, bool wait) { + ScopeLock sl(m_lock); + while (!m_front && wait) { + if (event) { + Task::prepare(event); + m_lock.leave(); + Task::operationYield(event, Task::INTERRUPTIBLE); + m_lock.enter(); + } else { + pthread_cond_wait(&m_cond, &m_lock.m_lock); + } + } + Cell * c = m_front; + if (!c) + return NULL; + m_front = c->m_next; + if (m_front) + m_front->m_prev = NULL; + else + m_back = NULL; + void * t = c->m_elem; + delete c; + return t; +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 49c326a..177cec1 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -101,7 +101,6 @@ void Balau::TaskScheduler::stopAll(int code) { m_taskManagers.pop(); altQueue.push(tm); tm->addToPending(new Stopper(code)); - tm->m_evt.send(); } while (!altQueue.empty()) { tm = altQueue.front(); @@ -130,7 +129,6 @@ void * Balau::TaskScheduler::proc() { m_lock.leave(); Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm); tm->addToPending(t); - tm->m_evt.send(); } Printer::elog(E_TASK, "TaskScheduler stopping."); return NULL; @@ -145,6 +143,16 @@ void asyncDummy(ev::async & w, int revents) { Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up..."); } +void Balau::TaskMan::stopMe(int code) { + Task * t = Task::getCurrentTask(); + if (t->getTaskMan() == this) { + m_stopped = true; + m_stopCode = code; + } else { + addToPending(new Stopper(code)); + } +} + Balau::TaskMan::TaskMan() { #ifndef _WIN32 coro_create(&m_returnContext, 0, 0, 0, 0); @@ -168,6 +176,8 @@ Balau::TaskMan::TaskMan() { } #ifdef _WIN32 +namespace { + class WinSocketStartup : public Balau::AtStart { public: WinSocketStartup() : AtStart(5) { } @@ -179,6 +189,8 @@ class WinSocketStartup : public Balau::AtStart { }; static WinSocketStartup wsa; + +}; #endif Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); } @@ -191,14 +203,16 @@ Balau::TaskMan::~TaskMan() { } s_scheduler.unregisterTaskMan(this); // probably way more work to do here in order to clean up tasks from that thread + m_evt.stop(); ev_loop_destroy(m_loop); } void * Balau::TaskMan::getStack() { + if (!Task::needsStacks()) + return NULL; void * r = NULL; if (m_nStacks == 0) { - if (Task::needsStacks()) - r = malloc(Task::stackSize()); + r = malloc(Task::stackSize()); } else { r = m_stacks.front(); m_stacks.pop(); @@ -230,30 +244,23 @@ int Balau::TaskMan::mainLoop() { s_async.setIdleReadyCallback(asyncIdleReady, this); do { - bool noWait = false; - Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size()); - // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean. + // checking "STARTING" tasks, and running them once while ((iH = starting.begin()) != starting.end()) { Task * t = *iH; IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t); t->switchTo(); IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t); starting.erase(iH); - if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { - noWait = true; + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) stopped.insert(t); - } - if (t->getStatus() == Task::YIELDED) { - noWait = true; + if (t->getStatus() == Task::YIELDED) yielded.insert(t); - } } // if we begin that loop with any pending task, just don't loop, so we can add them immediately. - if (!m_pendingAdd.isEmpty() || !yielded.empty()) - noWait = true; + bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty(); // libev's event "loop". We always runs it once though. m_allowedToSignal = true; @@ -261,7 +268,7 @@ int Balau::TaskMan::mainLoop() { ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this); - // calling async's idle loop here + // calling async's idle s_async.idle(); // let's check what task got stopped, and signal them @@ -276,7 +283,7 @@ int Balau::TaskMan::mainLoop() { // let's check who got signaled, and call them for (Task * t : m_signaledTasks) { Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str()); - IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus()); + IAssert(t->getStatus() == Task::SLEEPING || t->getStatus() == Task::YIELDED, "We're switching to a non-sleeping/yielded task at %p... ? status = %i", t, t->getStatus()); bool wasYielded = t->getStatus() == Task::YIELDED; t->switchTo(); if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) { @@ -313,7 +320,7 @@ int Balau::TaskMan::mainLoop() { Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t); IAssert(m_tasks.find(t) == m_tasks.end(), "TaskMan got task %p twice... ?", t); ev_now_update(m_loop); - t->setup(this, getStack()); + t->setup(this, t->isStackless() ? NULL : getStack()); m_tasks.insert(t); starting.insert(t); } @@ -345,12 +352,14 @@ int Balau::TaskMan::mainLoop() { return m_stopCode; } -void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) { +void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) { if (stick) { + IAssert(!event, "inconsistent"); TaskMan * tm = stick->getTaskMan(); tm->addToPending(t); - tm->m_evt.send(); } else { + if (event) + event->attachToTask(t); s_scheduler.registerTask(t); } } @@ -361,6 +370,7 @@ void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) { void Balau::TaskMan::addToPending(Balau::Task * t) { m_pendingAdd.push(t); + m_evt.send(); } void Balau::TaskMan::signalTask(Task * t) { @@ -373,17 +383,51 @@ void Balau::TaskMan::stop(int code) { s_scheduler.stopAll(code); } -class ThreadedTaskMan : public Balau::Thread { - virtual void * proc() { +void * Balau::TaskMan::TaskManThread::proc() { + bool success = false; + m_taskMan = NULL; + try { m_taskMan = new Balau::TaskMan(); m_taskMan->mainLoop(); - return NULL; + success = true; } - Balau::TaskMan * m_taskMan; -}; + catch (Exit e) { + Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway"); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); + } + catch (RessourceException e) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread got a ressource problem: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (GeneralException e) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an exception: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (...) { + Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an unknown exception"); + } + if (!success) { + if (m_taskMan) + delete m_taskMan; + m_taskMan = NULL; + TaskMan::stop(-1); + } + return NULL; +} -Balau::Thread * Balau::TaskMan::createThreadedTaskMan() { - Thread * r = new ThreadedTaskMan(); - r->threadStart(); - return r; +Balau::TaskMan::TaskManThread::~TaskManThread() { + if (m_taskMan) + delete m_taskMan; } diff --git a/src/Threads.cc b/src/Threads.cc index 9ecfff5..7b1b7bc 100644 --- a/src/Threads.cc +++ b/src/Threads.cc @@ -2,6 +2,7 @@ #include "Threads.h" #include "Local.h" #include "Atomic.h" +#include "TaskMan.h" namespace Balau { @@ -39,11 +40,47 @@ Balau::RWLock::RWLock() { } void * Balau::ThreadHelper::threadProc(void * arg) { - void * tls = Local::createTLS(); - g_tlsManager->setTLS(tls); - Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg); - void * r = thread->proc(); - free(tls); + void * r = NULL; + bool success = false; + try { + void * tls = Local::createTLS(); + g_tlsManager->setTLS(tls); + Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg); + r = thread->proc(); + free(tls); + success = true; + } + catch (Exit e) { + Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway"); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_ERROR, "%s", str.to_charp()); + } + catch (RessourceException e) { + Printer::log(M_ERROR | M_ALERT, "The Thread got a ressource problem: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (GeneralException e) { + Printer::log(M_ERROR | M_ALERT, "The Thread caused an exception: %s", e.getMsg()); + const char * details = e.getDetails(); + if (details) + Printer::log(M_ERROR, " %s", details); + auto trace = e.getTrace(); + for (String & str : trace) + Printer::log(M_DEBUG, "%s", str.to_charp()); + } + catch (...) { + Printer::log(M_ERROR | M_ALERT, "The Thread caused an unknown exception"); + } + + if (!success) + TaskMan::stop(-1); + return r; } diff --git a/src/ZHandle.cc b/src/ZHandle.cc index e2f22f0..4f216c7 100644 --- a/src/ZHandle.cc +++ b/src/ZHandle.cc @@ -79,7 +79,7 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) if (r <= 0) return readTotal; } - Task::yield(nullptr); + Task::operationYield(); int r = inflate(&m_zin, Z_SYNC_FLUSH); size_t didRead = count - m_zin.avail_out; readTotal += didRead; @@ -108,7 +108,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce m_zout.avail_out = BLOCK_SIZE; int r = deflate(&m_zout, Z_NO_FLUSH); EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r); - Task::yield(nullptr); + Task::operationYield(); size_t compressed = BLOCK_SIZE - m_zout.avail_out; if (compressed) { size_t w = m_h->forceWrite(obuf, compressed); |