From f6e2bfe69c014b9b99423320ba787a3df938754b Mon Sep 17 00:00:00 2001 From: Nicolas 'Pixel' Noble Date: Sun, 20 Jan 2013 20:13:29 -0800 Subject: Input's open, read and close are now operations fully interruptible. --- src/Input.cc | 199 +++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 145 insertions(+), 54 deletions(-) (limited to 'src') diff --git a/src/Input.cc b/src/Input.cc index 2a89fa9..5fef78d 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -25,7 +25,10 @@ namespace { struct cbResults_t { Balau::Events::Custom evt; - int result, errorno; + ssize_t result; + int errorno; + struct stat statdata; + enum { NONE, OPEN, STAT, CLOSE, READ } type; }; class AsyncOpOpen : public Balau::AsyncOperation { @@ -44,15 +47,9 @@ class AsyncOpOpen : public Balau::AsyncOperation { cbResults_t * m_results; }; -struct cbStatsResults_t { - Balau::Events::Custom evt; - int result, errorno; - struct stat statdata; -}; - class AsyncOpStat : public Balau::AsyncOperation { public: - AsyncOpStat(int fd, cbStatsResults_t * results) : m_fd(fd), m_results(results) { } + AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { } virtual void run() { int r = m_results->result = fstat(m_fd, &m_results->statdata); m_results->errorno = r < 0 ? errno : 0; @@ -63,7 +60,7 @@ class AsyncOpStat : public Balau::AsyncOperation { } private: int m_fd; - cbStatsResults_t * m_results; + cbResults_t * m_results; }; }; @@ -73,29 +70,69 @@ Balau::Input::Input(const char * fname) { m_fname = fname; } +bool Balau::Input::isPendingComplete() { + if (!m_pendingOp) + return true; + return reinterpret_cast(m_pendingOp)->evt.gotSignal(); +} + void Balau::Input::open() throw (GeneralException) { + AAssert(isClosed() || m_pendingOp, "Can't open a file twice."); Printer::elog(E_INPUT, "Opening file %s", m_fname.to_charp()); - cbResults_t cbResults; - createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), &cbResults)); - Task::operationYield(&cbResults.evt); - if (cbResults.result < 0) { - if (cbResults.errorno == ENOENT) { - throw ENoEnt(m_fname); - } else { - char str[4096]; - throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); - } - } else { - m_fd = cbResults.result; + cbResults_t * cbResults; + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; } - cbStatsResults_t cbStatsResults; - createAsyncOp(new AsyncOpStat(m_fd, &cbStatsResults)); - Task::operationYield(&cbStatsResults.evt); - if (cbStatsResults.result == 0) { - m_size = cbStatsResults.statdata.st_size; - m_mtime = cbStatsResults.statdata.st_mtime; + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::OPEN; + createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::OPEN: + AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete."); + if (cbResults->result < 0) { + if (cbResults->errorno == ENOENT) { + throw ENoEnt(m_fname); + } else { + char str[4096]; + throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")"); + } + } else { + m_fd = cbResults->result; + } + + delete cbResults; + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::STAT; + createAsyncOp(new AsyncOpStat(m_fd, cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::STAT: + AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete."); + if (cbResults->result == 0) { + m_size = cbResults->statdata.st_size; + m_mtime = cbResults->statdata.st_mtime; + } + delete cbResults; + m_pendingOp = NULL; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; } } @@ -120,30 +157,52 @@ class AsyncOpClose : public Balau::AsyncOperation { }; void Balau::Input::close() throw (GeneralException) { - if (m_fd < 0) + if ((m_fd < 0) && !m_pendingOp) return; - cbResults_t cbResults; - createAsyncOp(new AsyncOpClose(m_fd, &cbResults)); - Task::operationYield(&cbResults.evt); - m_fd = -1; - if (cbResults.result < 0) { - char str[4096]; - strerror_r(cbResults.errorno, str, sizeof(str)); - throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + cbResults_t * cbResults; + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; + } + + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::CLOSE; + createAsyncOp(new AsyncOpClose(m_fd, cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::CLOSE: + m_fd = -1; + if (cbResults->result < 0) { + char str[4096]; + strerror_r(cbResults->errorno, str, sizeof(str)); + throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + } + delete cbResults; + m_pendingOp = NULL; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; } } namespace { -struct cbReadResults_t { - Balau::Events::Custom evt; - ssize_t result; - int errorno; -}; - class AsyncOpRead : public Balau::AsyncOperation { public: - AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbReadResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { } + AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { } virtual void run() { ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset); m_results->errorno = r < 0 ? errno : 0; @@ -157,22 +216,54 @@ class AsyncOpRead : public Balau::AsyncOperation { void * m_buf; size_t m_count; off_t m_offset; - cbReadResults_t * m_results; + cbResults_t * m_results; }; }; ssize_t Balau::Input::read(void * buf, size_t count) throw (GeneralException) { - cbReadResults_t cbResults; - createAsyncOp(new AsyncOpRead(m_fd, buf, count, getROffset(), &cbResults)); - Task::operationYield(&cbResults.evt); - if (cbResults.result > 0) { - rseek(cbResults.result, SEEK_CUR); - } else { - char str[4096]; - throw GeneralException(String("Unable to read file ") + m_name + ": " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); - } - return cbResults.result; + AAssert(!isClosed(), "Can't read a closed file"); + ssize_t result; + + cbResults_t * cbResults; + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; + } + + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::READ; + createAsyncOp(new AsyncOpRead(m_fd, buf, count, getROffset(), cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::READ: + result = cbResults->result; + if (result > 0) { + rseek(result, SEEK_CUR); + } else { + char str[4096]; + throw GeneralException(String("Unable to read file ") + m_name + ": " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")"); + } + delete cbResults; + m_pendingOp = NULL; + return result; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; + } } bool Balau::Input::isClosed() { -- cgit v1.2.3