From 56b8ca62c666a1b747766a5ede70db070977ad37 Mon Sep 17 00:00:00 2001 From: Nicolas 'Pixel' Noble Date: Mon, 21 Jan 2013 00:53:12 -0800 Subject: ZStreams's close, read and write are now fully interruptible. Will need some testing though. --- src/ZHandle.cc | 322 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 197 insertions(+), 125 deletions(-) (limited to 'src/ZHandle.cc') diff --git a/src/ZHandle.cc b/src/ZHandle.cc index 8153343..ec08990 100644 --- a/src/ZHandle.cc +++ b/src/ZHandle.cc @@ -24,17 +24,28 @@ Balau::ZStream::ZStream(const IO & h, int level, header_t header) : m_h( } void Balau::ZStream::close() throw (GeneralException) { - if (m_h->canWrite()) - finish(); - inflateEnd(&m_zin); - deflateEnd(&m_zout); - if (m_in) { - free(m_in); - m_in = NULL; + switch (m_phase) { + case IDLE: + case WRITING: + case COMPRESSING: + if (m_h->canWrite()) + finish(); + inflateEnd(&m_zin); + deflateEnd(&m_zout); + if (m_buf) { + free(m_buf); + m_buf = NULL; + } + m_closed = true; + m_phase = CLOSING; + case CLOSING: + if (!m_detached) + m_h->close(); + m_phase = IDLE; + return; + default: + AAssert(false, "Wrong phase"); } - if (!m_detached) - m_h->close(); - m_closed = true; } bool Balau::ZStream::isClosed() { @@ -61,26 +72,46 @@ const char * Balau::ZStream::getName() { namespace { -class AsyncOpInflate : public Balau::AsyncOperation { +class AsyncOpZlib : public Balau::AsyncOperation { public: - AsyncOpInflate(z_stream * zin, int * r, Balau::Events::Custom * evt) : m_zin(zin), m_r(r), m_evt(evt) { } + AsyncOpZlib(z_stream * z, bool deflate, int flush) : m_z(z), m_deflate(deflate), m_flush(flush) { } virtual bool needsMainQueue() { return false; } virtual bool needsFinishWorker() { return true; } virtual void run() { - *m_r = inflate(m_zin, Z_SYNC_FLUSH); - } - virtual void done() { - m_evt->doSignal(); - delete this; + if (m_deflate) + m_r = deflate(m_z, m_flush); + else + m_r = inflate(m_z, Z_SYNC_FLUSH); } + virtual void done() { m_evt.doSignal(); } + bool gotSignal() { return m_evt.gotSignal(); } + int getR() { return m_r; } + void yield() { Balau::Task::operationYield(&m_evt, Balau::Task::INTERRUPTIBLE); } private: - z_stream * m_zin; - int * m_r; - Balau::Events::Custom * m_evt; + z_stream * m_z; + int m_r, m_flush; + bool m_deflate; + Balau::Events::Custom m_evt; }; }; +bool Balau::ZStream::isPendingComplete() { + AsyncOpZlib * async = dynamic_cast(m_op); + + switch (m_phase) { + case READING: + case WRITING: + return m_h->isPendingComplete(); + case COMPRESSING: + case DECOMPRESSING: + IAssert(async, "Shouldn't not have a cbResults here..."); + return async->gotSignal(); + default: + return true; + } +} + static const int BLOCK_SIZE = 1024; ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) { @@ -89,64 +120,61 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) AAssert(m_h->canRead(), "Can't call ZStream::read on a non-readable handle."); - size_t readTotal = 0; const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1); - m_zin.next_out = (Bytef *) buf; - m_zin.avail_out = count; - if (!m_in) { - m_zin.next_in = m_in = (uint8_t *) malloc(block_size); - m_zin.avail_in = 0; - } - while ((count != 0) && !m_h->isClosed() && !m_h->isEOF()) { - if (m_zin.avail_in == 0) { - m_zin.next_in = m_in; - size_t r = m_h->read(m_in, block_size); - if (r <= 0) - return readTotal; - m_zin.avail_in = r; - } - int r = 0; - if (m_useAsyncOp) { - Events::Custom evt; - createAsyncOp(new AsyncOpInflate(&m_zin, &r, &evt)); - Task::operationYield(&evt); - } else { - r = inflate(&m_zin, Z_SYNC_FLUSH); - Task::operationYield(); + AsyncOpZlib * async = dynamic_cast(m_op); + + switch (m_phase) { + case IDLE: + m_total = 0; + m_count = count; + m_zin.next_out = (Bytef *) buf; + m_zin.avail_out = count; + if (!m_buf) { + m_zin.next_in = m_buf = (uint8_t *) malloc(block_size); + m_zin.avail_in = 0; } - size_t didRead = count - m_zin.avail_out; - readTotal += didRead; - count -= didRead; - if (r == Z_STREAM_END) { - m_eof = true; - break; + while ((m_count != 0) && !m_h->isClosed() && !m_h->isEOF()) { + if (m_zin.avail_in == 0) { + m_zin.next_in = m_buf; + m_phase = READING; + case READING: + m_status = m_h->read(m_buf, block_size); + if (m_status <= 0) + return m_total; + m_zin.avail_in = m_status; + } + if (m_useAsyncOp) { + m_phase = COMPRESSING; + createAsyncOp(m_op = async = new AsyncOpZlib(&m_zin, false, 0)); + async->yield(); + case COMPRESSING: + m_status = async->getR(); + delete async; + m_op = async = NULL; + } else { + m_status = inflate(&m_zin, Z_SYNC_FLUSH); + m_phase = COMPRESSING_IDLE; + Task::operationYield(NULL, Task::INTERRUPTIBLE); + } + case COMPRESSING_IDLE: + EAssert(m_status == Z_OK || m_status == Z_STREAM_END, "inflate() didn't return Z_OK or Z_STREAM_END but %zi", m_status); + ssize_t didRead = m_count - m_zin.avail_out; + m_total += didRead; + m_count -= didRead; + if (m_status == Z_STREAM_END) { + m_eof = true; + m_phase = IDLE; + return m_total; + } } - EAssert(r == Z_OK, "inflate() didn't return Z_OK but %i", r); + break; + default: + AAssert(false, "Don't call an operation without finishing another."); } - return readTotal; -} - -namespace { -class AsyncOpDeflate : public Balau::AsyncOperation { - public: - AsyncOpDeflate(z_stream * zout, int * r, int flush, Balau::Events::Custom * evt) : m_zout(zout), m_r(r), m_flush(flush), m_evt(evt) { } - virtual bool needsMainQueue() { return false; } - virtual bool needsFinishWorker() { return true; } - virtual void run() { - *m_r = deflate(m_zout, m_flush); - } - virtual void done() { - m_evt->doSignal(); - delete this; - } - private: - z_stream * m_zout; - int * m_r, m_flush; - Balau::Events::Custom * m_evt; -}; - -}; + m_phase = IDLE; + return m_total; +} ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralException) { if (m_closed || m_eof) @@ -154,37 +182,60 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce AAssert(m_h->canWrite(), "Can't call ZStream::write on a non-writable handle."); - size_t wroteTotal = 0; const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1); - m_zout.next_in = (Bytef *) const_cast(buf); - m_zout.avail_in = count; - void * obuf = m_useAsyncOp ? malloc(block_size) : alloca(block_size); - while ((count != 0) && !m_h->isClosed()) { - m_zout.next_out = (Bytef *) obuf; - m_zout.avail_out = block_size; - int r = 0; - if (m_useAsyncOp) { - Events::Custom evt; - createAsyncOp(new AsyncOpDeflate(&m_zout, &r, Z_NO_FLUSH, &evt)); - Task::operationYield(&evt); - } else { - r = deflate(&m_zout, Z_NO_FLUSH); - Task::operationYield(); - } - EAssert(r == Z_OK, "deflate() didn't return Z_OK but %i", r); - size_t compressed = block_size - m_zout.avail_out; - if (compressed) { - size_t w = m_h->forceWrite(obuf, compressed); - if (m_useAsyncOp) - free(obuf); - if (w <= 0) - return wroteTotal; + ssize_t w; + AsyncOpZlib * async = dynamic_cast(m_op); + + switch (m_phase) { + case IDLE: + m_total = 0; + m_count = count; + m_zout.next_in = (Bytef *) const_cast(buf); + m_zout.avail_in = count; + if (!m_buf) + m_buf = (uint8_t *) malloc(block_size); + while ((m_count != 0) && !m_h->isClosed()) { + m_zout.next_out = (Bytef *) m_buf; + m_zout.avail_out = block_size; + if (m_useAsyncOp) { + m_phase = DECOMPRESSING; + createAsyncOp(m_op = async = new AsyncOpZlib(&m_zout, true, Z_NO_FLUSH)); + async->yield(); + case DECOMPRESSING: + m_status = async->getR(); + delete async; + m_op = async = NULL; + } else { + m_status = deflate(&m_zout, Z_NO_FLUSH); + m_phase = DECOMPRESSING_IDLE; + Task::operationYield(NULL, Task::INTERRUPTIBLE); + } + case DECOMPRESSING_IDLE: + EAssert(m_status == Z_OK, "deflate() didn't return Z_OK but %zi", m_status); + m_compressed = block_size - m_zout.avail_out; + m_phase = WRITING; + m_wptr = m_buf; + while (m_compressed) { + case WRITING: + w = m_h->write(m_wptr, m_compressed); + if (w <= 0) { + m_phase = IDLE; + return m_total; + } + m_compressed -= w; + m_wptr += w; + } + size_t didWrite = m_count - m_zout.avail_in; + m_total += didWrite; + m_count -= didWrite; } - size_t didRead = count - m_zout.avail_in; - wroteTotal += didRead; - count -= didRead; + break; + default: + AAssert(false, "Don't call an operation without finishing another."); } - return wroteTotal; + + m_phase = IDLE; + return m_total; } void Balau::ZStream::doFlush(bool finish) { @@ -192,28 +243,49 @@ void Balau::ZStream::doFlush(bool finish) { const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1); void * buf = m_useAsyncOp ? malloc(block_size) : alloca(block_size); - m_zout.next_in = NULL; - m_zout.avail_in = 0; - int r; - do { - m_zout.next_out = (Bytef *) buf; - m_zout.avail_out = block_size; - if (m_useAsyncOp) { - Events::Custom evt; - createAsyncOp(new AsyncOpDeflate(&m_zout, &r, finish ? Z_FINISH : Z_SYNC_FLUSH, &evt)); - Task::operationYield(&evt); - } else { - r = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH); - Task::operationYield(); - } - EAssert((r == Z_OK) || ((r == Z_STREAM_END) && finish), "deflate() didn't return Z_OK or Z_STREAM_END, but %i (finish = %s)", r, finish ? "true" : "false"); - size_t compressed = block_size - m_zout.avail_out; - if (compressed) { - size_t w = m_h->forceWrite(buf, compressed); - if (m_useAsyncOp) - free(buf); - if (w <= 0) - return; - } - } while (r == Z_OK && finish); + AsyncOpZlib * async = dynamic_cast(m_op); + ssize_t w = 0; + + switch (m_phase) { + case IDLE: + m_zout.next_in = NULL; + m_zout.avail_in = 0; + do { + m_zout.next_out = (Bytef *) m_buf; + m_zout.avail_out = block_size; + if (m_useAsyncOp) { + m_phase = DECOMPRESSING; + createAsyncOp(m_op = async = new AsyncOpZlib(&m_zout, true, finish ? Z_FINISH : Z_SYNC_FLUSH)); + async->yield(); + case DECOMPRESSING: + m_status = async->getR(); + delete async; + m_op = async = NULL; + } else { + m_status = deflate(&m_zout, finish ? Z_FINISH : Z_SYNC_FLUSH); + m_phase = DECOMPRESSING_IDLE; + Task::operationYield(NULL, Task::INTERRUPTIBLE); + } + case DECOMPRESSING_IDLE: + EAssert((m_status == Z_OK) || ((m_status == Z_STREAM_END) && finish), "deflate() didn't return Z_OK or Z_STREAM_END, but %zi (finish = %s)", m_status, finish ? "true" : "false"); + m_compressed = block_size - m_zout.avail_out; + m_phase = WRITING; + m_wptr = m_buf; + while (m_compressed) { + case WRITING: + w = m_h->write(m_wptr, m_compressed); + if (w <= 0) { + m_phase = IDLE; + return; + } + m_compressed -= w; + m_wptr += w; + } + } while (m_status == Z_OK && finish); + break; + default: + AAssert(false, "Don't call an operation without finishing another."); + } + + m_phase = IDLE; } -- cgit v1.2.3