summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/Input.h2
-rw-r--r--src/Input.cc199
-rw-r--r--tests/test-Tasks.cc7
3 files changed, 154 insertions, 54 deletions
diff --git a/includes/Input.h b/includes/Input.h
index 51b17c7..af17107 100644
--- a/includes/Input.h
+++ b/includes/Input.h
@@ -15,6 +15,7 @@ class Input : public SeekableHandle {
virtual const char * getName();
virtual off_t getSize();
virtual time_t getMTime();
+ bool isPendingComplete();
const char * getFName() { return m_fname.to_charp(); }
private:
int m_fd = -1;
@@ -22,6 +23,7 @@ class Input : public SeekableHandle {
String m_fname;
off_t m_size = -1;
time_t m_mtime = -1;
+ void * m_pendingOp = NULL;
};
};
diff --git a/src/Input.cc b/src/Input.cc
index 2a89fa9..5fef78d 100644
--- a/src/Input.cc
+++ b/src/Input.cc
@@ -25,7 +25,10 @@ namespace {
struct cbResults_t {
Balau::Events::Custom evt;
- int result, errorno;
+ ssize_t result;
+ int errorno;
+ struct stat statdata;
+ enum { NONE, OPEN, STAT, CLOSE, READ } type;
};
class AsyncOpOpen : public Balau::AsyncOperation {
@@ -44,15 +47,9 @@ class AsyncOpOpen : public Balau::AsyncOperation {
cbResults_t * m_results;
};
-struct cbStatsResults_t {
- Balau::Events::Custom evt;
- int result, errorno;
- struct stat statdata;
-};
-
class AsyncOpStat : public Balau::AsyncOperation {
public:
- AsyncOpStat(int fd, cbStatsResults_t * results) : m_fd(fd), m_results(results) { }
+ AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
virtual void run() {
int r = m_results->result = fstat(m_fd, &m_results->statdata);
m_results->errorno = r < 0 ? errno : 0;
@@ -63,7 +60,7 @@ class AsyncOpStat : public Balau::AsyncOperation {
}
private:
int m_fd;
- cbStatsResults_t * m_results;
+ cbResults_t * m_results;
};
};
@@ -73,29 +70,69 @@ Balau::Input::Input(const char * fname) {
m_fname = fname;
}
+bool Balau::Input::isPendingComplete() {
+ if (!m_pendingOp)
+ return true;
+ return reinterpret_cast<cbResults_t *>(m_pendingOp)->evt.gotSignal();
+}
+
void Balau::Input::open() throw (GeneralException) {
+ AAssert(isClosed() || m_pendingOp, "Can't open a file twice.");
Printer::elog(E_INPUT, "Opening file %s", m_fname.to_charp());
- cbResults_t cbResults;
- createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), &cbResults));
- Task::operationYield(&cbResults.evt);
- if (cbResults.result < 0) {
- if (cbResults.errorno == ENOENT) {
- throw ENoEnt(m_fname);
- } else {
- char str[4096];
- throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
- }
- } else {
- m_fd = cbResults.result;
+ cbResults_t * cbResults;
+ if (!m_pendingOp) {
+ m_pendingOp = cbResults = new cbResults_t;
+ cbResults->type = cbResults_t::NONE;
}
- cbStatsResults_t cbStatsResults;
- createAsyncOp(new AsyncOpStat(m_fd, &cbStatsResults));
- Task::operationYield(&cbStatsResults.evt);
- if (cbStatsResults.result == 0) {
- m_size = cbStatsResults.statdata.st_size;
- m_mtime = cbStatsResults.statdata.st_mtime;
+ try {
+ switch (cbResults->type) {
+ case cbResults_t::NONE:
+ cbResults->type = cbResults_t::OPEN;
+ createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::OPEN:
+ AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete.");
+ if (cbResults->result < 0) {
+ if (cbResults->errorno == ENOENT) {
+ throw ENoEnt(m_fname);
+ } else {
+ char str[4096];
+ throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")");
+ }
+ } else {
+ m_fd = cbResults->result;
+ }
+
+ delete cbResults;
+ m_pendingOp = cbResults = new cbResults_t;
+ cbResults->type = cbResults_t::STAT;
+ createAsyncOp(new AsyncOpStat(m_fd, cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::STAT:
+ AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete.");
+ if (cbResults->result == 0) {
+ m_size = cbResults->statdata.st_size;
+ m_mtime = cbResults->statdata.st_mtime;
+ }
+ delete cbResults;
+ m_pendingOp = NULL;
+ break;
+ default:
+ AAssert(false, "Don't switch operations while one is still not complete.");
+ }
+ }
+ catch (Balau::TaskSwitch) {
+ throw;
+ }
+ catch (Balau::EAgain) {
+ throw;
+ }
+ catch (...) {
+ delete cbResults;
+ m_pendingOp = NULL;
+ throw;
}
}
@@ -120,30 +157,52 @@ class AsyncOpClose : public Balau::AsyncOperation {
};
void Balau::Input::close() throw (GeneralException) {
- if (m_fd < 0)
+ if ((m_fd < 0) && !m_pendingOp)
return;
- cbResults_t cbResults;
- createAsyncOp(new AsyncOpClose(m_fd, &cbResults));
- Task::operationYield(&cbResults.evt);
- m_fd = -1;
- if (cbResults.result < 0) {
- char str[4096];
- strerror_r(cbResults.errorno, str, sizeof(str));
- throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+ cbResults_t * cbResults;
+ if (!m_pendingOp) {
+ m_pendingOp = cbResults = new cbResults_t;
+ cbResults->type = cbResults_t::NONE;
+ }
+
+ try {
+ switch (cbResults->type) {
+ case cbResults_t::NONE:
+ cbResults->type = cbResults_t::CLOSE;
+ createAsyncOp(new AsyncOpClose(m_fd, cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::CLOSE:
+ m_fd = -1;
+ if (cbResults->result < 0) {
+ char str[4096];
+ strerror_r(cbResults->errorno, str, sizeof(str));
+ throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+ }
+ delete cbResults;
+ m_pendingOp = NULL;
+ break;
+ default:
+ AAssert(false, "Don't switch operations while one is still not complete.");
+ }
+ }
+ catch (Balau::TaskSwitch) {
+ throw;
+ }
+ catch (Balau::EAgain) {
+ throw;
+ }
+ catch (...) {
+ delete cbResults;
+ m_pendingOp = NULL;
+ throw;
}
}
namespace {
-struct cbReadResults_t {
- Balau::Events::Custom evt;
- ssize_t result;
- int errorno;
-};
-
class AsyncOpRead : public Balau::AsyncOperation {
public:
- AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbReadResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
+ AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
virtual void run() {
ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset);
m_results->errorno = r < 0 ? errno : 0;
@@ -157,22 +216,54 @@ class AsyncOpRead : public Balau::AsyncOperation {
void * m_buf;
size_t m_count;
off_t m_offset;
- cbReadResults_t * m_results;
+ cbResults_t * m_results;
};
};
ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) {
- cbReadResults_t cbResults;
- createAsyncOp(new AsyncOpRead(m_fd, buf, count, getROffset(), &cbResults));
- Task::operationYield(&cbResults.evt);
- if (cbResults.result > 0) {
- rseek(cbResults.result, SEEK_CUR);
- } else {
- char str[4096];
- throw GeneralException(String("Unable to read file ") + m_name + ": " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
- }
- return cbResults.result;
+ AAssert(!isClosed(), "Can't read a closed file");
+ ssize_t result;
+
+ cbResults_t * cbResults;
+ if (!m_pendingOp) {
+ m_pendingOp = cbResults = new cbResults_t;
+ cbResults->type = cbResults_t::NONE;
+ }
+
+ try {
+ switch (cbResults->type) {
+ case cbResults_t::NONE:
+ cbResults->type = cbResults_t::READ;
+ createAsyncOp(new AsyncOpRead(m_fd, buf, count, getROffset(), cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::READ:
+ result = cbResults->result;
+ if (result > 0) {
+ rseek(result, SEEK_CUR);
+ } else {
+ char str[4096];
+ throw GeneralException(String("Unable to read file ") + m_name + ": " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")");
+ }
+ delete cbResults;
+ m_pendingOp = NULL;
+ return result;
+ break;
+ default:
+ AAssert(false, "Don't switch operations while one is still not complete.");
+ }
+ }
+ catch (Balau::TaskSwitch) {
+ throw;
+ }
+ catch (Balau::EAgain) {
+ throw;
+ }
+ catch (...) {
+ delete cbResults;
+ m_pendingOp = NULL;
+ throw;
+ }
}
bool Balau::Input::isClosed() {
diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc
index 6bbb503..87a506e 100644
--- a/tests/test-Tasks.cc
+++ b/tests/test-Tasks.cc
@@ -2,6 +2,7 @@
#include <Task.h>
#include <TaskMan.h>
#include <StacklessTask.h>
+#include <Input.h>
using namespace Balau;
@@ -49,9 +50,15 @@ class TestStackless : public StacklessTask {
StacklessOperation(m_operation->Do());
TAssert(m_operation->completed());
delete m_operation;
+ m_handle = new Input("tests/rtest.txt");
+ StacklessOperation(m_handle->open());
+ StacklessOperation(m_handle->read(buf, 10));
+ StacklessOperation(m_handle->close());
StacklessEnd();
}
TestOperation * m_operation;
+ IO<Input> m_handle;
+ char buf[10];
};
static void yieldingFunction() {