summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-08-10 02:58:35 -0700
committerNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-08-10 02:58:35 -0700
commitec0f82bb6ea911baee6b9654f799f037282bf5b9 (patch)
treef18dd08e58c9231de3353544191c1fa2dfdbcb2b /src
parentbd06a6ec9ec3d0e10fe6e09bab4a156ceef6e0c8 (diff)
Introducing notion of filters.
Diffstat (limited to 'src')
-rw-r--r--src/BStream.cc27
-rw-r--r--src/Buffer.cc7
-rw-r--r--src/Handle.cc6
-rw-r--r--src/HttpServer.cc17
-rw-r--r--src/ZHandle.cc54
5 files changed, 29 insertions, 82 deletions
diff --git a/src/BStream.cc b/src/BStream.cc
index 5400a6b..486c21e 100644
--- a/src/BStream.cc
+++ b/src/BStream.cc
@@ -3,31 +3,24 @@
static const int s_blockSize = 16 * 1024;
-Balau::BStream::BStream(const IO<Handle> & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)) {
- AAssert(m_h->canRead(), "You can't create a buffered stream with a Handle that can't read");
- m_name.set("Stream(%s)", m_h->getName());
- if ((m_h.isA<Buffer>()) || (m_h.isA<BStream>()))
+Balau::BStream::BStream(IO<Handle> h) : Filter(h), m_buffer((uint8_t *) malloc(s_blockSize)) {
+ AAssert(h->canRead(), "You can't create a buffered stream with a Handle that can't read");
+ m_name.set("Stream(%s)", h->getName());
+ if ((h.isA<Buffer>()) || (h.isA<BStream>()))
m_passThru = true;
}
void Balau::BStream::close() throw (Balau::GeneralException) {
- if (!m_detached)
- m_h->close();
+ Filter::close();
free(m_buffer);
m_buffer = NULL;
m_availBytes = 0;
m_cursor = 0;
}
-bool Balau::BStream::isClosed() { return m_closed || m_h->isClosed(); }
-bool Balau::BStream::isEOF() { return (m_availBytes == 0) && m_h->isEOF(); }
-bool Balau::BStream::canRead() { return true; }
-const char * Balau::BStream::getName() { return m_name.to_charp(); }
-off64_t Balau::BStream::getSize() { return m_h->getSize(); }
-
ssize_t Balau::BStream::read(void * _buf, size_t count) throw (Balau::GeneralException) {
if (m_passThru)
- return m_h->read(_buf, count);
+ return getIO()->read(_buf, count);
uint8_t * buf = (uint8_t *) _buf;
size_t copied = 0;
size_t toCopy = count;
@@ -48,11 +41,11 @@ ssize_t Balau::BStream::read(void * _buf, size_t count) throw (Balau::GeneralExc
return copied;
if (count >= s_blockSize)
- return m_h->read(buf, count) + copied;
+ return getIO()->read(buf, count) + copied;
m_cursor = 0;
IAssert(m_availBytes == 0, "At this point, our internal buffer should be empty, but it's not: %zu", m_availBytes);
- ssize_t r = m_h->read(m_buffer, s_blockSize);
+ ssize_t r = getIO()->read(m_buffer, s_blockSize);
EAssert(r >= 0, "BStream got an error while reading: %zi", r);
m_availBytes = r;
@@ -86,8 +79,8 @@ int Balau::BStream::peekNextByte() {
}
Balau::String Balau::BStream::readString(bool putNL) {
- if (m_h.isA<BStream>())
- return m_h.asA<BStream>()->readString(putNL);
+ if (getIO().isA<BStream>())
+ return getIO().asA<BStream>()->readString(putNL);
peekNextByte();
uint8_t * cr, * lf, * nl;
diff --git a/src/Buffer.cc b/src/Buffer.cc
index 766206b..199b8e1 100644
--- a/src/Buffer.cc
+++ b/src/Buffer.cc
@@ -82,10 +82,3 @@ void Balau::Buffer::borrow(const uint8_t * buffer, size_t s) {
m_buffer = const_cast<uint8_t *>(buffer);
m_bufSize = s;
}
-
-bool Balau::Buffer::isClosed() { return false; }
-bool Balau::Buffer::isEOF() { return rtell() == m_bufSize; }
-const char * Balau::Buffer::getName() { return "Buffer"; }
-off64_t Balau::Buffer::getSize() { return m_bufSize; }
-bool Balau::Buffer::canRead() { return true; }
-bool Balau::Buffer::canWrite() { return !m_fromConst; }
diff --git a/src/Handle.cc b/src/Handle.cc
index e73d2fc..2002bc4 100644
--- a/src/Handle.cc
+++ b/src/Handle.cc
@@ -24,12 +24,6 @@ static const char * strerror_r(int errorno, char * buf, size_t bufsize) {
}
#endif
-bool Balau::Handle::canSeek() { return false; }
-bool Balau::Handle::canRead() { return false; }
-bool Balau::Handle::canWrite() { return false; }
-off64_t Balau::Handle::getSize() { return -1; }
-time_t Balau::Handle::getMTime() { return -1; }
-
ssize_t Balau::Handle::read(void * buf, size_t count) throw (GeneralException) {
if (canRead())
throw GeneralException(String("Handle ") + getName() + " can read, but read() not implemented (missing in class " + ClassName(this).c_str() + ")");
diff --git a/src/HttpServer.cc b/src/HttpServer.cc
index b0d4d2a..29571c6 100644
--- a/src/HttpServer.cc
+++ b/src/HttpServer.cc
@@ -11,27 +11,18 @@
#undef ERROR
#endif
-class OutputCheck : public Balau::Handle {
+class OutputCheck : public Balau::Filter {
public:
- OutputCheck(Balau::IO<Balau::Handle> h) : m_h(h), m_wrote(false) { IAssert(m_h->canWrite(), "We haven't been passed a writable Handle to our HttpWorker... ?"); m_name.set("OutputCheck(%s)", m_h->getName()); }
- virtual void close() throw (Balau::GeneralException) { m_h->close(); }
- virtual bool isClosed() { return m_h->isClosed(); }
- virtual bool isEOF() { return m_h->isEOF(); }
- virtual bool canWrite() { return true; }
- virtual bool canRead() { return m_h->canRead(); }
- virtual const char * getName() { return m_name.to_charp(); }
+ OutputCheck(Balau::IO<Balau::Handle> h) : Filter(h), m_wrote(false) { IAssert(h->canWrite(), "We haven't been passed a writable Handle to our HttpWorker... ?"); m_name.set("OutputCheck(%s)", h->getName()); }
virtual ssize_t write(const void * buf, size_t count) throw (Balau::GeneralException) {
if (!count)
return 0;
m_wrote = true;
- return m_h->write(buf, count);
- }
- virtual ssize_t read(void * buf, size_t count) throw (Balau::GeneralException) {
- return m_h->read(buf, count);
+ return Filter::write(buf, count);
}
+ virtual const char * getName() { return m_name.to_charp(); }
bool wrote() { return m_wrote; }
private:
- Balau::IO<Balau::Handle> m_h;
Balau::String m_name;
bool m_wrote;
};
diff --git a/src/ZHandle.cc b/src/ZHandle.cc
index effebe8..d9951f9 100644
--- a/src/ZHandle.cc
+++ b/src/ZHandle.cc
@@ -3,7 +3,7 @@
#include "Async.h"
#include "TaskMan.h"
-Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(h) {
+Balau::ZStream::ZStream(IO<Handle> h, int level, header_t header) : Filter(h) {
m_zin.zalloc = m_zout.zalloc = NULL;
m_zin.zfree = m_zout.zfree = NULL;
m_zin.opaque = m_zout.opaque = NULL;
@@ -20,7 +20,7 @@ Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(
EAssert(r == Z_OK, "inflateInit2 returned %i", r);
r = deflateInit2(&m_zout, level, Z_DEFLATED, window, 9, Z_DEFAULT_STRATEGY);
EAssert(r == Z_OK, "deflateInit2 returned %i", r);
- m_name.set("ZStream(%s)", m_h->getName());
+ m_name.set("ZStream(%s)", h->getName());
}
void Balau::ZStream::close() throw (GeneralException) {
@@ -29,7 +29,7 @@ void Balau::ZStream::close() throw (GeneralException) {
case WRITING_FINISH:
case COMPRESSING_FINISH:
case COMPRESSING_FINISH_IDLE:
- if (m_h->canWrite())
+ if (getIO()->canWrite())
finish();
inflateEnd(&m_zin);
deflateEnd(&m_zout);
@@ -37,11 +37,9 @@ void Balau::ZStream::close() throw (GeneralException) {
free(m_buf);
m_buf = NULL;
}
- m_closed = true;
m_phase = CLOSING;
case CLOSING:
- if (!m_detached)
- m_h->close();
+ Filter::close();
m_phase = IDLE;
return;
default:
@@ -49,28 +47,6 @@ void Balau::ZStream::close() throw (GeneralException) {
}
}
-bool Balau::ZStream::isClosed() {
- return m_closed;
-}
-
-bool Balau::ZStream::isEOF() {
- if (m_closed || m_eof)
- return true;
- return m_h->isEOF();
-}
-
-bool Balau::ZStream::canRead() {
- return m_h->canRead();
-}
-
-bool Balau::ZStream::canWrite() {
- return m_h->canWrite();
-}
-
-const char * Balau::ZStream::getName() {
- return m_name.to_charp();
-}
-
namespace {
class AsyncOpZlib : public Balau::AsyncOperation {
@@ -105,7 +81,7 @@ bool Balau::ZStream::isPendingComplete() {
case WRITING:
case WRITING_FINISH:
case CLOSING:
- return m_h->isPendingComplete();
+ return getIO()->isPendingComplete();
case COMPRESSING:
case DECOMPRESSING:
case COMPRESSING_FINISH:
@@ -119,10 +95,10 @@ bool Balau::ZStream::isPendingComplete() {
static const int BLOCK_SIZE = 1024;
ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException) {
- if (m_closed || m_eof)
+ if (isClosed() || m_eof)
return 0;
- AAssert(m_h->canRead(), "Can't call ZStream::read on a non-readable handle.");
+ AAssert(getIO()->canRead(), "Can't call ZStream::read on a non-readable handle.");
const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
AsyncOpZlib * async = dynamic_cast<AsyncOpZlib *>(m_op);
@@ -137,12 +113,12 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
m_zin.next_in = m_buf = (uint8_t *) malloc(block_size);
m_zin.avail_in = 0;
}
- while ((m_count != 0) && !m_h->isClosed() && !m_h->isEOF()) {
+ while ((m_count != 0) && !getIO()->isClosed() && !getIO()->isEOF()) {
if (m_zin.avail_in == 0) {
m_zin.next_in = m_buf;
m_phase = READING;
case READING:
- m_status = m_h->read(m_buf, block_size);
+ m_status = getIO()->read(m_buf, block_size);
if (m_status <= 0)
return m_total;
m_zin.avail_in = m_status;
@@ -181,10 +157,10 @@ ssize_t Balau::ZStream::read(void * buf, size_t count) throw (GeneralException)
}
ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralException) {
- if (m_closed || m_eof)
+ if (isClosed() || m_eof)
return 0;
- AAssert(m_h->canWrite(), "Can't call ZStream::write on a non-writable handle.");
+ AAssert(getIO()->canWrite(), "Can't call ZStream::write on a non-writable handle.");
const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
ssize_t w;
@@ -198,7 +174,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
m_zout.avail_in = count;
if (!m_buf)
m_buf = (uint8_t *) malloc(block_size);
- while ((m_count != 0) && !m_h->isClosed()) {
+ while ((m_count != 0) && !getIO()->isClosed()) {
m_zout.next_out = (Bytef *) m_buf;
m_zout.avail_out = block_size;
if (m_useAsyncOp) {
@@ -221,7 +197,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
m_wptr = m_buf;
while (m_compressed) {
case WRITING:
- w = m_h->write(m_wptr, m_compressed);
+ w = getIO()->write(m_wptr, m_compressed);
if (w <= 0) {
m_phase = IDLE;
return m_total;
@@ -243,7 +219,7 @@ ssize_t Balau::ZStream::write(const void * buf, size_t count) throw (GeneralExce
}
void Balau::ZStream::doFlush(bool finish) {
- AAssert(m_h->canWrite(), "Can't call ZStream::doFlush on a non-writable handle.");
+ AAssert(getIO()->canWrite(), "Can't call ZStream::doFlush on a non-writable handle.");
const int block_size = BLOCK_SIZE * (m_useAsyncOp ? 16 : 1);
void * buf = m_useAsyncOp ? malloc(block_size) : alloca(block_size);
@@ -277,7 +253,7 @@ void Balau::ZStream::doFlush(bool finish) {
m_wptr = m_buf;
while (m_compressed) {
case WRITING_FINISH:
- w = m_h->write(m_wptr, m_compressed);
+ w = getIO()->write(m_wptr, m_compressed);
if (w <= 0) {
m_phase = IDLE;
return;