summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile1
-rw-r--r--includes/BStream.h27
-rw-r--r--src/BStream.cc137
-rw-r--r--tests/test-Handles.cc27
4 files changed, 190 insertions, 2 deletions
diff --git a/Makefile b/Makefile
index fabb147..2ba6284 100644
--- a/Makefile
+++ b/Makefile
@@ -122,6 +122,7 @@ Input.cc \
Output.cc \
Socket.cc \
Buffer.cc \
+BStream.cc \
\
Task.cc \
TaskMan.cc \
diff --git a/includes/BStream.h b/includes/BStream.h
new file mode 100644
index 0000000..73b7a96
--- /dev/null
+++ b/includes/BStream.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <Handle.h>
+
+namespace Balau {
+
+class BStream : public Handle {
+ public:
+ BStream(const IO<Handle> & h);
+ virtual void close() throw (GeneralException);
+ virtual bool isClosed();
+ virtual bool isEOF();
+ virtual bool canRead();
+ virtual const char * getName();
+ virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
+ virtual off_t getSize();
+ int peekNextByte();
+ String readString(bool putNL = false);
+ private:
+ IO<Handle> m_h;
+ uint8_t * m_buffer;
+ size_t m_availBytes;
+ size_t m_cursor;
+ String m_name;
+};
+
+};
diff --git a/src/BStream.cc b/src/BStream.cc
new file mode 100644
index 0000000..148e930
--- /dev/null
+++ b/src/BStream.cc
@@ -0,0 +1,137 @@
+#include "BStream.h"
+
+static const int s_blockSize = 16 * 1024;
+
+Balau::BStream::BStream(const IO<Handle> & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)), m_availBytes(0), m_cursor(0) {
+ Assert(m_h->canRead());
+ m_name.set("Stream(%s)", m_h->getName());
+}
+
+void Balau::BStream::close() throw (Balau::GeneralException) {
+ m_h->close();
+ free(m_buffer);
+ m_availBytes = 0;
+ m_cursor = 0;
+}
+
+bool Balau::BStream::isClosed() { return m_h->isClosed(); }
+bool Balau::BStream::isEOF() { return m_availBytes == 0 && m_h->isEOF(); }
+bool Balau::BStream::canRead() { return true; }
+const char * Balau::BStream::getName() { return m_name.to_charp(); }
+off_t Balau::BStream::getSize() { return m_h->getSize(); }
+
+ssize_t Balau::BStream::read(void * _buf, size_t count) throw (Balau::GeneralException) {
+ uint8_t * buf = (uint8_t *) _buf;
+ size_t copied = 0;
+ size_t toCopy = count;
+
+ if (m_availBytes != 0) {
+ if (toCopy > m_availBytes)
+ toCopy = m_availBytes;
+ memcpy(buf, m_buffer + m_cursor, toCopy);
+ count -= toCopy;
+ m_cursor += toCopy;
+ m_availBytes -= toCopy;
+ copied = toCopy;
+ buf += toCopy;
+ toCopy = count;
+ }
+
+ if (count == 0)
+ return copied;
+
+ if (count >= s_blockSize)
+ return m_h->read(buf, count) + copied;
+
+ m_cursor = 0;
+ Assert(m_availBytes == 0);
+ ssize_t r = m_h->read(m_buffer, s_blockSize);
+ Assert(r >= 0);
+ m_availBytes = r;
+
+ if (toCopy > m_availBytes)
+ toCopy = m_availBytes;
+ if (toCopy == 0)
+ return 0;
+ memcpy(buf, m_buffer, toCopy);
+ m_cursor += toCopy;
+ m_availBytes -= toCopy;
+ copied += toCopy;
+
+ return copied;
+}
+
+int Balau::BStream::peekNextByte() {
+ if (m_availBytes == 0) {
+ uint8_t b;
+ ssize_t r = read(&b, 1);
+ if (!r)
+ return -1;
+ Assert(r == 1);
+ Assert(m_cursor > 0);
+ Assert(m_availBytes < s_blockSize);
+ m_cursor--;
+ m_availBytes++;
+ }
+
+ return m_buffer[m_cursor];
+}
+
+Balau::String Balau::BStream::readString(bool putNL) {
+ peekNextByte();
+ uint8_t * cr, * lf, * nl;
+ String ret;
+ size_t chunkSize = 0;
+
+ cr = (uint8_t *) memchr(m_buffer + m_cursor, '\r', m_availBytes);
+ lf = (uint8_t *) memchr(m_buffer + m_cursor, '\n', m_availBytes);
+ if (cr && lf) {
+ nl = cr;
+ if (lf < cr)
+ nl = lf;
+ } else if (!cr) {
+ nl = lf;
+ } else {
+ nl = cr;
+ }
+ while (!nl) {
+ chunkSize = m_availBytes;
+ ret += String((const char *) m_buffer + m_cursor, chunkSize);
+ m_availBytes -= chunkSize;
+ m_cursor += chunkSize;
+ if (isClosed() || isEOF())
+ return ret;
+ peekNextByte();
+ Assert(m_cursor == 0);
+ cr = (uint8_t *) memchr(m_buffer, '\r', m_availBytes);
+ lf = (uint8_t *) memchr(m_buffer, '\n', m_availBytes);
+ if (cr && lf) {
+ nl = cr;
+ if (lf < cr)
+ nl = lf;
+ } else if (!cr) {
+ nl = lf;
+ } else {
+ nl = cr;
+ }
+ }
+
+ chunkSize = nl - (m_buffer + m_cursor);
+ ret += String((const char *) m_buffer + m_cursor, chunkSize);
+
+ m_availBytes -= chunkSize;
+ m_cursor += chunkSize;
+
+ char b;
+ read(&b, 1);
+ if (putNL)
+ ret += String(&b, 1);
+
+ if ((b == '\r') && (peekNextByte() == '\n')) {
+ read(&b, 1);
+ if (putNL)
+ ret += String(&b, 1);
+ }
+
+ return ret;
+}
diff --git a/tests/test-Handles.cc b/tests/test-Handles.cc
index fd05c76..f185b60 100644
--- a/tests/test-Handles.cc
+++ b/tests/test-Handles.cc
@@ -2,6 +2,7 @@
#include <Input.h>
#include <Output.h>
#include <Buffer.h>
+#include <BStream.h>
#ifdef _WIN32
void ctime_r(const time_t * t, char * str) {
@@ -69,18 +70,40 @@ void MainTask::Do() {
Assert(s == 0);
s = o->getSize();
Assert(s == 0);
- o->write("foo\n", 4);
+ o->writeString("foo\n");
IO<Handle> b(new Buffer());
s = b->rtell();
Assert(s == 0);
s = b->wtell();
Assert(s == 0);
- b->write("foo\n", 4);
+ b->writeString("foo\n");
s = b->rtell();
Assert(s == 0);
s = b->wtell();
Assert(s == 4);
+ b->writeString("bar\r\n");
+ s = b->rtell();
+ Assert(s == 0);
+ s = b->wtell();
+ Assert(s == 9);
+ b->writeString("eof");
+ s = b->rtell();
+ Assert(s == 0);
+ s = b->wtell();
+ Assert(s == 12);
+
+ IO<BStream> strm(new BStream(b));
+ String str;
+ str = strm->readString();
+ Assert(str == "foo");
+ str = strm->readString();
+ Assert(str == "bar");
+ str = strm->readString();
+ Assert(str == "eof");
+ s = b->rtell();
+ Assert(s == 12);
+ Assert(b->isEOF());
Printer::log(M_STATUS, "Test::Handles passed.");
}