summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas "Pixel" Noble <pixel@nobis-crew.org>2012-09-02 15:37:26 -0700
committerNicolas "Pixel" Noble <pixel@nobis-crew.org>2012-09-02 15:37:26 -0700
commit0dc59daa2eeb30523208a888fd3e25d421083136 (patch)
tree68504cb53e87b4e2c5422404df39e79d1a38cc6e
parent9cb4c8073edea87d52bd0cf88f2317ead353eec2 (diff)
ZStreams are now using async operations to compress or decompress.
-rw-r--r--includes/ZHandle.h3
-rw-r--r--src/ZHandle.cc102
2 files changed, 91 insertions, 14 deletions
diff --git a/includes/ZHandle.h b/includes/ZHandle.h
index e972466..5750365 100644
--- a/includes/ZHandle.h
+++ b/includes/ZHandle.h
@@ -23,14 +23,15 @@ class ZStream : public Handle {
virtual ssize_t write(const void * buf, size_t count) throw (GeneralException);
void detach() { m_detached = true; }
void flush() { doFlush(false); }
+ void setUseAsyncOp(bool useAsyncOp) { m_useAsyncOp = useAsyncOp; }
private:
void finish() { doFlush(true); }
void doFlush(bool finish);
IO<Handle> m_h;
z_stream m_zin, m_zout;
- bool m_detached = false, m_closed = false, m_eof = false;
String m_name;
uint8_t * m_in = NULL;
+ bool m_detached = false, m_closed = false, m_eof = false, m_useAsyncOp = true;
};
};
diff --git a/src/ZHandle.cc b/src/ZHandle.cc
index aa7bde7..8153343 100644
--- a/src/ZHandle.cc
+++ b/src/ZHandle.cc
@@ -1,5 +1,7 @@
#include "ZHandle.h"
#include "Task.h"
+#include "Async.h"
+#include "TaskMan.h"
Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(h) {
m_zin.zalloc = m_zout.zalloc = NULL;
@@ -57,6 +59,28 @@ const char * Balau::ZStream::getName() {
return m_name.to_charp();
}
+namespace {
+
+class AsyncOpInflate : public Balau::AsyncOperation {
+ public:
+ AsyncOpInflate(z_stream * zin, int * r, Balau::Events::Custom * evt) : m_zin(zin), m_r(r), m_evt(evt) { }
+ virtual bool needsMainQueue() { return false; }
+ virtual bool needsFinishWorker() { return true; }
+ virtual void run() {
+ *m_r = inflate(m_zin, Z_SYNC_FLUSH);
+ }
+ virtual void done() {
+ m_evt->doSignal();
+ delete this;
+ }
+ private:
+ z_stream * m_zin;
+ int * m_r;
+ Balau::Events::Custom * m_evt;
+};
+
+};
+
static const int BLOCK_SIZE = 1024;
ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) {
@@ -66,22 +90,30 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
AAssert(m_h->canRead(), "Can't call ZStream::read on a non-readable handle.");
size_t readTotal = 0;
+ const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
m_zin.next_out = (Bytef *) buf;
m_zin.avail_out = count;
if (!m_in) {
- m_zin.next_in = m_in = (uint8_t *) malloc(BLOCK_SIZE);
+ m_zin.next_in = m_in = (uint8_t *) malloc(block_size);
m_zin.avail_in = 0;
}
while ((count != 0) && !m_h->isClosed() && !m_h->isEOF()) {
if (m_zin.avail_in == 0) {
m_zin.next_in = m_in;
- size_t r = m_h->read(m_in, BLOCK_SIZE);
+ size_t r = m_h->read(m_in, block_size);
if (r <= 0)
return readTotal;
m_zin.avail_in = r;
}
- Task::operationYield();
- int r = inflate(&m_zin, Z_SYNC_FLUSH);
+ int r = 0;
+ if (m_useAsyncOp) {
+ Events::Custom evt;
+ createAsyncOp(new AsyncOpInflate(&m_zin, &r, &evt));
+ Task::operationYield(&evt);
+ } else {
+ r = inflate(&m_zin, Z_SYNC_FLUSH);
+ Task::operationYield();
+ }
size_t didRead = count - m_zin.avail_out;
readTotal += didRead;
count -= didRead;
@@ -94,6 +126,28 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
return readTotal;
}
+namespace {
+
+class AsyncOpDeflate : public Balau::AsyncOperation {
+ public:
+ AsyncOpDeflate(z_stream * zout, int * r, int flush, Balau::Events::Custom * evt) : m_zout(zout), m_r(r), m_flush(flush), m_evt(evt) { }
+ virtual bool needsMainQueue() { return false; }
+ virtual bool needsFinishWorker() { return true; }
+ virtual void run() {
+ *m_r = deflate(m_zout, m_flush);
+ }
+ virtual void done() {
+ m_evt->doSignal();
+ delete this;
+ }
+ private:
+ z_stream * m_zout;
+ int * m_r, m_flush;
+ Balau::Events::Custom * m_evt;
+};
+
+};
+
ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralException) {
if (m_closed || m_eof)
return 0;
@@ -101,18 +155,28 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
AAssert(m_h->canWrite(), "Can't call ZStream::write on a non-writable handle.");
size_t wroteTotal = 0;
+ const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
m_zout.next_in = (Bytef *) const_cast<void *>(buf);
m_zout.avail_in = count;
- void * obuf = alloca(BLOCK_SIZE);
+ void * obuf = m_useAsyncOp ? malloc(block_size) : alloca(block_size);
while ((count != 0) && !m_h->isClosed()) {
m_zout.next_out = (Bytef *) obuf;
- m_zout.avail_out = BLOCK_SIZE;
- int r = deflate(&m_zout, Z_NO_FLUSH);
+ m_zout.avail_out = block_size;
+ int r = 0;
+ if (m_useAsyncOp) {
+ Events::Custom evt;
+ createAsyncOp(new AsyncOpDeflate(&m_zout, &r, Z_NO_FLUSH, &evt));
+ Task::operationYield(&evt);
+ } else {
+ r = deflate(&m_zout, Z_NO_FLUSH);
+ Task::operationYield();
+ }
EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r);
- Task::operationYield();
- size_t compressed = BLOCK_SIZE - m_zout.avail_out;
+ size_t compressed = block_size - m_zout.avail_out;
if (compressed) {
size_t w = m_h->forceWrite(obuf, compressed);
+ if (m_useAsyncOp)
+ free(obuf);
if (w <= 0)
return wroteTotal;
}
@@ -124,18 +188,30 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
}
void Balau::ZStream::doFlush(bool finish) {
- void * buf = alloca(BLOCK_SIZE);
+ AAssert(m_h->canWrite(), "Can't call ZStream::doFlush on a non-writable handle.");
+
+ const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
+ void * buf = m_useAsyncOp ? malloc(block_size) : alloca(block_size);
m_zout.next_in = NULL;
m_zout.avail_in = 0;
int r;
do {
m_zout.next_out = (Bytef *) buf;
- m_zout.avail_out = BLOCK_SIZE;
- r = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH);
+ m_zout.avail_out = block_size;
+ if (m_useAsyncOp) {
+ Events::Custom evt;
+ createAsyncOp(new AsyncOpDeflate(&m_zout, &r, finish ? Z_FINISH : Z_SYNC_FLUSH, &evt));
+ Task::operationYield(&evt);
+ } else {
+ r = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH);
+ Task::operationYield();
+ }
EAssert((r == Z_OK) || ((r == Z_STREAM_END) && finish), "deflate() didn't return Z_OK or Z_STREAM_END, but %i (finish = %s)", r, finish ? "true" : "false");
- size_t compressed = BLOCK_SIZE - m_zout.avail_out;
+ size_t compressed = block_size - m_zout.avail_out;
if (compressed) {
size_t w = m_h->forceWrite(buf, compressed);
+ if (m_useAsyncOp)
+ free(buf);
if (w <= 0)
return;
}