summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas 'Pixel' Noble <pixel@nobis-crew.org>2013-01-21 16:22:45 -0800
committerNicolas 'Pixel' Noble <pixel@nobis-crew.org>2013-01-21 16:22:45 -0800
commitd2fdc279df461d164860cf08645affca39232355 (patch)
tree295f453fefc2bf2b0e8daf4d05418f65bc015a5b
parent56b8ca62c666a1b747766a5ede70db070977ad37 (diff)
Output operations open, close and write are now fully interruptible.
-rw-r--r--includes/Output.h2
-rw-r--r--src/Input.cc12
-rw-r--r--src/Output.cc211
3 files changed, 164 insertions, 61 deletions
diff --git a/includes/Output.h b/includes/Output.h
index fec3154..c80cf40 100644
--- a/includes/Output.h
+++ b/includes/Output.h
@@ -15,6 +15,7 @@ class Output : public SeekableHandle {
virtual const char * getName();
virtual off_t getSize();
virtual time_t getMTime();
+ virtual bool isPendingComplete();
const char * getFName() { return m_fname.to_charp(); }
private:
int m_fd = -1;
@@ -22,6 +23,7 @@ class Output : public SeekableHandle {
String m_fname;
off_t m_size = -1;
time_t m_mtime = -1;
+ void * m_pendingOp = NULL;
};
};
diff --git a/src/Input.cc b/src/Input.cc
index b83a1be..a86468e 100644
--- a/src/Input.cc
+++ b/src/Input.cc
@@ -35,7 +35,7 @@ class AsyncOpOpen : public Balau::AsyncOperation {
public:
AsyncOpOpen(const char * path, cbResults_t * results) : m_path(path), m_results(results) { }
virtual void run() {
- int r = m_results->result = open(m_path, O_RDONLY);
+ const ssize_t r = m_results->result = open(m_path, O_RDONLY);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
@@ -51,7 +51,7 @@ class AsyncOpStat : public Balau::AsyncOperation {
public:
AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
virtual void run() {
- int r = m_results->result = fstat(m_fd, &m_results->statdata);
+ const ssize_t r = m_results->result = fstat(m_fd, &m_results->statdata);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
@@ -83,7 +83,7 @@ void Balau::Input::open() throw (GeneralException) {
cbResults_t * cbResults;
if (!m_pendingOp) {
- m_pendingOp = cbResults = new cbResults_t;
+ m_pendingOp = cbResults = new cbResults_t();
cbResults->type = cbResults_t::NONE;
} else {
cbResults = (cbResults_t *) m_pendingOp;
@@ -109,7 +109,7 @@ void Balau::Input::open() throw (GeneralException) {
}
delete cbResults;
- m_pendingOp = cbResults = new cbResults_t;
+ m_pendingOp = cbResults = new cbResults_t();
cbResults->type = cbResults_t::STAT;
createAsyncOp(new AsyncOpStat(m_fd, cbResults));
Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
@@ -145,7 +145,7 @@ class AsyncOpClose : public Balau::AsyncOperation {
public:
AsyncOpClose(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
virtual void run() {
- int r = m_results->result = close(m_fd);
+ const ssize_t r = m_results->result = close(m_fd);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
@@ -211,7 +211,7 @@ class AsyncOpRead : public Balau::AsyncOperation {
public:
AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
virtual void run() {
- ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset);
+ const ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
diff --git a/src/Output.cc b/src/Output.cc
index 7a50b03..9f5e370 100644
--- a/src/Output.cc
+++ b/src/Output.cc
@@ -25,14 +25,17 @@ namespace {
struct cbResults_t {
Balau::Events::Custom evt;
- int result, errorno;
+ ssize_t result;
+ int errorno;
+ struct stat statdata;
+ enum { NONE, OPEN, STAT, CLOSE, WRITE } type;
};
class AsyncOpOpen : public Balau::AsyncOperation {
public:
AsyncOpOpen(const char * path, bool truncate, cbResults_t * results) : m_path(path), m_truncate(truncate), m_results(results) { }
virtual void run() {
- int r = m_results->result = open(m_path, O_WRONLY | O_CREAT | (m_truncate ? O_TRUNC : 0), 0755);
+ const ssize_t r = m_results->result = open(m_path, O_WRONLY | O_CREAT | (m_truncate ? O_TRUNC : 0), 0755);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
@@ -45,17 +48,11 @@ class AsyncOpOpen : public Balau::AsyncOperation {
cbResults_t * m_results;
};
-struct cbStatsResults_t {
- Balau::Events::Custom evt;
- int result, errorno;
- struct stat statdata;
-};
-
class AsyncOpStat : public Balau::AsyncOperation {
public:
- AsyncOpStat(int fd, cbStatsResults_t * results) : m_fd(fd), m_results(results) { }
+ AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
virtual void run() {
- int r = m_results->result = fstat(m_fd, &m_results->statdata);
+ const ssize_t r = m_results->result = fstat(m_fd, &m_results->statdata);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
@@ -64,7 +61,7 @@ class AsyncOpStat : public Balau::AsyncOperation {
}
private:
int m_fd;
- cbStatsResults_t * m_results;
+ cbResults_t * m_results;
};
};
@@ -74,29 +71,71 @@ Balau::Output::Output(const char * fname) {
m_fname = fname;
}
+bool Balau::Output::isPendingComplete() {
+ if (!m_pendingOp)
+ return true;
+ return reinterpret_cast<cbResults_t *>(m_pendingOp)->evt.gotSignal();
+}
+
void Balau::Output::open(bool truncate) throw (GeneralException) {
+ AAssert(isClosed() || m_pendingOp, "Can't open a file twice.");
Printer::elog(E_OUTPUT, "Opening file %s", m_fname.to_charp());
- cbResults_t cbResults;
- createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), truncate, &cbResults));
- Task::operationYield(&cbResults.evt);
- if (cbResults.result < 0) {
- if (cbResults.errorno == ENOENT) {
- throw ENoEnt(m_fname);
- } else {
- char str[4096];
- throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
- }
+ cbResults_t * cbResults;
+
+ if (!m_pendingOp) {
+ m_pendingOp = cbResults = new cbResults_t();
+ cbResults->type = cbResults_t::NONE;
} else {
- m_fd = cbResults.result;
+ cbResults = (cbResults_t *) m_pendingOp;
}
- cbStatsResults_t cbStatsResults;
- createAsyncOp(new AsyncOpStat(m_fd, &cbStatsResults));
- Task::operationYield(&cbStatsResults.evt);
- if (cbStatsResults.result == 0) {
- m_size = cbStatsResults.statdata.st_size;
- m_mtime = cbStatsResults.statdata.st_mtime;
+ try {
+ switch (cbResults->type) {
+ case cbResults_t::NONE:
+ cbResults->type = cbResults_t::OPEN;
+ createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), truncate, cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::OPEN:
+ AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete.");
+ if (cbResults->result < 0) {
+ if (cbResults->errorno == ENOENT) {
+ throw ENoEnt(m_fname);
+ } else {
+ char str[4096];
+ throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")");
+ }
+ } else {
+ m_fd = cbResults->result;
+ }
+
+ delete cbResults;
+ m_pendingOp = cbResults = new cbResults_t();
+ cbResults->type = cbResults_t::STAT;
+ createAsyncOp(new AsyncOpStat(m_fd, cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::STAT:
+ if (cbResults->result == 0) {
+ m_size = cbResults->statdata.st_size;
+ m_mtime = cbResults->statdata.st_mtime;
+ }
+ delete cbResults;
+ m_pendingOp = NULL;
+ break;
+ default:
+ AAssert(false, "Don't switch operations while one is still not complete.");
+ }
+ }
+ catch (Balau::TaskSwitch) {
+ throw;
+ }
+ catch (Balau::EAgain) {
+ throw;
+ }
+ catch (...) {
+ delete cbResults;
+ m_pendingOp = NULL;
+ throw;
}
}
@@ -106,7 +145,7 @@ class AsyncOpClose : public Balau::AsyncOperation {
public:
AsyncOpClose(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
virtual void run() {
- int r = m_results->result = close(m_fd);
+ const ssize_t r = m_results->result = close(m_fd);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
@@ -121,32 +160,58 @@ class AsyncOpClose : public Balau::AsyncOperation {
};
void Balau::Output::close() throw (GeneralException) {
- if (m_fd < 0)
+ if ((m_fd < 0) && !m_pendingOp)
return;
- cbResults_t cbResults;
- createAsyncOp(new AsyncOpClose(m_fd, &cbResults));
- Task::operationYield(&cbResults.evt);
- m_fd = -1;
- if (cbResults.result < 0) {
- char str[4096];
- strerror_r(cbResults.errorno, str, sizeof(str));
- throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+
+ cbResults_t * cbResults;
+
+ if (!m_pendingOp) {
+ m_pendingOp = cbResults = new cbResults_t;
+ cbResults->type = cbResults_t::NONE;
+ } else {
+ cbResults = (cbResults_t *) m_pendingOp;
+ }
+
+ try {
+ switch (cbResults->type) {
+ case cbResults_t::NONE:
+ cbResults->type = cbResults_t::CLOSE;
+ createAsyncOp(new AsyncOpClose(m_fd, cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::CLOSE:
+ m_fd = -1;
+ if (cbResults->result < 0) {
+ char str[4096];
+ strerror_r(cbResults->errorno, str, sizeof(str));
+ throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+ }
+ delete cbResults;
+ m_pendingOp = NULL;
+ break;
+ default:
+ AAssert(false, "Don't switch operations while one is still not complete.");
+ }
+ }
+ catch (Balau::TaskSwitch) {
+ throw;
+ }
+ catch (Balau::EAgain) {
+ throw;
+ }
+ catch (...) {
+ delete cbResults;
+ m_pendingOp = NULL;
+ throw;
}
}
namespace {
-struct cbWriteResults_t {
- Balau::Events::Custom evt;
- ssize_t result;
- int errorno;
-};
-
class AsyncOpWrite : public Balau::AsyncOperation {
public:
- AsyncOpWrite(int fd, const void * buf, size_t count, off_t offset, cbWriteResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
+ AsyncOpWrite(int fd, const void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
virtual void run() {
- int r = m_results->result = pwrite(m_fd, m_buf, m_count, m_offset);
+ const ssize_t r = m_results->result = pwrite(m_fd, m_buf, m_count, m_offset);
m_results->errorno = r < 0 ? errno : 0;
}
virtual void done() {
@@ -158,22 +223,58 @@ class AsyncOpWrite : public Balau::AsyncOperation {
const void * m_buf;
size_t m_count;
off_t m_offset;
- cbWriteResults_t * m_results;
+ cbResults_t * m_results;
};
};
ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralException) {
- cbWriteResults_t cbResults;
- createAsyncOp(new AsyncOpWrite(m_fd, buf, count, getWOffset(), &cbResults));
- Task::operationYield(&cbResults.evt);
- if (cbResults.result > 0) {
- wseek(cbResults.result, SEEK_CUR);
+ AAssert(!isClosed(), "Can't write a closed file");
+ ssize_t result;
+
+ cbResults_t * cbResults;
+
+ if (!m_pendingOp) {
+ m_pendingOp = cbResults = new cbResults_t;
+ cbResults->type = cbResults_t::NONE;
} else {
- char str[4096];
- throw GeneralException(String("Unable to write file ") + m_name + ": " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
+ cbResults = (cbResults_t *) m_pendingOp;
+ }
+
+ try {
+ switch (cbResults->type) {
+ case cbResults_t::NONE:
+ cbResults->type = cbResults_t::WRITE;
+ createAsyncOp(new AsyncOpWrite(m_fd, buf, count, getWOffset(), cbResults));
+ Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+ case cbResults_t::WRITE:
+ result = cbResults->result;
+ if (result > 0) {
+ wseek(result, SEEK_CUR);
+ } else {
+ char str[4096];
+ throw GeneralException(String("Unable to write file ") + m_name + ": " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")");
+ }
+ delete cbResults;
+ m_pendingOp = NULL;
+ return result;
+ default:
+ AAssert(false, "Don't switch operations while one is still not complete.");
+ }
+ }
+ catch (Balau::TaskSwitch) {
+ throw;
+ }
+ catch (Balau::EAgain) {
+ throw;
}
- return cbResults.result;
+ catch (...) {
+ delete cbResults;
+ m_pendingOp = NULL;
+ throw;
+ }
+
+ IAssert(false, "Shouldn't end up there.");
}
bool Balau::Output::isClosed() {