diff options
-rw-r--r-- | includes/Input.h | 2 | ||||
-rw-r--r-- | src/Input.cc | 199 | ||||
-rw-r--r-- | tests/test-Tasks.cc | 7 |
3 files changed, 154 insertions, 54 deletions
diff --git a/includes/Input.h b/includes/Input.h index 51b17c7..af17107 100644 --- a/includes/Input.h +++ b/includes/Input.h @@ -15,6 +15,7 @@ class Input : public SeekableHandle { virtual const char * getName(); virtual off_t getSize(); virtual time_t getMTime(); + bool isPendingComplete(); const char * getFName() { return m_fname.to_charp(); } private: int m_fd = -1; @@ -22,6 +23,7 @@ class Input : public SeekableHandle { String m_fname; off_t m_size = -1; time_t m_mtime = -1; + void * m_pendingOp = NULL; }; }; diff --git a/src/Input.cc b/src/Input.cc index 2a89fa9..5fef78d 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -25,7 +25,10 @@ namespace { struct cbResults_t { Balau::Events::Custom evt; - int result, errorno; + ssize_t result; + int errorno; + struct stat statdata; + enum { NONE, OPEN, STAT, CLOSE, READ } type; }; class AsyncOpOpen : public Balau::AsyncOperation { @@ -44,15 +47,9 @@ class AsyncOpOpen : public Balau::AsyncOperation { cbResults_t * m_results; }; -struct cbStatsResults_t { - Balau::Events::Custom evt; - int result, errorno; - struct stat statdata; -}; - class AsyncOpStat : public Balau::AsyncOperation { public: - AsyncOpStat(int fd, cbStatsResults_t * results) : m_fd(fd), m_results(results) { } + AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { } virtual void run() { int r = m_results->result = fstat(m_fd, &m_results->statdata); m_results->errorno = r < 0 ? errno : 0; @@ -63,7 +60,7 @@ class AsyncOpStat : public Balau::AsyncOperation { } private: int m_fd; - cbStatsResults_t * m_results; + cbResults_t * m_results; }; }; @@ -73,29 +70,69 @@ Balau::Input::Input(const char * fname) { m_fname = fname; } +bool Balau::Input::isPendingComplete() { + if (!m_pendingOp) + return true; + return reinterpret_cast<cbResults_t *>(m_pendingOp)->evt.gotSignal(); +} + void Balau::Input::open() throw (GeneralException) { + AAssert(isClosed() || m_pendingOp, "Can't open a file twice."); Printer::elog(E_INPUT, "Opening file %s", m_fname.to_charp()); - cbResults_t cbResults; - createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), &cbResults)); - Task::operationYield(&cbResults.evt); - if (cbResults.result < 0) { - if (cbResults.errorno == ENOENT) { - throw ENoEnt(m_fname); - } else { - char str[4096]; - throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); - } - } else { - m_fd = cbResults.result; + cbResults_t * cbResults; + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; } - cbStatsResults_t cbStatsResults; - createAsyncOp(new AsyncOpStat(m_fd, &cbStatsResults)); - Task::operationYield(&cbStatsResults.evt); - if (cbStatsResults.result == 0) { - m_size = cbStatsResults.statdata.st_size; - m_mtime = cbStatsResults.statdata.st_mtime; + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::OPEN; + createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::OPEN: + AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete."); + if (cbResults->result < 0) { + if (cbResults->errorno == ENOENT) { + throw ENoEnt(m_fname); + } else { + char str[4096]; + throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")"); + } + } else { + m_fd = cbResults->result; + } + + delete cbResults; + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::STAT; + createAsyncOp(new AsyncOpStat(m_fd, cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::STAT: + AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete."); + if (cbResults->result == 0) { + m_size = cbResults->statdata.st_size; + m_mtime = cbResults->statdata.st_mtime; + } + delete cbResults; + m_pendingOp = NULL; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; } } @@ -120,30 +157,52 @@ class AsyncOpClose : public Balau::AsyncOperation { }; void Balau::Input::close() throw (GeneralException) { - if (m_fd < 0) + if ((m_fd < 0) && !m_pendingOp) return; - cbResults_t cbResults; - createAsyncOp(new AsyncOpClose(m_fd, &cbResults)); - Task::operationYield(&cbResults.evt); - m_fd = -1; - if (cbResults.result < 0) { - char str[4096]; - strerror_r(cbResults.errorno, str, sizeof(str)); - throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + cbResults_t * cbResults; + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; + } + + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::CLOSE; + createAsyncOp(new AsyncOpClose(m_fd, cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::CLOSE: + m_fd = -1; + if (cbResults->result < 0) { + char str[4096]; + strerror_r(cbResults->errorno, str, sizeof(str)); + throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + } + delete cbResults; + m_pendingOp = NULL; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; } } namespace { -struct cbReadResults_t { - Balau::Events::Custom evt; - ssize_t result; - int errorno; -}; - class AsyncOpRead : public Balau::AsyncOperation { public: - AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbReadResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { } + AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { } virtual void run() { ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset); m_results->errorno = r < 0 ? errno : 0; @@ -157,22 +216,54 @@ class AsyncOpRead : public Balau::AsyncOperation { void * m_buf; size_t m_count; off_t m_offset; - cbReadResults_t * m_results; + cbResults_t * m_results; }; }; ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) { - cbReadResults_t cbResults; - createAsyncOp(new AsyncOpRead(m_fd, buf, count, getROffset(), &cbResults)); - Task::operationYield(&cbResults.evt); - if (cbResults.result > 0) { - rseek(cbResults.result, SEEK_CUR); - } else { - char str[4096]; - throw GeneralException(String("Unable to read file ") + m_name + ": " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); - } - return cbResults.result; + AAssert(!isClosed(), "Can't read a closed file"); + ssize_t result; + + cbResults_t * cbResults; + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; + } + + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::READ; + createAsyncOp(new AsyncOpRead(m_fd, buf, count, getROffset(), cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::READ: + result = cbResults->result; + if (result > 0) { + rseek(result, SEEK_CUR); + } else { + char str[4096]; + throw GeneralException(String("Unable to read file ") + m_name + ": " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")"); + } + delete cbResults; + m_pendingOp = NULL; + return result; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; + } } bool Balau::Input::isClosed() { diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc index 6bbb503..87a506e 100644 --- a/tests/test-Tasks.cc +++ b/tests/test-Tasks.cc @@ -2,6 +2,7 @@ #include <Task.h> #include <TaskMan.h> #include <StacklessTask.h> +#include <Input.h> using namespace Balau; @@ -49,9 +50,15 @@ class TestStackless : public StacklessTask { StacklessOperation(m_operation->Do()); TAssert(m_operation->completed()); delete m_operation; + m_handle = new Input("tests/rtest.txt"); + StacklessOperation(m_handle->open()); + StacklessOperation(m_handle->read(buf, 10)); + StacklessOperation(m_handle->close()); StacklessEnd(); } TestOperation * m_operation; + IO<Input> m_handle; + char buf[10]; }; static void yieldingFunction() { |