summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Async.cc6
-rw-r--r--src/Handle.cc6
-rw-r--r--src/HttpServer.cc12
-rw-r--r--src/Input.cc8
-rw-r--r--src/LuaTask.cc58
-rw-r--r--src/Main.cc2
-rw-r--r--src/Output.cc8
-rw-r--r--src/SimpleMustache.cc12
-rw-r--r--src/Socket.cc45
-rw-r--r--src/Task.cc160
-rw-r--r--src/TaskMan.cc102
-rw-r--r--src/Threads.cc47
-rw-r--r--src/ZHandle.cc4
13 files changed, 362 insertions, 108 deletions
diff --git a/src/Async.cc b/src/Async.cc
index 829c677..18c8ef1 100644
--- a/src/Async.cc
+++ b/src/Async.cc
@@ -28,7 +28,7 @@ void Balau::AsyncManager::queueOp(AsyncOperation * op) {
TLS * tls = getTLS();
Printer::elog(E_ASYNC, "Queuing operation at %p", op);
if (op->needsSynchronousCallback()) {
- Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam);
+ Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", op, &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam);
op->m_idleQueue = &tls->idleQueue;
op->m_idleReadyCallback = tls->idleReadyCallback;
op->m_idleReadyParam = tls->idleReadyParam;
@@ -98,11 +98,13 @@ void * Balau::AsyncManager::proc() {
}
stopAllWorkers();
+ Printer::elog(E_ASYNC, "Async thread waits for all idle queues to empty");
while (Atomic::Prefetch::Decrement(&m_numTLSes)) {
TLS * tls = m_TLSes.pop();
while (!tls->idleQueue.isEmpty());
}
+ Printer::elog(E_ASYNC, "Async thread stopping");
return NULL;
}
@@ -150,7 +152,7 @@ void Balau::AsyncManager::idle() {
while (!m_ready);
AsyncOperation * op;
TLS * tls = getTLS();
- while ((op = tls->idleQueue.pop(false))) {
+ while ((op = tls->idleQueue.pop())) {
Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op);
op->done();
}
diff --git a/src/Handle.cc b/src/Handle.cc
index dacd30d..9bcb419 100644
--- a/src/Handle.cc
+++ b/src/Handle.cc
@@ -97,7 +97,7 @@ ssize_t Balau::Handle::forceRead(void * _buf, size_t count, Events::BaseEvent *
catch (EAgain e) {
if (evt && evt->gotSignal())
return total;
- Task::yield(e.getEvent());
+ Task::operationYield(e.getEvent());
continue;
}
if (r < 0)
@@ -124,7 +124,7 @@ ssize_t Balau::Handle::forceWrite(const void * _buf, size_t count, Events::BaseE
catch (EAgain e) {
if (evt && evt->gotSignal())
return total;
- Task::yield(e.getEvent());
+ Task::operationYield(e.getEvent());
continue;
}
if (r < 0)
@@ -242,7 +242,7 @@ int Balau::FileSystem::mkdir(const char * path) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_mkdir(path, 0755, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_mkdir returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
char str[4096];
if (cbResults.result < 0)
diff --git a/src/HttpServer.cc b/src/HttpServer.cc
index c9cb0ee..23af486 100644
--- a/src/HttpServer.cc
+++ b/src/HttpServer.cc
@@ -35,6 +35,8 @@ class HttpWorker : public Task {
HttpWorker(IO<Handle> io, void * server);
~HttpWorker();
static void buildErrorTemplate(IO<Handle> h) { m_errorTemplate.setTemplate(h); }
+ template<size_t S>
+ static void buildErrorTemplate(const char (&str)[S]) { m_errorTemplate.setTemplate(str, S - 1); }
static void buildErrorTemplate(const char * str, ssize_t s) { m_errorTemplate.setTemplate(str, s); }
static void buildErrorTemplate(const String & str) { m_errorTemplate.setTemplate(str); }
private:
@@ -594,14 +596,20 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener;
void Balau::HttpServer::start() {
AAssert(!m_started, "Don't start an HttpServer twice");
- m_listenerPtr = createTask(new HttpListener(m_port, m_local.to_charp(), this));
+ m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this));
m_started = true;
}
void Balau::HttpServer::stop() {
AAssert(m_started, "Don't stop an HttpServer that hasn't been started");
- reinterpret_cast<HttpListener *>(m_listenerPtr)->stop();
+ HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr);
+ Events::TaskEvent event(listener);
+ Task::prepare(&event);
+ listener->stop();
m_started = false;
+ Task::operationYield(&event);
+ IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop");
+ event.ack();
}
void Balau::HttpServer::registerAction(Action * action) {
diff --git a/src/Input.cc b/src/Input.cc
index 8168231..3608cd0 100644
--- a/src/Input.cc
+++ b/src/Input.cc
@@ -56,7 +56,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_open returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
if (cbResults.errorno == ENOENT) {
@@ -71,7 +71,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) {
cbStatsResults_t cbStatsResults;
r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults);
EAssert(r != NULL, "eio_fstat returned a NULL eio_req");
- Task::yield(&cbStatsResults.evt);
+ Task::operationYield(&cbStatsResults.evt);
if (cbStatsResults.result == 0) {
m_size = cbStatsResults.statdata.st_size;
m_mtime = cbStatsResults.statdata.st_mtime;
@@ -85,7 +85,7 @@ void Balau::Input::close() throw (GeneralException) {
eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_close returned a NULL eio_req");
m_fd = -1;
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
strerror_r(cbResults.errorno, str, sizeof(str));
@@ -99,7 +99,7 @@ ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_read(m_fd, buf, count, getROffset(), 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_read returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result > 0) {
rseek(cbResults.result, SEEK_CUR);
} else {
diff --git a/src/LuaTask.cc b/src/LuaTask.cc
index d722847..7e0b256 100644
--- a/src/LuaTask.cc
+++ b/src/LuaTask.cc
@@ -1,26 +1,58 @@
#include "LuaTask.h"
#include "Main.h"
#include "TaskMan.h"
+#include "Printer.h"
+
+namespace {
+
+class LuaTaskStopper : public Balau::LuaExecCell {
+ virtual void run(Balau::Lua &) { }
+};
+
+};
+
+Balau::LuaExecCell::LuaExecCell() {
+ Printer::elog(E_TASK, "LuaExecCell created at %p", this);
+}
+
+Balau::LuaMainTask::LuaMainTask() {
+ Printer::elog(E_TASK, "LuaMainTask created at %p", this);
+ L.open_base();
+ L.open_table();
+ L.open_string();
+ L.open_math();
+ L.open_debug();
+ L.open_bit();
+ L.open_jit();
+}
+
+Balau::LuaMainTask::~LuaMainTask() {
+ L.close();
+}
void Balau::LuaMainTask::exec(LuaExecCell * cell) {
+ Printer::elog(E_TASK, "LuaMainTask at %p is asked to queue Cell %p", this, cell);
m_queue.push(cell);
- m_queueEvent.trigger();
}
void Balau::LuaMainTask::stop() {
- Atomic::CmpXChgVal(&m_stopping, true, false);
- m_queueEvent.trigger();
+ Printer::elog(E_TASK, "LuaMainTask at %p asked to stop", this);
+ exec(new LuaTaskStopper());
}
void Balau::LuaMainTask::Do() {
- while (!m_stopping) {
- waitFor(&m_queueEvent);
-
+ for (;;) {
LuaExecCell * cell;
- while ((cell = m_queue.pop(false)))
- createTask(new LuaTask(L.thread(), cell), this);
-
- yield();
+ Printer::elog(E_TASK, "LuaMainTask at %p tries to pop an ExecCell", this);
+ while ((cell = m_queue.pop())) {
+ Printer::elog(E_TASK, "LuaMainTask at %p popped %p", this, cell);
+ if (dynamic_cast<LuaTaskStopper *>(cell)) {
+ Printer::elog(E_TASK, "LuaMainTask at %p is stopping", this);
+ delete cell;
+ return;
+ }
+ TaskMan::registerTask(new LuaTask(L.thread(), cell), this);
+ }
}
}
@@ -37,9 +69,13 @@ void Balau::LuaExecCell::exec(LuaMainTask * mainTask) {
Task::prepare(&m_event);
mainTask->exec(this);
if (!m_detached)
- Task::yield(&m_event);
+ Task::operationYield(&m_event);
}
void Balau::LuaExecString::run(Lua & L) {
L.load(m_str);
}
+
+void Balau::LuaExecFile::run(Lua & L) {
+ L.load(m_file);
+}
diff --git a/src/Main.cc b/src/Main.cc
index fd4021f..32b7cf8 100644
--- a/src/Main.cc
+++ b/src/Main.cc
@@ -60,7 +60,7 @@ int Balau::Main::bootstrap(int _argc, char ** _argv) {
try {
m_status = RUNNING;
- createTask(new MainTask());
+ TaskMan::registerTask(new MainTask());
r = TaskMan::getDefaultTaskMan()->mainLoop();
m_status = STOPPING;
}
diff --git a/src/Output.cc b/src/Output.cc
index a4f6275..dd0a588 100644
--- a/src/Output.cc
+++ b/src/Output.cc
@@ -56,7 +56,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException
cbResults_t cbResults;
eio_req * r = eio_open(fname, O_WRONLY | O_CREAT | (truncate ? O_TRUNC : 0), 0755, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_open returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
if (cbResults.errorno == ENOENT) {
@@ -71,7 +71,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException
cbStatsResults_t cbStatsResults;
r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults);
EAssert(r != NULL, "eio_fstat returned a NULL eio_req");
- Task::yield(&cbStatsResults.evt);
+ Task::operationYield(&cbStatsResults.evt);
if (cbStatsResults.result == 0) {
m_size = cbStatsResults.statdata.st_size;
m_mtime = cbStatsResults.statdata.st_mtime;
@@ -85,7 +85,7 @@ void Balau::Output::close() throw (GeneralException) {
eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_close returned a NULL eio_req");
m_fd = -1;
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
strerror_r(cbResults.errorno, str, sizeof(str));
@@ -99,7 +99,7 @@ ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralExcep
cbResults_t cbResults;
eio_req * r = eio_write(m_fd, const_cast<void *>(buf), count, getWOffset(), 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_write returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result > 0) {
wseek(cbResults.result, SEEK_CUR);
} else {
diff --git a/src/SimpleMustache.cc b/src/SimpleMustache.cc
index 92fefeb..d2db99f 100644
--- a/src/SimpleMustache.cc
+++ b/src/SimpleMustache.cc
@@ -328,9 +328,9 @@ void Balau::SimpleMustache::setTemplate(IO<Handle> _h) {
m_fragments.push_back(curFragment);
}
-Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_r(Fragments::iterator begin, const String & endSection) {
- Fragments::iterator cur;
- Fragments::iterator end = m_fragments.end();
+Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::checkTemplate_r(Fragments::const_iterator begin, const String & endSection) const {
+ Fragments::const_iterator cur;
+ Fragments::const_iterator end = m_fragments.end();
for (cur = begin; cur != end; cur++) {
Fragment * fr = *cur;
@@ -346,9 +346,9 @@ Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::checkTemplate_
return end;
}
-Balau::SimpleMustache::Fragments::iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::iterator begin, bool noWrite, int forceIdx) {
- Fragments::iterator cur;
- Fragments::iterator end = m_fragments.end();
+Balau::SimpleMustache::Fragments::const_iterator Balau::SimpleMustache::render_r(IO<Handle> h, Context * ctx, const String & endSection, Fragments::const_iterator begin, bool noWrite, int forceIdx) const {
+ Fragments::const_iterator cur;
+ Fragments::const_iterator end = m_fragments.end();
if (endSection.strlen() != 0) {
int depth = 0;
diff --git a/src/Socket.cc b/src/Socket.cc
index 148d0a4..7511bde 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -14,6 +14,26 @@
#include "Main.h"
#include "Atomic.h"
+#ifndef _WIN32
+namespace {
+
+class SigpipeBlocker : public Balau::AtStart {
+ public:
+ SigpipeBlocker() : AtStart(5) { }
+ virtual void doStart() {
+ struct sigaction new_actn, old_actn;
+ new_actn.sa_handler = SIG_IGN;
+ sigemptyset(&new_actn.sa_mask);
+ new_actn.sa_flags = 0;
+ sigaction(SIGPIPE, &new_actn, &old_actn);
+ }
+};
+
+static SigpipeBlocker sigpipeBlocker;
+
+};
+#endif
+
void Balau::Socket::SocketEvent::gotOwner(Task * task) {
Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this);
if (!m_task) {
@@ -172,9 +192,8 @@ static const char * inet_ntop(int af, const void * src, char * dst, socklen_t si
#endif
-#if 0
-// TODO: use getaddrinfo_a, if available.
-#else
+namespace {
+
class ResolverThread : public Balau::GlobalThread {
public:
ResolverThread() : GlobalThread(8), m_stopping(false) { }
@@ -186,6 +205,8 @@ class ResolverThread : public Balau::GlobalThread {
volatile bool m_stopping;
};
+};
+
void ResolverThread::threadExit() {
m_stopping = true;
DNSRequest req;
@@ -222,13 +243,12 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st
Balau::Printer::elog(Balau::E_SOCKET, "Sending a request to the resolver thread");
Balau::Task::prepare(&evt);
resolverThread.pushRequest(&req);
- Balau::Task::yield(&evt);
+ Balau::Task::operationYield(&evt);
Balau::Atomic::MemoryFence();
return req;
}
-#endif
Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) {
m_name = "Socket(unconnected)";
@@ -457,7 +477,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
IAssert(spins == 0, "We shouldn't have spinned...");
}
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
// if we're still here, it means the parent task doesn't want to be thrown an exception
IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled");
@@ -516,7 +536,7 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
if (s < 0) {
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
String msg = getErrorMessage();
throw GeneralException(String("Unexpected error accepting a connection: #") + errno + "(" + msg + ")");
@@ -548,7 +568,7 @@ ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) {
}
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
m_evtR->stop();
return r;
@@ -575,8 +595,15 @@ ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralExcep
if (r > 0)
return r;
+#ifndef _WIN32
+ if (errno == EPIPE) {
+ close();
+ return 0;
+ }
+#endif
+
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
} else {
m_evtW->stop();
return r;
diff --git a/src/Task.cc b/src/Task.cc
index aba29b8..4465203 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -6,10 +6,7 @@
static Balau::LocalTmpl<Balau::Task> localTask;
-Balau::Task::Task() {
- m_status = STARTING;
- m_okayToEAgain = false;
-
+Balau::Task::Task() : m_status(STARTING), m_okayToEAgain(false), m_stackless(false) {
Printer::elog(E_TASK, "Created a Task at %p", this);
}
@@ -22,16 +19,21 @@ bool Balau::Task::needsStacks() {
}
void Balau::Task::setup(TaskMan * taskMan, void * stack) {
- size_t size = stackSize();
+ if (m_stackless) {
+ IAssert(!stack, "Since we're stackless, no stack should've been allocated.");
+ m_stack = NULL;
+ } else {
+ size_t size = stackSize();
#ifndef _WIN32
- IAssert(stack, "Can't setup a coroutine without a stack");
- m_stack = stack;
- coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size);
+ IAssert(stack, "Can't setup a coroutine without a stack");
+ m_stack = stack;
+ coro_create(&m_ctx, coroutineTrampoline, this, m_stack, size);
#else
- Assert(!stack, "We shouldn't allocate stacks with Fibers");
- m_stack = NULL;
- m_fiber = CreateFiber(size, coroutineTrampoline, this);
+ Assert(!stack, "We shouldn't allocate stacks with Fibers");
+ m_stack = NULL;
+ m_fiber = CreateFiber(size, coroutineTrampoline, this);
#endif
+ }
m_taskMan = taskMan;
@@ -54,7 +56,7 @@ void Balau::Task::coroutineTrampoline(void * arg) {
void Balau::Task::coroutine() {
try {
- IAssert(m_status == STARTING, "The Task at %p was badly initialized ? m_status = %i", this, m_status);
+ IAssert((m_status == STARTING) || (m_stackless && (m_status == RUNNING)), "The Task at %p has a bad status ? m_status = %s, stackless = %s", this, StatusToString(m_status), m_stackless ? "true" : "false");
m_status = RUNNING;
Do();
m_status = STOPPED;
@@ -66,6 +68,12 @@ void Balau::Task::coroutine() {
catch (TestException & e) {
m_status = STOPPED;
Printer::log(M_ERROR, "Unit test failed: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
TaskMan::stop(-1);
}
catch (RessourceException & e) {
@@ -74,8 +82,25 @@ void Balau::Task::coroutine() {
const char * details = e.getDetails();
if (details)
Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
TaskMan::stop(-1);
}
+ catch (TaskSwitch & e) {
+ if (!m_stackless) {
+ Printer::log(M_ERROR, "Task %s at %p isn't stackless, but still caused a task switch.", getName(), this);
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ m_status = FAULTED;
+ } else {
+ Printer::elog(E_TASK, "Stackless task %s at %p is task-switching.", getName(), this);
+ }
+ }
catch (GeneralException & e) {
Printer::log(M_WARNING, "Task %s at %p caused an exception: `%s' - stopping.", getName(), this, e.getMsg());
const char * details = e.getDetails();
@@ -90,39 +115,50 @@ void Balau::Task::coroutine() {
Printer::log(M_WARNING, "Task %s at %p caused an unknown exception - stopping.", getName(), this);
m_status = FAULTED;
}
+ if (!m_stackless) {
#ifndef _WIN32
- coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
+ coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
- SwitchToFiber(m_taskMan->m_fiber);
+ SwitchToFiber(m_taskMan->m_fiber);
#endif
+ }
}
void Balau::Task::switchTo() {
Printer::elog(E_TASK, "Switching to task %p - %s", this, getName());
- IAssert(m_status == YIELDED || m_status == IDLE || m_status == STARTING, "The task at %p isn't either yielded, idle or starting... ? m_status = %i", this, m_status);
+ IAssert(m_status == YIELDED || m_status == SLEEPING || m_status == STARTING, "The task at %p isn't either yielded, sleeping or starting... ? m_status = %s", this, StatusToString(m_status));
void * oldTLS = g_tlsManager->getTLS();
g_tlsManager->setTLS(m_tls);
- if (m_status == YIELDED || m_status == IDLE)
+ if (m_status == YIELDED || m_status == SLEEPING)
m_status = RUNNING;
+ if (m_stackless) {
+ coroutine();
+ } else {
#ifndef _WIN32
- coro_transfer(&m_taskMan->m_returnContext, &m_ctx);
+ coro_transfer(&m_taskMan->m_returnContext, &m_ctx);
#else
- SwitchToFiber(m_fiber);
+ SwitchToFiber(m_fiber);
#endif
+ }
g_tlsManager->setTLS(oldTLS);
- if (m_status == RUNNING)
- m_status = IDLE;
+ IAssert(m_status != RUNNING, "Task %s at %p is still running... ?", getName(), this);
}
-void Balau::Task::yield(bool changeStatus) {
+void Balau::Task::yield(bool stillRunning) throw (GeneralException) {
Printer::elog(E_TASK, "Task %p - %s yielding", this, getName());
- if (changeStatus)
+ if (stillRunning)
m_status = YIELDED;
+ else
+ m_status = SLEEPING;
+ if (m_stackless) {
+ throw EAgain(NULL);
+ } else {
#ifndef _WIN32
- coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
+ coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
#else
- SwitchToFiber(m_taskMan->m_fiber);
+ SwitchToFiber(m_taskMan->m_fiber);
#endif
+ }
}
Balau::Task * Balau::Task::getCurrentTask() {
@@ -149,7 +185,15 @@ void Balau::Events::BaseEvent::doSignal() {
}
}
-Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited), m_ack(false), m_distant(false) {
+Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(NULL), m_ack(false), m_distant(false) {
+ if (taskWaited)
+ attachToTask(taskWaited);
+}
+
+void Balau::Events::TaskEvent::attachToTask(Task * taskWaited) {
+ AAssert(!m_taskWaited, "You can't attach a TaskEvent twice.");
+ m_ack = false;
+ m_taskWaited = taskWaited;
ScopeLock lock(m_taskWaited->m_eventLock);
m_taskWaited->m_waitedBy.push_back(this);
}
@@ -157,7 +201,8 @@ Balau::Events::TaskEvent::TaskEvent(Task * taskWaited) : m_taskWaited(taskWaited
void Balau::Events::TaskEvent::signal() {
if (m_distant)
m_evt.send();
- doSignal();
+ else
+ doSignal();
}
void Balau::Events::TaskEvent::gotOwner(Task * task) {
@@ -198,6 +243,7 @@ void Balau::Events::TaskEvent::ack() {
IAssert(deleted, "We didn't find task %p in the waitedBy lists... ?", this);
m_ack = true;
reset();
+ m_taskWaited = NULL;
}
void Balau::Events::Timeout::gotOwner(Task * task) {
@@ -214,23 +260,77 @@ void Balau::Events::Custom::gotOwner(Task * task) {
m_loop = task->getLoop();
}
-void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) {
+void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) {
Task * t = getCurrentTask();
if (evt)
t->waitFor(evt);
+
+ if (t->m_stackless) {
+ AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task.");
+ }
+
+ if (yieldType == STACKLESS) {
+ AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task.");
+ }
+
bool gotSignal;
do {
t->yield(evt == NULL);
- Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false");
+ static const char * YieldTypeToString[] = {
+ "SIMPLE",
+ "INTERRUPTIBLE",
+ "STACKLESS",
+ };
+ Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false");
gotSignal = evt ? evt->gotSignal() : true;
- } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal);
+ } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal);
if (!evt)
return;
- if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) {
+ if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) {
Printer::elog(E_TASK, "operation is throwing an exception.");
throw EAgain(evt);
}
}
+
+void Balau::QueueBase::iPush(void * t, Events::Async * event) {
+ ScopeLock sl(m_lock);
+ Cell * c = new Cell(t);
+ c->m_prev = m_back;
+ if (m_back)
+ m_back->m_next = c;
+ else
+ m_front = c;
+ m_back = c;
+ if (event)
+ event->trigger();
+ else
+ pthread_cond_signal(&m_cond);
+}
+
+void * Balau::QueueBase::iPop(Events::Async * event, bool wait) {
+ ScopeLock sl(m_lock);
+ while (!m_front && wait) {
+ if (event) {
+ Task::prepare(event);
+ m_lock.leave();
+ Task::operationYield(event, Task::INTERRUPTIBLE);
+ m_lock.enter();
+ } else {
+ pthread_cond_wait(&m_cond, &m_lock.m_lock);
+ }
+ }
+ Cell * c = m_front;
+ if (!c)
+ return NULL;
+ m_front = c->m_next;
+ if (m_front)
+ m_front->m_prev = NULL;
+ else
+ m_back = NULL;
+ void * t = c->m_elem;
+ delete c;
+ return t;
+}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 49c326a..177cec1 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -101,7 +101,6 @@ void Balau::TaskScheduler::stopAll(int code) {
m_taskManagers.pop();
altQueue.push(tm);
tm->addToPending(new Stopper(code));
- tm->m_evt.send();
}
while (!altQueue.empty()) {
tm = altQueue.front();
@@ -130,7 +129,6 @@ void * Balau::TaskScheduler::proc() {
m_lock.leave();
Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm);
tm->addToPending(t);
- tm->m_evt.send();
}
Printer::elog(E_TASK, "TaskScheduler stopping.");
return NULL;
@@ -145,6 +143,16 @@ void asyncDummy(ev::async & w, int revents) {
Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up...");
}
+void Balau::TaskMan::stopMe(int code) {
+ Task * t = Task::getCurrentTask();
+ if (t->getTaskMan() == this) {
+ m_stopped = true;
+ m_stopCode = code;
+ } else {
+ addToPending(new Stopper(code));
+ }
+}
+
Balau::TaskMan::TaskMan() {
#ifndef _WIN32
coro_create(&m_returnContext, 0, 0, 0, 0);
@@ -168,6 +176,8 @@ Balau::TaskMan::TaskMan() {
}
#ifdef _WIN32
+namespace {
+
class WinSocketStartup : public Balau::AtStart {
public:
WinSocketStartup() : AtStart(5) { }
@@ -179,6 +189,8 @@ class WinSocketStartup : public Balau::AtStart {
};
static WinSocketStartup wsa;
+
+};
#endif
Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); }
@@ -191,14 +203,16 @@ Balau::TaskMan::~TaskMan() {
}
s_scheduler.unregisterTaskMan(this);
// probably way more work to do here in order to clean up tasks from that thread
+ m_evt.stop();
ev_loop_destroy(m_loop);
}
void * Balau::TaskMan::getStack() {
+ if (!Task::needsStacks())
+ return NULL;
void * r = NULL;
if (m_nStacks == 0) {
- if (Task::needsStacks())
- r = malloc(Task::stackSize());
+ r = malloc(Task::stackSize());
} else {
r = m_stacks.front();
m_stacks.pop();
@@ -230,30 +244,23 @@ int Balau::TaskMan::mainLoop() {
s_async.setIdleReadyCallback(asyncIdleReady, this);
do {
- bool noWait = false;
-
Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size());
- // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean.
+ // checking "STARTING" tasks, and running them once
while ((iH = starting.begin()) != starting.end()) {
Task * t = *iH;
IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t);
t->switchTo();
IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t);
starting.erase(iH);
- if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
- noWait = true;
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED))
stopped.insert(t);
- }
- if (t->getStatus() == Task::YIELDED) {
- noWait = true;
+ if (t->getStatus() == Task::YIELDED)
yielded.insert(t);
- }
}
// if we begin that loop with any pending task, just don't loop, so we can add them immediately.
- if (!m_pendingAdd.isEmpty() || !yielded.empty())
- noWait = true;
+ bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty();
// libev's event "loop". We always runs it once though.
m_allowedToSignal = true;
@@ -261,7 +268,7 @@ int Balau::TaskMan::mainLoop() {
ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE);
Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this);
- // calling async's idle loop here
+ // calling async's idle
s_async.idle();
// let's check what task got stopped, and signal them
@@ -276,7 +283,7 @@ int Balau::TaskMan::mainLoop() {
// let's check who got signaled, and call them
for (Task * t : m_signaledTasks) {
Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str());
- IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus());
+ IAssert(t->getStatus() == Task::SLEEPING || t->getStatus() == Task::YIELDED, "We're switching to a non-sleeping/yielded task at %p... ? status = %i", t, t->getStatus());
bool wasYielded = t->getStatus() == Task::YIELDED;
t->switchTo();
if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
@@ -313,7 +320,7 @@ int Balau::TaskMan::mainLoop() {
Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t);
IAssert(m_tasks.find(t) == m_tasks.end(), "TaskMan got task %p twice... ?", t);
ev_now_update(m_loop);
- t->setup(this, getStack());
+ t->setup(this, t->isStackless() ? NULL : getStack());
m_tasks.insert(t);
starting.insert(t);
}
@@ -345,12 +352,14 @@ int Balau::TaskMan::mainLoop() {
return m_stopCode;
}
-void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) {
+void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) {
if (stick) {
+ IAssert(!event, "inconsistent");
TaskMan * tm = stick->getTaskMan();
tm->addToPending(t);
- tm->m_evt.send();
} else {
+ if (event)
+ event->attachToTask(t);
s_scheduler.registerTask(t);
}
}
@@ -361,6 +370,7 @@ void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) {
void Balau::TaskMan::addToPending(Balau::Task * t) {
m_pendingAdd.push(t);
+ m_evt.send();
}
void Balau::TaskMan::signalTask(Task * t) {
@@ -373,17 +383,51 @@ void Balau::TaskMan::stop(int code) {
s_scheduler.stopAll(code);
}
-class ThreadedTaskMan : public Balau::Thread {
- virtual void * proc() {
+void * Balau::TaskMan::TaskManThread::proc() {
+ bool success = false;
+ m_taskMan = NULL;
+ try {
m_taskMan = new Balau::TaskMan();
m_taskMan->mainLoop();
- return NULL;
+ success = true;
}
- Balau::TaskMan * m_taskMan;
-};
+ catch (Exit e) {
+ Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway");
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
+ }
+ catch (RessourceException e) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread got a ressource problem: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (GeneralException e) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an exception: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (...) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an unknown exception");
+ }
+ if (!success) {
+ if (m_taskMan)
+ delete m_taskMan;
+ m_taskMan = NULL;
+ TaskMan::stop(-1);
+ }
+ return NULL;
+}
-Balau::Thread * Balau::TaskMan::createThreadedTaskMan() {
- Thread * r = new ThreadedTaskMan();
- r->threadStart();
- return r;
+Balau::TaskMan::TaskManThread::~TaskManThread() {
+ if (m_taskMan)
+ delete m_taskMan;
}
diff --git a/src/Threads.cc b/src/Threads.cc
index 9ecfff5..7b1b7bc 100644
--- a/src/Threads.cc
+++ b/src/Threads.cc
@@ -2,6 +2,7 @@
#include "Threads.h"
#include "Local.h"
#include "Atomic.h"
+#include "TaskMan.h"
namespace Balau {
@@ -39,11 +40,47 @@ Balau::RWLock::RWLock() {
}
void * Balau::ThreadHelper::threadProc(void * arg) {
- void * tls = Local::createTLS();
- g_tlsManager->setTLS(tls);
- Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg);
- void * r = thread->proc();
- free(tls);
+ void * r = NULL;
+ bool success = false;
+ try {
+ void * tls = Local::createTLS();
+ g_tlsManager->setTLS(tls);
+ Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg);
+ r = thread->proc();
+ free(tls);
+ success = true;
+ }
+ catch (Exit e) {
+ Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway");
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
+ }
+ catch (RessourceException e) {
+ Printer::log(M_ERROR | M_ALERT, "The Thread got a ressource problem: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (GeneralException e) {
+ Printer::log(M_ERROR | M_ALERT, "The Thread caused an exception: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (...) {
+ Printer::log(M_ERROR | M_ALERT, "The Thread caused an unknown exception");
+ }
+
+ if (!success)
+ TaskMan::stop(-1);
+
return r;
}
diff --git a/src/ZHandle.cc b/src/ZHandle.cc
index e2f22f0..4f216c7 100644
--- a/src/ZHandle.cc
+++ b/src/ZHandle.cc
@@ -79,7 +79,7 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
if (r <= 0)
return readTotal;
}
- Task::yield(nullptr);
+ Task::operationYield();
int r = inflate(&m_zin, Z_SYNC_FLUSH);
size_t didRead = count - m_zin.avail_out;
readTotal += didRead;
@@ -108,7 +108,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
m_zout.avail_out = BLOCK_SIZE;
int r = deflate(&m_zout, Z_NO_FLUSH);
EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r);
- Task::yield(nullptr);
+ Task::operationYield();
size_t compressed = BLOCK_SIZE - m_zout.avail_out;
if (compressed) {
size_t w = m_h->forceWrite(obuf, compressed);