summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/Handle.h1
-rw-r--r--includes/Input.h2
-rw-r--r--includes/ZHandle.h20
-rw-r--r--src/ZHandle.cc322
4 files changed, 217 insertions, 128 deletions
diff --git a/includes/Handle.h b/includes/Handle.h
index 32a5975..091c59c 100644
--- a/includes/Handle.h
+++ b/includes/Handle.h
@@ -52,6 +52,7 @@ class Handle {
virtual off_t wtell() throw (GeneralException);
virtual off_t getSize();
virtual time_t getMTime();
+ virtual bool isPendingComplete() { return true; }
ssize_t forceRead(void * buf, size_t count, Events::BaseEvent * evt = NULL) throw (GeneralException);
ssize_t forceWrite(const void * buf, size_t count, Events::BaseEvent * evt = NULL) throw (GeneralException);
ssize_t write(const String & str) { return write(str.to_charp(), str.strlen()); }
diff --git a/includes/Input.h b/includes/Input.h
index af17107..09e0ee4 100644
--- a/includes/Input.h
+++ b/includes/Input.h
@@ -15,7 +15,7 @@ class Input : public SeekableHandle {
virtual const char * getName();
virtual off_t getSize();
virtual time_t getMTime();
- bool isPendingComplete();
+ virtual bool isPendingComplete();
const char * getFName() { return m_fname.to_charp(); }
private:
int m_fd = -1;
diff --git a/includes/ZHandle.h b/includes/ZHandle.h
index 5750365..8164853 100644
--- a/includes/ZHandle.h
+++ b/includes/ZHandle.h
@@ -2,6 +2,7 @@
#include <zlib.h>
#include <Handle.h>
+#include <Async.h>
namespace Balau {
@@ -24,13 +25,28 @@ class ZStream : public Handle {
void detach() { m_detached = true; }
void flush() { doFlush(false); }
void setUseAsyncOp(bool useAsyncOp) { m_useAsyncOp = useAsyncOp; }
- private:
+ virtual bool isPendingComplete();
void finish() { doFlush(true); }
void doFlush(bool finish);
+ private:
IO<Handle> m_h;
z_stream m_zin, m_zout;
String m_name;
- uint8_t * m_in = NULL;
+ uint8_t * m_buf = NULL;
+ uint8_t * m_wptr;
+ enum {
+ IDLE,
+ READING,
+ WRITING,
+ COMPRESSING,
+ COMPRESSING_IDLE,
+ DECOMPRESSING,
+ DECOMPRESSING_IDLE,
+ CLOSING,
+ } m_phase = IDLE;
+ size_t m_total, m_count, m_compressed;
+ AsyncOperation * m_op = NULL;
+ ssize_t m_status;
bool m_detached = false, m_closed = false, m_eof = false, m_useAsyncOp = true;
};
diff --git a/src/ZHandle.cc b/src/ZHandle.cc
index 8153343..ec08990 100644
--- a/src/ZHandle.cc
+++ b/src/ZHandle.cc
@@ -24,17 +24,28 @@ Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(
}
void Balau::ZStream::close() throw (GeneralException) {
- if (m_h->canWrite())
- finish();
- inflateEnd(&m_zin);
- deflateEnd(&m_zout);
- if (m_in) {
- free(m_in);
- m_in = NULL;
+ switch (m_phase) {
+ case IDLE:
+ case WRITING:
+ case COMPRESSING:
+ if (m_h->canWrite())
+ finish();
+ inflateEnd(&m_zin);
+ deflateEnd(&m_zout);
+ if (m_buf) {
+ free(m_buf);
+ m_buf = NULL;
+ }
+ m_closed = true;
+ m_phase = CLOSING;
+ case CLOSING:
+ if (!m_detached)
+ m_h->close();
+ m_phase = IDLE;
+ return;
+ default:
+ AAssert(false, "Wrong phase");
}
- if (!m_detached)
- m_h->close();
- m_closed = true;
}
bool Balau::ZStream::isClosed() {
@@ -61,26 +72,46 @@ const char * Balau::ZStream::getName() {
namespace {
-class AsyncOpInflate : public Balau::AsyncOperation {
+class AsyncOpZlib : public Balau::AsyncOperation {
public:
- AsyncOpInflate(z_stream * zin, int * r, Balau::Events::Custom * evt) : m_zin(zin), m_r(r), m_evt(evt) { }
+ AsyncOpZlib(z_stream * z, bool deflate, int flush) : m_z(z), m_deflate(deflate), m_flush(flush) { }
virtual bool needsMainQueue() { return false; }
virtual bool needsFinishWorker() { return true; }
virtual void run() {
- *m_r = inflate(m_zin, Z_SYNC_FLUSH);
- }
- virtual void done() {
- m_evt->doSignal();
- delete this;
+ if (m_deflate)
+ m_r = deflate(m_z, m_flush);
+ else
+ m_r = inflate(m_z, Z_SYNC_FLUSH);
}
+ virtual void done() { m_evt.doSignal(); }
+ bool gotSignal() { return m_evt.gotSignal(); }
+ int getR() { return m_r; }
+ void yield() { Balau::Task::operationYield(&m_evt, Balau::Task::INTERRUPTIBLE); }
private:
- z_stream * m_zin;
- int * m_r;
- Balau::Events::Custom * m_evt;
+ z_stream * m_z;
+ int m_r, m_flush;
+ bool m_deflate;
+ Balau::Events::Custom m_evt;
};
};
+bool Balau::ZStream::isPendingComplete() {
+ AsyncOpZlib * async = dynamic_cast<AsyncOpZlib *>(m_op);
+
+ switch (m_phase) {
+ case READING:
+ case WRITING:
+ return m_h->isPendingComplete();
+ case COMPRESSING:
+ case DECOMPRESSING:
+ IAssert(async, "Shouldn't not have a cbResults here...");
+ return async->gotSignal();
+ default:
+ return true;
+ }
+}
+
static const int BLOCK_SIZE = 1024;
ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) {
@@ -89,64 +120,61 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
AAssert(m_h->canRead(), "Can't call ZStream::read on a non-readable handle.");
- size_t readTotal = 0;
const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
- m_zin.next_out = (Bytef *) buf;
- m_zin.avail_out = count;
- if (!m_in) {
- m_zin.next_in = m_in = (uint8_t *) malloc(block_size);
- m_zin.avail_in = 0;
- }
- while ((count != 0) && !m_h->isClosed() && !m_h->isEOF()) {
- if (m_zin.avail_in == 0) {
- m_zin.next_in = m_in;
- size_t r = m_h->read(m_in, block_size);
- if (r <= 0)
- return readTotal;
- m_zin.avail_in = r;
- }
- int r = 0;
- if (m_useAsyncOp) {
- Events::Custom evt;
- createAsyncOp(new AsyncOpInflate(&m_zin, &r, &evt));
- Task::operationYield(&evt);
- } else {
- r = inflate(&m_zin, Z_SYNC_FLUSH);
- Task::operationYield();
+ AsyncOpZlib * async = dynamic_cast<AsyncOpZlib *>(m_op);
+
+ switch (m_phase) {
+ case IDLE:
+ m_total = 0;
+ m_count = count;
+ m_zin.next_out = (Bytef *) buf;
+ m_zin.avail_out = count;
+ if (!m_buf) {
+ m_zin.next_in = m_buf = (uint8_t *) malloc(block_size);
+ m_zin.avail_in = 0;
}
- size_t didRead = count - m_zin.avail_out;
- readTotal += didRead;
- count -= didRead;
- if (r == Z_STREAM_END) {
- m_eof = true;
- break;
+ while ((m_count != 0) && !m_h->isClosed() && !m_h->isEOF()) {
+ if (m_zin.avail_in == 0) {
+ m_zin.next_in = m_buf;
+ m_phase = READING;
+ case READING:
+ m_status = m_h->read(m_buf, block_size);
+ if (m_status <= 0)
+ return m_total;
+ m_zin.avail_in = m_status;
+ }
+ if (m_useAsyncOp) {
+ m_phase = COMPRESSING;
+ createAsyncOp(m_op = async = new AsyncOpZlib(&m_zin, false, 0));
+ async->yield();
+ case COMPRESSING:
+ m_status = async->getR();
+ delete async;
+ m_op = async = NULL;
+ } else {
+ m_status = inflate(&m_zin, Z_SYNC_FLUSH);
+ m_phase = COMPRESSING_IDLE;
+ Task::operationYield(NULL, Task::INTERRUPTIBLE);
+ }
+ case COMPRESSING_IDLE:
+ EAssert(m_status == Z_OK || m_status == Z_STREAM_END, "inflate() didn't return Z_OK or Z_STREAM_END but %zi", m_status);
+ ssize_t didRead = m_count - m_zin.avail_out;
+ m_total += didRead;
+ m_count -= didRead;
+ if (m_status == Z_STREAM_END) {
+ m_eof = true;
+ m_phase = IDLE;
+ return m_total;
+ }
}
- EAssert(r == Z_OK, "inflate() didn't return Z_OK but %i", r);
+ break;
+ default:
+ AAssert(false, "Don't call an operation without finishing another.");
}
- return readTotal;
-}
-
-namespace {
-class AsyncOpDeflate : public Balau::AsyncOperation {
- public:
- AsyncOpDeflate(z_stream * zout, int * r, int flush, Balau::Events::Custom * evt) : m_zout(zout), m_r(r), m_flush(flush), m_evt(evt) { }
- virtual bool needsMainQueue() { return false; }
- virtual bool needsFinishWorker() { return true; }
- virtual void run() {
- *m_r = deflate(m_zout, m_flush);
- }
- virtual void done() {
- m_evt->doSignal();
- delete this;
- }
- private:
- z_stream * m_zout;
- int * m_r, m_flush;
- Balau::Events::Custom * m_evt;
-};
-
-};
+ m_phase = IDLE;
+ return m_total;
+}
ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralException) {
if (m_closed || m_eof)
@@ -154,37 +182,60 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
AAssert(m_h->canWrite(), "Can't call ZStream::write on a non-writable handle.");
- size_t wroteTotal = 0;
const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
- m_zout.next_in = (Bytef *) const_cast<void *>(buf);
- m_zout.avail_in = count;
- void * obuf = m_useAsyncOp ? malloc(block_size) : alloca(block_size);
- while ((count != 0) && !m_h->isClosed()) {
- m_zout.next_out = (Bytef *) obuf;
- m_zout.avail_out = block_size;
- int r = 0;
- if (m_useAsyncOp) {
- Events::Custom evt;
- createAsyncOp(new AsyncOpDeflate(&m_zout, &r, Z_NO_FLUSH, &evt));
- Task::operationYield(&evt);
- } else {
- r = deflate(&m_zout, Z_NO_FLUSH);
- Task::operationYield();
- }
- EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r);
- size_t compressed = block_size - m_zout.avail_out;
- if (compressed) {
- size_t w = m_h->forceWrite(obuf, compressed);
- if (m_useAsyncOp)
- free(obuf);
- if (w <= 0)
- return wroteTotal;
+ ssize_t w;
+ AsyncOpZlib * async = dynamic_cast<AsyncOpZlib *>(m_op);
+
+ switch (m_phase) {
+ case IDLE:
+ m_total = 0;
+ m_count = count;
+ m_zout.next_in = (Bytef *) const_cast<void *>(buf);
+ m_zout.avail_in = count;
+ if (!m_buf)
+ m_buf = (uint8_t *) malloc(block_size);
+ while ((m_count != 0) && !m_h->isClosed()) {
+ m_zout.next_out = (Bytef *) m_buf;
+ m_zout.avail_out = block_size;
+ if (m_useAsyncOp) {
+ m_phase = DECOMPRESSING;
+ createAsyncOp(m_op = async = new AsyncOpZlib(&m_zout, true, Z_NO_FLUSH));
+ async->yield();
+ case DECOMPRESSING:
+ m_status = async->getR();
+ delete async;
+ m_op = async = NULL;
+ } else {
+ m_status = deflate(&m_zout, Z_NO_FLUSH);
+ m_phase = DECOMPRESSING_IDLE;
+ Task::operationYield(NULL, Task::INTERRUPTIBLE);
+ }
+ case DECOMPRESSING_IDLE:
+ EAssert(m_status == Z_OK, "deflate() didn't return Z_OK but %zi", m_status);
+ m_compressed = block_size - m_zout.avail_out;
+ m_phase = WRITING;
+ m_wptr = m_buf;
+ while (m_compressed) {
+ case WRITING:
+ w = m_h->write(m_wptr, m_compressed);
+ if (w <= 0) {
+ m_phase = IDLE;
+ return m_total;
+ }
+ m_compressed -= w;
+ m_wptr += w;
+ }
+ size_t didWrite = m_count - m_zout.avail_in;
+ m_total += didWrite;
+ m_count -= didWrite;
}
- size_t didRead = count - m_zout.avail_in;
- wroteTotal += didRead;
- count -= didRead;
+ break;
+ default:
+ AAssert(false, "Don't call an operation without finishing another.");
}
- return wroteTotal;
+
+ m_phase = IDLE;
+ return m_total;
}
void Balau::ZStream::doFlush(bool finish) {
@@ -192,28 +243,49 @@ void Balau::ZStream::doFlush(bool finish) {
const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
void * buf = m_useAsyncOp ? malloc(block_size) : alloca(block_size);
- m_zout.next_in = NULL;
- m_zout.avail_in = 0;
- int r;
- do {
- m_zout.next_out = (Bytef *) buf;
- m_zout.avail_out = block_size;
- if (m_useAsyncOp) {
- Events::Custom evt;
- createAsyncOp(new AsyncOpDeflate(&m_zout, &r, finish ? Z_FINISH : Z_SYNC_FLUSH, &evt));
- Task::operationYield(&evt);
- } else {
- r = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH);
- Task::operationYield();
- }
- EAssert((r == Z_OK) || ((r == Z_STREAM_END) && finish), "deflate() didn't return Z_OK or Z_STREAM_END, but %i (finish = %s)", r, finish ? "true" : "false");
- size_t compressed = block_size - m_zout.avail_out;
- if (compressed) {
- size_t w = m_h->forceWrite(buf, compressed);
- if (m_useAsyncOp)
- free(buf);
- if (w <= 0)
- return;
- }
- } while (r == Z_OK && finish);
+ AsyncOpZlib * async = dynamic_cast<AsyncOpZlib *>(m_op);
+ ssize_t w = 0;
+
+ switch (m_phase) {
+ case IDLE:
+ m_zout.next_in = NULL;
+ m_zout.avail_in = 0;
+ do {
+ m_zout.next_out = (Bytef *) m_buf;
+ m_zout.avail_out = block_size;
+ if (m_useAsyncOp) {
+ m_phase = DECOMPRESSING;
+ createAsyncOp(m_op = async = new AsyncOpZlib(&m_zout, true, finish ? Z_FINISH : Z_SYNC_FLUSH));
+ async->yield();
+ case DECOMPRESSING:
+ m_status = async->getR();
+ delete async;
+ m_op = async = NULL;
+ } else {
+ m_status = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH);
+ m_phase = DECOMPRESSING_IDLE;
+ Task::operationYield(NULL, Task::INTERRUPTIBLE);
+ }
+ case DECOMPRESSING_IDLE:
+ EAssert((m_status == Z_OK) || ((m_status == Z_STREAM_END) && finish), "deflate() didn't return Z_OK or Z_STREAM_END, but %zi (finish = %s)", m_status, finish ? "true" : "false");
+ m_compressed = block_size - m_zout.avail_out;
+ m_phase = WRITING;
+ m_wptr = m_buf;
+ while (m_compressed) {
+ case WRITING:
+ w = m_h->write(m_wptr, m_compressed);
+ if (w <= 0) {
+ m_phase = IDLE;
+ return;
+ }
+ m_compressed -= w;
+ m_wptr += w;
+ }
+ } while (m_status == Z_OK && finish);
+ break;
+ default:
+ AAssert(false, "Don't call an operation without finishing another.");
+ }
+
+ m_phase = IDLE;
}