From 0dc59daa2eeb30523208a888fd3e25d421083136 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Sun, 2 Sep 2012 15:37:26 -0700 Subject: ZStreams are now using async operations to compress or decompress. --- includes/ZHandle.h | 3 +- src/ZHandle.cc | 102 ++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 91 insertions(+), 14 deletions(-) diff --git a/includes/ZHandle.h b/includes/ZHandle.h index e972466..5750365 100644 --- a/includes/ZHandle.h +++ b/includes/ZHandle.h @@ -23,14 +23,15 @@ class ZStream : public Handle { virtual ssize_t write(const void * buf, size_t count) throw (GeneralException); void detach() { m_detached = true; } void flush() { doFlush(false); } + void setUseAsyncOp(bool useAsyncOp) { m_useAsyncOp = useAsyncOp; } private: void finish() { doFlush(true); } void doFlush(bool finish); IO m_h; z_stream m_zin, m_zout; - bool m_detached = false, m_closed = false, m_eof = false; String m_name; uint8_t * m_in = NULL; + bool m_detached = false, m_closed = false, m_eof = false, m_useAsyncOp = true; }; }; diff --git a/src/ZHandle.cc b/src/ZHandle.cc index aa7bde7..8153343 100644 --- a/src/ZHandle.cc +++ b/src/ZHandle.cc @@ -1,5 +1,7 @@ #include "ZHandle.h" #include "Task.h" +#include "Async.h" +#include "TaskMan.h" Balau::ZStream::ZStream(const IO & h, int level, header_t header) : m_h(h) { m_zin.zalloc = m_zout.zalloc = NULL; @@ -57,6 +59,28 @@ const char * Balau::ZStream::getName() { return m_name.to_charp(); } +namespace { + +class AsyncOpInflate : public Balau::AsyncOperation { + public: + AsyncOpInflate(z_stream * zin, int * r, Balau::Events::Custom * evt) : m_zin(zin), m_r(r), m_evt(evt) { } + virtual bool needsMainQueue() { return false; } + virtual bool needsFinishWorker() { return true; } + virtual void run() { + *m_r = inflate(m_zin, Z_SYNC_FLUSH); + } + virtual void done() { + m_evt->doSignal(); + delete this; + } + private: + z_stream * m_zin; + int * m_r; + Balau::Events::Custom * m_evt; +}; + +}; + static const int BLOCK_SIZE = 1024; ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) { @@ -66,22 +90,30 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) AAssert(m_h->canRead(), "Can't call ZStream::read on a non-readable handle."); size_t readTotal = 0; + const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1); m_zin.next_out = (Bytef *) buf; m_zin.avail_out = count; if (!m_in) { - m_zin.next_in = m_in = (uint8_t *) malloc(BLOCK_SIZE); + m_zin.next_in = m_in = (uint8_t *) malloc(block_size); m_zin.avail_in = 0; } while ((count != 0) && !m_h->isClosed() && !m_h->isEOF()) { if (m_zin.avail_in == 0) { m_zin.next_in = m_in; - size_t r = m_h->read(m_in, BLOCK_SIZE); + size_t r = m_h->read(m_in, block_size); if (r <= 0) return readTotal; m_zin.avail_in = r; } - Task::operationYield(); - int r = inflate(&m_zin, Z_SYNC_FLUSH); + int r = 0; + if (m_useAsyncOp) { + Events::Custom evt; + createAsyncOp(new AsyncOpInflate(&m_zin, &r, &evt)); + Task::operationYield(&evt); + } else { + r = inflate(&m_zin, Z_SYNC_FLUSH); + Task::operationYield(); + } size_t didRead = count - m_zin.avail_out; readTotal += didRead; count -= didRead; @@ -94,6 +126,28 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) return readTotal; } +namespace { + +class AsyncOpDeflate : public Balau::AsyncOperation { + public: + AsyncOpDeflate(z_stream * zout, int * r, int flush, Balau::Events::Custom * evt) : m_zout(zout), m_r(r), m_flush(flush), m_evt(evt) { } + virtual bool needsMainQueue() { return false; } + virtual bool needsFinishWorker() { return true; } + virtual void run() { + *m_r = deflate(m_zout, m_flush); + } + virtual void done() { + m_evt->doSignal(); + delete this; + } + private: + z_stream * m_zout; + int * m_r, m_flush; + Balau::Events::Custom * m_evt; +}; + +}; + ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralException) { if (m_closed || m_eof) return 0; @@ -101,18 +155,28 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce AAssert(m_h->canWrite(), "Can't call ZStream::write on a non-writable handle."); size_t wroteTotal = 0; + const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1); m_zout.next_in = (Bytef *) const_cast(buf); m_zout.avail_in = count; - void * obuf = alloca(BLOCK_SIZE); + void * obuf = m_useAsyncOp ? malloc(block_size) : alloca(block_size); while ((count != 0) && !m_h->isClosed()) { m_zout.next_out = (Bytef *) obuf; - m_zout.avail_out = BLOCK_SIZE; - int r = deflate(&m_zout, Z_NO_FLUSH); + m_zout.avail_out = block_size; + int r = 0; + if (m_useAsyncOp) { + Events::Custom evt; + createAsyncOp(new AsyncOpDeflate(&m_zout, &r, Z_NO_FLUSH, &evt)); + Task::operationYield(&evt); + } else { + r = deflate(&m_zout, Z_NO_FLUSH); + Task::operationYield(); + } EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r); - Task::operationYield(); - size_t compressed = BLOCK_SIZE - m_zout.avail_out; + size_t compressed = block_size - m_zout.avail_out; if (compressed) { size_t w = m_h->forceWrite(obuf, compressed); + if (m_useAsyncOp) + free(obuf); if (w <= 0) return wroteTotal; } @@ -124,18 +188,30 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce } void Balau::ZStream::doFlush(bool finish) { - void * buf = alloca(BLOCK_SIZE); + AAssert(m_h->canWrite(), "Can't call ZStream::doFlush on a non-writable handle."); + + const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1); + void * buf = m_useAsyncOp ? malloc(block_size) : alloca(block_size); m_zout.next_in = NULL; m_zout.avail_in = 0; int r; do { m_zout.next_out = (Bytef *) buf; - m_zout.avail_out = BLOCK_SIZE; - r = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH); + m_zout.avail_out = block_size; + if (m_useAsyncOp) { + Events::Custom evt; + createAsyncOp(new AsyncOpDeflate(&m_zout, &r, finish ? Z_FINISH : Z_SYNC_FLUSH, &evt)); + Task::operationYield(&evt); + } else { + r = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH); + Task::operationYield(); + } EAssert((r == Z_OK) || ((r == Z_STREAM_END) && finish), "deflate() didn't return Z_OK or Z_STREAM_END, but %i (finish = %s)", r, finish ? "true" : "false"); - size_t compressed = BLOCK_SIZE - m_zout.avail_out; + size_t compressed = block_size - m_zout.avail_out; if (compressed) { size_t w = m_h->forceWrite(buf, compressed); + if (m_useAsyncOp) + free(buf); if (w <= 0) return; } -- cgit v1.2.3