diff options
-rw-r--r-- | includes/Task.h | 11 | ||||
-rw-r--r-- | src/Handle.cc | 6 | ||||
-rw-r--r-- | src/HttpServer.cc | 2 | ||||
-rw-r--r-- | src/Input.cc | 8 | ||||
-rw-r--r-- | src/LuaTask.cc | 2 | ||||
-rw-r--r-- | src/Output.cc | 8 | ||||
-rw-r--r-- | src/Socket.cc | 10 | ||||
-rw-r--r-- | src/Task.cc | 24 | ||||
-rw-r--r-- | src/ZHandle.cc | 4 | ||||
-rw-r--r-- | tests/test-Tasks.cc | 4 |
10 files changed, 49 insertions, 30 deletions
diff --git a/includes/Task.h b/includes/Task.h index 27ba4b1..9347dc4 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -151,12 +151,21 @@ class Task { Task * t = getCurrentTask(); t->waitFor(evt); } - static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException); + enum OperationYieldType { + SIMPLE, + INTERRUPTIBLE, + STACKLESS, + }; + static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException); TaskMan * getTaskMan() const { return m_taskMan; } struct ev_loop * getLoop(); bool isStackless() { return m_stackless; } protected: void yield(bool stillRunning = false) throw (GeneralException); + void yield(Events::BaseEvent * evt) { + waitFor(evt); + yield(); + } virtual void Do() = 0; void waitFor(Events::BaseEvent * event); bool setOkayToEAgain(bool enable) { diff --git a/src/Handle.cc b/src/Handle.cc index dacd30d..9bcb419 100644 --- a/src/Handle.cc +++ b/src/Handle.cc @@ -97,7 +97,7 @@ ssize_t Balau::Handle::forceRead(void * _buf, size_t count, Events::BaseEvent * catch (EAgain e) { if (evt && evt->gotSignal()) return total; - Task::yield(e.getEvent()); + Task::operationYield(e.getEvent()); continue; } if (r < 0) @@ -124,7 +124,7 @@ ssize_t Balau::Handle::forceWrite(const void * _buf, size_t count, Events::BaseE catch (EAgain e) { if (evt && evt->gotSignal()) return total; - Task::yield(e.getEvent()); + Task::operationYield(e.getEvent()); continue; } if (r < 0) @@ -242,7 +242,7 @@ int Balau::FileSystem::mkdir(const char * path) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_mkdir(path, 0755, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_mkdir returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); char str[4096]; if (cbResults.result < 0) diff --git a/src/HttpServer.cc b/src/HttpServer.cc index 69d58cd..751b01a 100644 --- a/src/HttpServer.cc +++ b/src/HttpServer.cc @@ -607,7 +607,7 @@ void Balau::HttpServer::stop() { Task::prepare(&event); listener->stop(); m_started = false; - Task::yield(&event); + Task::operationYield(&event); IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop"); event.ack(); } diff --git a/src/Input.cc b/src/Input.cc index 14aed91..46b9b94 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -56,7 +56,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1), m_s cbResults_t cbResults; eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_open returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; if (cbResults.errorno == ENOENT) { @@ -71,7 +71,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1), m_s cbStatsResults_t cbStatsResults; r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults); EAssert(r != NULL, "eio_fstat returned a NULL eio_req"); - Task::yield(&cbStatsResults.evt); + Task::operationYield(&cbStatsResults.evt); if (cbStatsResults.result == 0) { m_size = cbStatsResults.statdata.st_size; m_mtime = cbStatsResults.statdata.st_mtime; @@ -85,7 +85,7 @@ void Balau::Input::close() throw (GeneralException) { eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_close returned a NULL eio_req"); m_fd = -1; - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; strerror_r(cbResults.errorno, str, sizeof(str)); @@ -99,7 +99,7 @@ ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) { cbResults_t cbResults; eio_req * r = eio_read(m_fd, buf, count, getROffset(), 0, eioDone, &cbResults); EAssert(r != NULL, "eio_read returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result > 0) { rseek(cbResults.result, SEEK_CUR); } else { diff --git a/src/LuaTask.cc b/src/LuaTask.cc index 687c81a..b023f45 100644 --- a/src/LuaTask.cc +++ b/src/LuaTask.cc @@ -69,7 +69,7 @@ void Balau::LuaExecCell::exec(LuaMainTask * mainTask) { Task::prepare(&m_event); mainTask->exec(this); if (!m_detached) - Task::yield(&m_event); + Task::operationYield(&m_event); } void Balau::LuaExecString::run(Lua & L) { diff --git a/src/Output.cc b/src/Output.cc index 6de404e..c585ce5 100644 --- a/src/Output.cc +++ b/src/Output.cc @@ -56,7 +56,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException cbResults_t cbResults; eio_req * r = eio_open(fname, O_WRONLY | O_CREAT | (truncate ? O_TRUNC : 0), 0755, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_open returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; if (cbResults.errorno == ENOENT) { @@ -71,7 +71,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException cbStatsResults_t cbStatsResults; r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults); EAssert(r != NULL, "eio_fstat returned a NULL eio_req"); - Task::yield(&cbStatsResults.evt); + Task::operationYield(&cbStatsResults.evt); if (cbStatsResults.result == 0) { m_size = cbStatsResults.statdata.st_size; m_mtime = cbStatsResults.statdata.st_mtime; @@ -85,7 +85,7 @@ void Balau::Output::close() throw (GeneralException) { eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); EAssert(r != NULL, "eio_close returned a NULL eio_req"); m_fd = -1; - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result < 0) { char str[4096]; strerror_r(cbResults.errorno, str, sizeof(str)); @@ -99,7 +99,7 @@ ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralExcep cbResults_t cbResults; eio_req * r = eio_write(m_fd, const_cast<void *>(buf), count, getWOffset(), 0, eioDone, &cbResults); EAssert(r != NULL, "eio_write returned a NULL eio_req"); - Task::yield(&cbResults.evt); + Task::operationYield(&cbResults.evt); if (cbResults.result > 0) { wseek(cbResults.result, SEEK_CUR); } else { diff --git a/src/Socket.cc b/src/Socket.cc index d78c6e5..100b94e 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -243,7 +243,7 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st Balau::Printer::elog(Balau::E_SOCKET, "Sending a request to the resolver thread"); Balau::Task::prepare(&evt); resolverThread.pushRequest(&req); - Balau::Task::yield(&evt); + Balau::Task::operationYield(&evt); Balau::Atomic::MemoryFence(); @@ -476,7 +476,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { IAssert(spins == 0, "We shouldn't have spinned..."); } - Task::yield(m_evtW, true); + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); // if we're still here, it means the parent task doesn't want to be thrown an exception IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled"); @@ -535,7 +535,7 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) { if (s < 0) { if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtR, true); + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); } else { String msg = getErrorMessage(); throw GeneralException(String("Unexpected error accepting a connection: #") + errno + "(" + msg + ")"); @@ -567,7 +567,7 @@ ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) { } if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtR, true); + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); } else { m_evtR->stop(); return r; @@ -602,7 +602,7 @@ ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralExcep #endif if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::yield(m_evtW, true); + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); } else { m_evtW->stop(); return r; diff --git a/src/Task.cc b/src/Task.cc index f01bb18..df8eee2 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -260,27 +260,36 @@ void Balau::Events::Custom::gotOwner(Task * task) { m_loop = task->getLoop(); } -void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) { +void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) { Task * t = getCurrentTask(); if (evt) t->waitFor(evt); if (t->m_stackless) { - AAssert(interruptible, "You can't run non-interruptible operations from a stackless task."); + AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task."); + } + + if (yieldType == STACKLESS) { + AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task."); } bool gotSignal; do { t->yield(evt == NULL); - Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false"); + static const char * YieldTypeToString[] = { + "SIMPLE", + "INTERRUPTIBLE", + "STACKLESS", + }; + Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false"); gotSignal = evt ? evt->gotSignal() : true; - } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal); + } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal); if (!evt) return; - if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) { + if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) { Printer::elog(E_TASK, "operation is throwing an exception."); throw EAgain(evt); } @@ -302,12 +311,12 @@ void Balau::QueueBase::iPush(void * t, Events::Async * event) { } void * Balau::QueueBase::iPop(Events::Async * event) { - ScopeLock sl(m_lock); + m_lock.enter(); while (!m_front) { if (event) { Task::prepare(event); m_lock.leave(); - Task::yield(event); + Task::operationYield(event, Task::INTERRUPTIBLE); m_lock.enter(); } else { pthread_cond_wait(&m_cond, &m_lock.m_lock); @@ -323,5 +332,6 @@ void * Balau::QueueBase::iPop(Events::Async * event) { m_back = NULL; void * t = c->m_elem; delete c; + m_lock.leave(); return t; } diff --git a/src/ZHandle.cc b/src/ZHandle.cc index 7ab0133..3a14d7d 100644 --- a/src/ZHandle.cc +++ b/src/ZHandle.cc @@ -79,7 +79,7 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) if (r <= 0) return readTotal; } - Task::yield(nullptr); + Task::operationYield(); int r = inflate(&m_zin, Z_SYNC_FLUSH); size_t didRead = count - m_zin.avail_out; readTotal += didRead; @@ -108,7 +108,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce m_zout.avail_out = BLOCK_SIZE; int r = deflate(&m_zout, Z_NO_FLUSH); EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r); - Task::yield(nullptr); + Task::operationYield(); size_t compressed = BLOCK_SIZE - m_zout.avail_out; if (compressed) { size_t w = m_h->forceWrite(obuf, compressed); diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc index 159b0f6..6bbb503 100644 --- a/tests/test-Tasks.cc +++ b/tests/test-Tasks.cc @@ -27,7 +27,7 @@ class TestOperation { void Do() { if (m_count++ == 0) { m_timeout.set(0.2); - Task::yield(&m_timeout, true); + Task::operationYield(&m_timeout, Task::STACKLESS); } TAssert(m_timeout.gotSignal()); m_completed = true; @@ -56,7 +56,7 @@ class TestStackless : public StacklessTask { static void yieldingFunction() { Events::Timeout timeout(0.2); - Task::yield(&timeout); + Task::operationYield(&timeout); TAssert(timeout.gotSignal()); } |