From d2fdc279df461d164860cf08645affca39232355 Mon Sep 17 00:00:00 2001 From: Nicolas 'Pixel' Noble Date: Mon, 21 Jan 2013 16:22:45 -0800 Subject: Output operations open, close and write are now fully interruptible. --- src/Input.cc | 12 ++-- src/Output.cc | 211 +++++++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 162 insertions(+), 61 deletions(-) (limited to 'src') diff --git a/src/Input.cc b/src/Input.cc index b83a1be..a86468e 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -35,7 +35,7 @@ class AsyncOpOpen : public Balau::AsyncOperation { public: AsyncOpOpen(const char * path, cbResults_t * results) : m_path(path), m_results(results) { } virtual void run() { - int r = m_results->result = open(m_path, O_RDONLY); + const ssize_t r = m_results->result = open(m_path, O_RDONLY); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { @@ -51,7 +51,7 @@ class AsyncOpStat : public Balau::AsyncOperation { public: AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { } virtual void run() { - int r = m_results->result = fstat(m_fd, &m_results->statdata); + const ssize_t r = m_results->result = fstat(m_fd, &m_results->statdata); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { @@ -83,7 +83,7 @@ void Balau::Input::open() throw (GeneralException) { cbResults_t * cbResults; if (!m_pendingOp) { - m_pendingOp = cbResults = new cbResults_t; + m_pendingOp = cbResults = new cbResults_t(); cbResults->type = cbResults_t::NONE; } else { cbResults = (cbResults_t *) m_pendingOp; @@ -109,7 +109,7 @@ void Balau::Input::open() throw (GeneralException) { } delete cbResults; - m_pendingOp = cbResults = new cbResults_t; + m_pendingOp = cbResults = new cbResults_t(); cbResults->type = cbResults_t::STAT; createAsyncOp(new AsyncOpStat(m_fd, cbResults)); Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); @@ -145,7 +145,7 @@ class AsyncOpClose : public Balau::AsyncOperation { public: AsyncOpClose(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { } virtual void run() { - int r = m_results->result = close(m_fd); + const ssize_t r = m_results->result = close(m_fd); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { @@ -211,7 +211,7 @@ class AsyncOpRead : public Balau::AsyncOperation { public: AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { } virtual void run() { - ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset); + const ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { diff --git a/src/Output.cc b/src/Output.cc index 7a50b03..9f5e370 100644 --- a/src/Output.cc +++ b/src/Output.cc @@ -25,14 +25,17 @@ namespace { struct cbResults_t { Balau::Events::Custom evt; - int result, errorno; + ssize_t result; + int errorno; + struct stat statdata; + enum { NONE, OPEN, STAT, CLOSE, WRITE } type; }; class AsyncOpOpen : public Balau::AsyncOperation { public: AsyncOpOpen(const char * path, bool truncate, cbResults_t * results) : m_path(path), m_truncate(truncate), m_results(results) { } virtual void run() { - int r = m_results->result = open(m_path, O_WRONLY | O_CREAT | (m_truncate ? O_TRUNC : 0), 0755); + const ssize_t r = m_results->result = open(m_path, O_WRONLY | O_CREAT | (m_truncate ? O_TRUNC : 0), 0755); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { @@ -45,17 +48,11 @@ class AsyncOpOpen : public Balau::AsyncOperation { cbResults_t * m_results; }; -struct cbStatsResults_t { - Balau::Events::Custom evt; - int result, errorno; - struct stat statdata; -}; - class AsyncOpStat : public Balau::AsyncOperation { public: - AsyncOpStat(int fd, cbStatsResults_t * results) : m_fd(fd), m_results(results) { } + AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { } virtual void run() { - int r = m_results->result = fstat(m_fd, &m_results->statdata); + const ssize_t r = m_results->result = fstat(m_fd, &m_results->statdata); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { @@ -64,7 +61,7 @@ class AsyncOpStat : public Balau::AsyncOperation { } private: int m_fd; - cbStatsResults_t * m_results; + cbResults_t * m_results; }; }; @@ -74,29 +71,71 @@ Balau::Output::Output(const char * fname) { m_fname = fname; } +bool Balau::Output::isPendingComplete() { + if (!m_pendingOp) + return true; + return reinterpret_cast(m_pendingOp)->evt.gotSignal(); +} + void Balau::Output::open(bool truncate) throw (GeneralException) { + AAssert(isClosed() || m_pendingOp, "Can't open a file twice."); Printer::elog(E_OUTPUT, "Opening file %s", m_fname.to_charp()); - cbResults_t cbResults; - createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), truncate, &cbResults)); - Task::operationYield(&cbResults.evt); - if (cbResults.result < 0) { - if (cbResults.errorno == ENOENT) { - throw ENoEnt(m_fname); - } else { - char str[4096]; - throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); - } + cbResults_t * cbResults; + + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t(); + cbResults->type = cbResults_t::NONE; } else { - m_fd = cbResults.result; + cbResults = (cbResults_t *) m_pendingOp; } - cbStatsResults_t cbStatsResults; - createAsyncOp(new AsyncOpStat(m_fd, &cbStatsResults)); - Task::operationYield(&cbStatsResults.evt); - if (cbStatsResults.result == 0) { - m_size = cbStatsResults.statdata.st_size; - m_mtime = cbStatsResults.statdata.st_mtime; + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::OPEN; + createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), truncate, cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::OPEN: + AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete."); + if (cbResults->result < 0) { + if (cbResults->errorno == ENOENT) { + throw ENoEnt(m_fname); + } else { + char str[4096]; + throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")"); + } + } else { + m_fd = cbResults->result; + } + + delete cbResults; + m_pendingOp = cbResults = new cbResults_t(); + cbResults->type = cbResults_t::STAT; + createAsyncOp(new AsyncOpStat(m_fd, cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::STAT: + if (cbResults->result == 0) { + m_size = cbResults->statdata.st_size; + m_mtime = cbResults->statdata.st_mtime; + } + delete cbResults; + m_pendingOp = NULL; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; } } @@ -106,7 +145,7 @@ class AsyncOpClose : public Balau::AsyncOperation { public: AsyncOpClose(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { } virtual void run() { - int r = m_results->result = close(m_fd); + const ssize_t r = m_results->result = close(m_fd); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { @@ -121,32 +160,58 @@ class AsyncOpClose : public Balau::AsyncOperation { }; void Balau::Output::close() throw (GeneralException) { - if (m_fd < 0) + if ((m_fd < 0) && !m_pendingOp) return; - cbResults_t cbResults; - createAsyncOp(new AsyncOpClose(m_fd, &cbResults)); - Task::operationYield(&cbResults.evt); - m_fd = -1; - if (cbResults.result < 0) { - char str[4096]; - strerror_r(cbResults.errorno, str, sizeof(str)); - throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + + cbResults_t * cbResults; + + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; + } else { + cbResults = (cbResults_t *) m_pendingOp; + } + + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::CLOSE; + createAsyncOp(new AsyncOpClose(m_fd, cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::CLOSE: + m_fd = -1; + if (cbResults->result < 0) { + char str[4096]; + strerror_r(cbResults->errorno, str, sizeof(str)); + throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + } + delete cbResults; + m_pendingOp = NULL; + break; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; + } + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; } } namespace { -struct cbWriteResults_t { - Balau::Events::Custom evt; - ssize_t result; - int errorno; -}; - class AsyncOpWrite : public Balau::AsyncOperation { public: - AsyncOpWrite(int fd, const void * buf, size_t count, off_t offset, cbWriteResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { } + AsyncOpWrite(int fd, const void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { } virtual void run() { - int r = m_results->result = pwrite(m_fd, m_buf, m_count, m_offset); + const ssize_t r = m_results->result = pwrite(m_fd, m_buf, m_count, m_offset); m_results->errorno = r < 0 ? errno : 0; } virtual void done() { @@ -158,22 +223,58 @@ class AsyncOpWrite : public Balau::AsyncOperation { const void * m_buf; size_t m_count; off_t m_offset; - cbWriteResults_t * m_results; + cbResults_t * m_results; }; }; ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralException) { - cbWriteResults_t cbResults; - createAsyncOp(new AsyncOpWrite(m_fd, buf, count, getWOffset(), &cbResults)); - Task::operationYield(&cbResults.evt); - if (cbResults.result > 0) { - wseek(cbResults.result, SEEK_CUR); + AAssert(!isClosed(), "Can't write a closed file"); + ssize_t result; + + cbResults_t * cbResults; + + if (!m_pendingOp) { + m_pendingOp = cbResults = new cbResults_t; + cbResults->type = cbResults_t::NONE; } else { - char str[4096]; - throw GeneralException(String("Unable to write file ") + m_name + ": " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); + cbResults = (cbResults_t *) m_pendingOp; + } + + try { + switch (cbResults->type) { + case cbResults_t::NONE: + cbResults->type = cbResults_t::WRITE; + createAsyncOp(new AsyncOpWrite(m_fd, buf, count, getWOffset(), cbResults)); + Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE); + case cbResults_t::WRITE: + result = cbResults->result; + if (result > 0) { + wseek(result, SEEK_CUR); + } else { + char str[4096]; + throw GeneralException(String("Unable to write file ") + m_name + ": " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")"); + } + delete cbResults; + m_pendingOp = NULL; + return result; + default: + AAssert(false, "Don't switch operations while one is still not complete."); + } + } + catch (Balau::TaskSwitch) { + throw; + } + catch (Balau::EAgain) { + throw; } - return cbResults.result; + catch (...) { + delete cbResults; + m_pendingOp = NULL; + throw; + } + + IAssert(false, "Shouldn't end up there."); } bool Balau::Output::isClosed() { -- cgit v1.2.3