From d2fdc279df461d164860cf08645affca39232355 Mon Sep 17 00:00:00 2001
From: Nicolas 'Pixel' Noble <pixel@nobis-crew.org>
Date: Mon, 21 Jan 2013 16:22:45 -0800
Subject: Output operations open, close and write are now fully interruptible.

---
 src/Input.cc  |  12 ++--
 src/Output.cc | 211 +++++++++++++++++++++++++++++++++++++++++++---------------
 2 files changed, 162 insertions(+), 61 deletions(-)

(limited to 'src')

diff --git a/src/Input.cc b/src/Input.cc
index b83a1be..a86468e 100644
--- a/src/Input.cc
+++ b/src/Input.cc
@@ -35,7 +35,7 @@ class AsyncOpOpen : public Balau::AsyncOperation {
   public:
       AsyncOpOpen(const char * path, cbResults_t * results) : m_path(path), m_results(results) { }
     virtual void run() {
-        int r = m_results->result = open(m_path, O_RDONLY);
+        const ssize_t r = m_results->result = open(m_path, O_RDONLY);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
@@ -51,7 +51,7 @@ class AsyncOpStat : public Balau::AsyncOperation {
   public:
       AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
     virtual void run() {
-        int r = m_results->result = fstat(m_fd, &m_results->statdata);
+        const ssize_t r = m_results->result = fstat(m_fd, &m_results->statdata);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
@@ -83,7 +83,7 @@ void Balau::Input::open() throw (GeneralException) {
     cbResults_t * cbResults;
 
     if (!m_pendingOp) {
-        m_pendingOp = cbResults = new cbResults_t;
+        m_pendingOp = cbResults = new cbResults_t();
         cbResults->type = cbResults_t::NONE;
     } else {
         cbResults = (cbResults_t *) m_pendingOp;
@@ -109,7 +109,7 @@ void Balau::Input::open() throw (GeneralException) {
             }
 
             delete cbResults;
-            m_pendingOp = cbResults = new cbResults_t;
+            m_pendingOp = cbResults = new cbResults_t();
             cbResults->type = cbResults_t::STAT;
             createAsyncOp(new AsyncOpStat(m_fd, cbResults));
             Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
@@ -145,7 +145,7 @@ class AsyncOpClose : public Balau::AsyncOperation {
   public:
       AsyncOpClose(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
     virtual void run() {
-        int r = m_results->result = close(m_fd);
+        const ssize_t r = m_results->result = close(m_fd);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
@@ -211,7 +211,7 @@ class AsyncOpRead : public Balau::AsyncOperation {
   public:
       AsyncOpRead(int fd, void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
     virtual void run() {
-        ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset);
+        const ssize_t r = m_results->result = pread(m_fd, m_buf, m_count, m_offset);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
diff --git a/src/Output.cc b/src/Output.cc
index 7a50b03..9f5e370 100644
--- a/src/Output.cc
+++ b/src/Output.cc
@@ -25,14 +25,17 @@ namespace {
 
 struct cbResults_t {
     Balau::Events::Custom evt;
-    int result, errorno;
+    ssize_t result;
+    int errorno;
+    struct stat statdata;
+    enum { NONE, OPEN, STAT, CLOSE, WRITE } type;
 };
 
 class AsyncOpOpen : public Balau::AsyncOperation {
   public:
       AsyncOpOpen(const char * path, bool truncate, cbResults_t * results) : m_path(path), m_truncate(truncate), m_results(results) { }
     virtual void run() {
-        int r = m_results->result = open(m_path, O_WRONLY | O_CREAT | (m_truncate ? O_TRUNC : 0), 0755);
+        const ssize_t r = m_results->result = open(m_path, O_WRONLY | O_CREAT | (m_truncate ? O_TRUNC : 0), 0755);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
@@ -45,17 +48,11 @@ class AsyncOpOpen : public Balau::AsyncOperation {
     cbResults_t * m_results;
 };
 
-struct cbStatsResults_t {
-    Balau::Events::Custom evt;
-    int result, errorno;
-    struct stat statdata;
-};
-
 class AsyncOpStat : public Balau::AsyncOperation {
   public:
-      AsyncOpStat(int fd, cbStatsResults_t * results) : m_fd(fd), m_results(results) { }
+      AsyncOpStat(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
     virtual void run() {
-        int r = m_results->result = fstat(m_fd, &m_results->statdata);
+        const ssize_t r = m_results->result = fstat(m_fd, &m_results->statdata);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
@@ -64,7 +61,7 @@ class AsyncOpStat : public Balau::AsyncOperation {
     }
   private:
     int m_fd;
-    cbStatsResults_t * m_results;
+    cbResults_t * m_results;
 };
 
 };
@@ -74,29 +71,71 @@ Balau::Output::Output(const char * fname) {
     m_fname = fname;
 }
 
+bool Balau::Output::isPendingComplete() {
+    if (!m_pendingOp)
+        return true;
+    return reinterpret_cast<cbResults_t *>(m_pendingOp)->evt.gotSignal();
+}
+
 void Balau::Output::open(bool truncate) throw (GeneralException) {
+    AAssert(isClosed() || m_pendingOp, "Can't open a file twice.");
     Printer::elog(E_OUTPUT, "Opening file %s", m_fname.to_charp());
 
-    cbResults_t cbResults;
-    createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), truncate, &cbResults));
-    Task::operationYield(&cbResults.evt);
-    if (cbResults.result < 0) {
-        if (cbResults.errorno == ENOENT) {
-            throw ENoEnt(m_fname);
-        } else {
-            char str[4096];
-            throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
-        }
+    cbResults_t * cbResults;
+
+    if (!m_pendingOp) {
+        m_pendingOp = cbResults = new cbResults_t();
+        cbResults->type = cbResults_t::NONE;
     } else {
-        m_fd = cbResults.result;
+        cbResults = (cbResults_t *) m_pendingOp;
     }
 
-    cbStatsResults_t cbStatsResults;
-    createAsyncOp(new AsyncOpStat(m_fd, &cbStatsResults));
-    Task::operationYield(&cbStatsResults.evt);
-    if (cbStatsResults.result == 0) {
-        m_size = cbStatsResults.statdata.st_size;
-        m_mtime = cbStatsResults.statdata.st_mtime;
+    try {
+        switch (cbResults->type) {
+        case cbResults_t::NONE:
+            cbResults->type = cbResults_t::OPEN;
+            createAsyncOp(new AsyncOpOpen(m_fname.to_charp(), truncate, cbResults));
+            Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+        case cbResults_t::OPEN:
+            AAssert(isPendingComplete(), "Don't call open again without checking isPendingComplete.");
+            if (cbResults->result < 0) {
+                if (cbResults->errorno == ENOENT) {
+                    throw ENoEnt(m_fname);
+                } else {
+                    char str[4096];
+                    throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")");
+                }
+            } else {
+                m_fd = cbResults->result;
+            }
+
+            delete cbResults;
+            m_pendingOp = cbResults = new cbResults_t();
+            cbResults->type = cbResults_t::STAT;
+            createAsyncOp(new AsyncOpStat(m_fd, cbResults));
+            Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+        case cbResults_t::STAT:
+            if (cbResults->result == 0) {
+                m_size = cbResults->statdata.st_size;
+                m_mtime = cbResults->statdata.st_mtime;
+            }
+            delete cbResults;
+            m_pendingOp = NULL;
+            break;
+        default:
+            AAssert(false, "Don't switch operations while one is still not complete.");
+        }
+    }
+    catch (Balau::TaskSwitch) {
+        throw;
+    }
+    catch (Balau::EAgain) {
+        throw;
+    }
+    catch (...) {
+        delete cbResults;
+        m_pendingOp = NULL;
+        throw;
     }
 }
 
@@ -106,7 +145,7 @@ class AsyncOpClose : public Balau::AsyncOperation {
   public:
       AsyncOpClose(int fd, cbResults_t * results) : m_fd(fd), m_results(results) { }
     virtual void run() {
-        int r = m_results->result = close(m_fd);
+        const ssize_t r = m_results->result = close(m_fd);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
@@ -121,32 +160,58 @@ class AsyncOpClose : public Balau::AsyncOperation {
 };
 
 void Balau::Output::close() throw (GeneralException) {
-    if (m_fd < 0)
+    if ((m_fd < 0) && !m_pendingOp)
         return;
-    cbResults_t cbResults;
-    createAsyncOp(new AsyncOpClose(m_fd, &cbResults));
-    Task::operationYield(&cbResults.evt);
-    m_fd = -1;
-    if (cbResults.result < 0) {
-        char str[4096];
-        strerror_r(cbResults.errorno, str, sizeof(str));
-        throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+
+    cbResults_t * cbResults;
+
+    if (!m_pendingOp) {
+        m_pendingOp = cbResults = new cbResults_t;
+        cbResults->type = cbResults_t::NONE;
+    } else {
+        cbResults = (cbResults_t *) m_pendingOp;
+    }
+
+    try {
+        switch (cbResults->type) {
+        case cbResults_t::NONE:
+            cbResults->type = cbResults_t::CLOSE;
+            createAsyncOp(new AsyncOpClose(m_fd, cbResults));
+            Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+        case cbResults_t::CLOSE:
+            m_fd = -1;
+            if (cbResults->result < 0) {
+                char str[4096];
+                strerror_r(cbResults->errorno, str, sizeof(str));
+                throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+            }
+            delete cbResults;
+            m_pendingOp = NULL;
+            break;
+        default:
+            AAssert(false, "Don't switch operations while one is still not complete.");
+        }
+    }
+    catch (Balau::TaskSwitch) {
+        throw;
+    }
+    catch (Balau::EAgain) {
+        throw;
+    }
+    catch (...) {
+        delete cbResults;
+        m_pendingOp = NULL;
+        throw;
     }
 }
 
 namespace {
 
-struct cbWriteResults_t {
-    Balau::Events::Custom evt;
-    ssize_t result;
-    int errorno;
-};
-
 class AsyncOpWrite : public Balau::AsyncOperation {
   public:
-      AsyncOpWrite(int fd, const void * buf, size_t count, off_t offset, cbWriteResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
+      AsyncOpWrite(int fd, const void * buf, size_t count, off_t offset, cbResults_t * results) : m_fd(fd), m_buf(buf), m_count(count), m_offset(offset), m_results(results) { }
     virtual void run() {
-        int r = m_results->result = pwrite(m_fd, m_buf, m_count, m_offset);
+        const ssize_t r = m_results->result = pwrite(m_fd, m_buf, m_count, m_offset);
         m_results->errorno = r < 0 ? errno : 0;
     }
     virtual void done() {
@@ -158,22 +223,58 @@ class AsyncOpWrite : public Balau::AsyncOperation {
     const void * m_buf;
     size_t m_count;
     off_t m_offset;
-    cbWriteResults_t * m_results;
+    cbResults_t * m_results;
 };
 
 };
 
 ssize_t Balau::Output::write(const void * buf, size_t count) throw (GeneralException) {
-    cbWriteResults_t cbResults;
-    createAsyncOp(new AsyncOpWrite(m_fd, buf, count, getWOffset(), &cbResults));
-    Task::operationYield(&cbResults.evt);
-    if (cbResults.result > 0) {
-        wseek(cbResults.result, SEEK_CUR);
+    AAssert(!isClosed(), "Can't write a closed file");
+    ssize_t result;
+
+    cbResults_t * cbResults;
+
+    if (!m_pendingOp) {
+        m_pendingOp = cbResults = new cbResults_t;
+        cbResults->type = cbResults_t::NONE;
     } else {
-        char str[4096];
-        throw GeneralException(String("Unable to write file ") + m_name + ": " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
+        cbResults = (cbResults_t *) m_pendingOp;
+    }
+
+    try {
+        switch (cbResults->type) {
+        case cbResults_t::NONE:
+            cbResults->type = cbResults_t::WRITE;
+            createAsyncOp(new AsyncOpWrite(m_fd, buf, count, getWOffset(), cbResults));
+            Task::operationYield(&cbResults->evt, Task::INTERRUPTIBLE);
+        case cbResults_t::WRITE:
+            result = cbResults->result;
+            if (result > 0) {
+                wseek(result, SEEK_CUR);
+            } else {
+                char str[4096];
+                throw GeneralException(String("Unable to write file ") + m_name + ": " + strerror_r(cbResults->errorno, str, sizeof(str)) + " (err#" + cbResults->errorno + ")");
+            }
+            delete cbResults;
+            m_pendingOp = NULL;
+            return result;
+        default:
+            AAssert(false, "Don't switch operations while one is still not complete.");
+        }
+    }
+    catch (Balau::TaskSwitch) {
+        throw;
+    }
+    catch (Balau::EAgain) {
+        throw;
     }
-    return cbResults.result;
+    catch (...) {
+        delete cbResults;
+        m_pendingOp = NULL;
+        throw;
+    }
+
+    IAssert(false, "Shouldn't end up there.");
 }
 
 bool Balau::Output::isClosed() {
-- 
cgit v1.2.3