summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/Task.h11
-rw-r--r--src/Handle.cc6
-rw-r--r--src/HttpServer.cc2
-rw-r--r--src/Input.cc8
-rw-r--r--src/LuaTask.cc2
-rw-r--r--src/Output.cc8
-rw-r--r--src/Socket.cc10
-rw-r--r--src/Task.cc24
-rw-r--r--src/ZHandle.cc4
-rw-r--r--tests/test-Tasks.cc4
10 files changed, 49 insertions, 30 deletions
diff --git a/includes/Task.h b/includes/Task.h
index 27ba4b1..9347dc4 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -151,12 +151,21 @@ class Task {
Task * t = getCurrentTask();
t->waitFor(evt);
}
- static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException);
+ enum OperationYieldType {
+ SIMPLE,
+ INTERRUPTIBLE,
+ STACKLESS,
+ };
+ static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException);
TaskMan * getTaskMan() const { return m_taskMan; }
struct ev_loop * getLoop();
bool isStackless() { return m_stackless; }
protected:
void yield(bool stillRunning = false) throw (GeneralException);
+ void yield(Events::BaseEvent * evt) {
+ waitFor(evt);
+ yield();
+ }
virtual void Do() = 0;
void waitFor(Events::BaseEvent * event);
bool setOkayToEAgain(bool enable) {
diff --git a/src/Handle.cc b/src/Handle.cc
index dacd30d..9bcb419 100644
--- a/src/Handle.cc
+++ b/src/Handle.cc
@@ -97,7 +97,7 @@ ssize_t Balau::Handle::forceRead(void * _buf, size_t count, Events::BaseEvent *
catch (EAgain e) {
if (evt && evt->gotSignal())
return total;
- Task::yield(e.getEvent());
+ Task::operationYield(e.getEvent());
continue;
}
if (r < 0)
@@ -124,7 +124,7 @@ ssize_t Balau::Handle::forceWrite(const void * _buf, size_t count, Events::BaseE
catch (EAgain e) {
if (evt && evt->gotSignal())
return total;
- Task::yield(e.getEvent());
+ Task::operationYield(e.getEvent());
continue;
}
if (r < 0)
@@ -242,7 +242,7 @@ int Balau::FileSystem::mkdir(const char * path) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_mkdir(path, 0755, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_mkdir returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
char str[4096];
if (cbResults.result < 0)
diff --git a/src/HttpServer.cc b/src/HttpServer.cc
index 69d58cd..751b01a 100644
--- a/src/HttpServer.cc
+++ b/src/HttpServer.cc
@@ -607,7 +607,7 @@ void Balau::HttpServer::stop() {
Task::prepare(&event);
listener->stop();
m_started = false;
- Task::yield(&event);
+ Task::operationYield(&event);
IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop");
event.ack();
}
diff --git a/src/Input.cc b/src/Input.cc
index 14aed91..46b9b94 100644
--- a/src/Input.cc
+++ b/src/Input.cc
@@ -56,7 +56,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1), m_s
cbResults_t cbResults;
eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_open returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
if (cbResults.errorno == ENOENT) {
@@ -71,7 +71,7 @@ Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1), m_s
cbStatsResults_t cbStatsResults;
r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults);
EAssert(r != NULL, "eio_fstat returned a NULL eio_req");
- Task::yield(&cbStatsResults.evt);
+ Task::operationYield(&cbStatsResults.evt);
if (cbStatsResults.result == 0) {
m_size = cbStatsResults.statdata.st_size;
m_mtime = cbStatsResults.statdata.st_mtime;
@@ -85,7 +85,7 @@ void Balau::Input::close() throw (GeneralException) {
eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_close returned a NULL eio_req");
m_fd = -1;
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
strerror_r(cbResults.errorno, str, sizeof(str));
@@ -99,7 +99,7 @@ ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) {
cbResults_t cbResults;
eio_req * r = eio_read(m_fd, buf, count, getROffset(), 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_read returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result > 0) {
rseek(cbResults.result, SEEK_CUR);
} else {
diff --git a/src/LuaTask.cc b/src/LuaTask.cc
index 687c81a..b023f45 100644
--- a/src/LuaTask.cc
+++ b/src/LuaTask.cc
@@ -69,7 +69,7 @@ void Balau::LuaExecCell::exec(LuaMainTask * mainTask) {
Task::prepare(&m_event);
mainTask->exec(this);
if (!m_detached)
- Task::yield(&m_event);
+ Task::operationYield(&m_event);
}
void Balau::LuaExecString::run(Lua & L) {
diff --git a/src/Output.cc b/src/Output.cc
index 6de404e..c585ce5 100644
--- a/src/Output.cc
+++ b/src/Output.cc
@@ -56,7 +56,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException
cbResults_t cbResults;
eio_req * r = eio_open(fname, O_WRONLY | O_CREAT | (truncate ? O_TRUNC : 0), 0755, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_open returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
if (cbResults.errorno == ENOENT) {
@@ -71,7 +71,7 @@ Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException
cbStatsResults_t cbStatsResults;
r = eio_fstat(m_fd, 0, eioStatsDone, &cbStatsResults);
EAssert(r != NULL, "eio_fstat returned a NULL eio_req");
- Task::yield(&cbStatsResults.evt);
+ Task::operationYield(&cbStatsResults.evt);
if (cbStatsResults.result == 0) {
m_size = cbStatsResults.statdata.st_size;
m_mtime = cbStatsResults.statdata.st_mtime;
@@ -85,7 +85,7 @@ void Balau::Output::close() throw (GeneralException) {
eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_close returned a NULL eio_req");
m_fd = -1;
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result < 0) {
char str[4096];
strerror_r(cbResults.errorno, str, sizeof(str));
@@ -99,7 +99,7 @@ ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralExcep
cbResults_t cbResults;
eio_req * r = eio_write(m_fd, const_cast<void *>(buf), count, getWOffset(), 0, eioDone, &cbResults);
EAssert(r != NULL, "eio_write returned a NULL eio_req");
- Task::yield(&cbResults.evt);
+ Task::operationYield(&cbResults.evt);
if (cbResults.result > 0) {
wseek(cbResults.result, SEEK_CUR);
} else {
diff --git a/src/Socket.cc b/src/Socket.cc
index d78c6e5..100b94e 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -243,7 +243,7 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st
Balau::Printer::elog(Balau::E_SOCKET, "Sending a request to the resolver thread");
Balau::Task::prepare(&evt);
resolverThread.pushRequest(&req);
- Balau::Task::yield(&evt);
+ Balau::Task::operationYield(&evt);
Balau::Atomic::MemoryFence();
@@ -476,7 +476,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
IAssert(spins == 0, "We shouldn't have spinned...");
}
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
// if we're still here, it means the parent task doesn't want to be thrown an exception
IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled");
@@ -535,7 +535,7 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
if (s < 0) {
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
String msg = getErrorMessage();
throw GeneralException(String("Unexpected error accepting a connection: #") + errno + "(" + msg + ")");
@@ -567,7 +567,7 @@ ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) {
}
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
m_evtR->stop();
return r;
@@ -602,7 +602,7 @@ ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralExcep
#endif
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
} else {
m_evtW->stop();
return r;
diff --git a/src/Task.cc b/src/Task.cc
index f01bb18..df8eee2 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -260,27 +260,36 @@ void Balau::Events::Custom::gotOwner(Task * task) {
m_loop = task->getLoop();
}
-void Balau::Task::yield(Events::BaseEvent * evt, bool interruptible) throw (GeneralException) {
+void Balau::Task::operationYield(Events::BaseEvent * evt, enum OperationYieldType yieldType) throw (GeneralException) {
Task * t = getCurrentTask();
if (evt)
t->waitFor(evt);
if (t->m_stackless) {
- AAssert(interruptible, "You can't run non-interruptible operations from a stackless task.");
+ AAssert(yieldType != SIMPLE, "You can't run simple operations from a stackless task.");
+ }
+
+ if (yieldType == STACKLESS) {
+ AAssert(t->m_okayToEAgain, "You can't run a stackless operation from a non-okay-to-eagain task.");
}
bool gotSignal;
do {
t->yield(evt == NULL);
- Printer::elog(E_TASK, "operation back from yielding; interruptible = %s; okayToEAgain = %s", interruptible ? "true" : "false", t->m_okayToEAgain ? "true" : "false");
+ static const char * YieldTypeToString[] = {
+ "SIMPLE",
+ "INTERRUPTIBLE",
+ "STACKLESS",
+ };
+ Printer::elog(E_TASK, "operation back from yielding; yieldType = %s; okayToEAgain = %s", YieldTypeToString[yieldType], t->m_okayToEAgain ? "true" : "false");
gotSignal = evt ? evt->gotSignal() : true;
- } while ((!interruptible || !t->m_okayToEAgain) && !gotSignal);
+ } while (((yieldType == SIMPLE) || !t->m_okayToEAgain) && !gotSignal);
if (!evt)
return;
- if (interruptible && t->m_okayToEAgain && !evt->gotSignal()) {
+ if ((yieldType != SIMPLE) && t->m_okayToEAgain && !evt->gotSignal()) {
Printer::elog(E_TASK, "operation is throwing an exception.");
throw EAgain(evt);
}
@@ -302,12 +311,12 @@ void Balau::QueueBase::iPush(void * t, Events::Async * event) {
}
void * Balau::QueueBase::iPop(Events::Async * event) {
- ScopeLock sl(m_lock);
+ m_lock.enter();
while (!m_front) {
if (event) {
Task::prepare(event);
m_lock.leave();
- Task::yield(event);
+ Task::operationYield(event, Task::INTERRUPTIBLE);
m_lock.enter();
} else {
pthread_cond_wait(&m_cond, &m_lock.m_lock);
@@ -323,5 +332,6 @@ void * Balau::QueueBase::iPop(Events::Async * event) {
m_back = NULL;
void * t = c->m_elem;
delete c;
+ m_lock.leave();
return t;
}
diff --git a/src/ZHandle.cc b/src/ZHandle.cc
index 7ab0133..3a14d7d 100644
--- a/src/ZHandle.cc
+++ b/src/ZHandle.cc
@@ -79,7 +79,7 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
if (r <= 0)
return readTotal;
}
- Task::yield(nullptr);
+ Task::operationYield();
int r = inflate(&m_zin, Z_SYNC_FLUSH);
size_t didRead = count - m_zin.avail_out;
readTotal += didRead;
@@ -108,7 +108,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
m_zout.avail_out = BLOCK_SIZE;
int r = deflate(&m_zout, Z_NO_FLUSH);
EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r);
- Task::yield(nullptr);
+ Task::operationYield();
size_t compressed = BLOCK_SIZE - m_zout.avail_out;
if (compressed) {
size_t w = m_h->forceWrite(obuf, compressed);
diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc
index 159b0f6..6bbb503 100644
--- a/tests/test-Tasks.cc
+++ b/tests/test-Tasks.cc
@@ -27,7 +27,7 @@ class TestOperation {
void Do() {
if (m_count++ == 0) {
m_timeout.set(0.2);
- Task::yield(&m_timeout, true);
+ Task::operationYield(&m_timeout, Task::STACKLESS);
}
TAssert(m_timeout.gotSignal());
m_completed = true;
@@ -56,7 +56,7 @@ class TestStackless : public StacklessTask {
static void yieldingFunction() {
Events::Timeout timeout(0.2);
- Task::yield(&timeout);
+ Task::operationYield(&timeout);
TAssert(timeout.gotSignal());
}