summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-08-10 22:07:42 -0700
committerNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-08-10 22:07:42 -0700
commit24c84e9423db42563de21da76efd4637ca2abcce (patch)
tree48045eadcfb48dd200fbc731e3dccb487bf661ca
parentd6fe842fdb0972ffe228022c5705b1b167e68467 (diff)
First pass at the SmartWriter.
-rw-r--r--includes/SmartWriter.h20
-rw-r--r--src/SmartWriter.cc129
-rw-r--r--win32/project/Balau.vcxproj2
-rw-r--r--win32/project/Balau.vcxproj.filters6
4 files changed, 157 insertions, 0 deletions
diff --git a/includes/SmartWriter.h b/includes/SmartWriter.h
new file mode 100644
index 0000000..d8baf27
--- /dev/null
+++ b/includes/SmartWriter.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <Handle.h>
+
+class SmartWriterTask;
+
+namespace Balau {
+
+class SmartWriter : public Filter {
+ public:
+ SmartWriter(IO<Handle> h) : Filter(h) { AAssert(h->canWrite(), "SmartWriter can't write"); m_name.set("SmartWriter(%s)", h->getName()); }
+ virtual ssize_t write(const void * buf, size_t count) throw (GeneralException) override;
+ virtual const char * getName() override { return m_name.to_charp(); }
+ virtual void close() override;
+ private:
+ SmartWriterTask * m_writerTask = NULL;
+ String m_name;
+};
+
+}
diff --git a/src/SmartWriter.cc b/src/SmartWriter.cc
new file mode 100644
index 0000000..9a32330
--- /dev/null
+++ b/src/SmartWriter.cc
@@ -0,0 +1,129 @@
+#include <StacklessTask.h>
+#include <TaskMan.h>
+#include <SmartWriter.h>
+
+namespace {
+
+struct WriteCell {
+ ~WriteCell() { delete buffer; }
+ const void * buffer = NULL;
+ const uint8_t * ptr;
+ size_t size;
+ bool stop = false;
+ bool close = false;
+};
+
+}
+
+class SmartWriterTask : public Balau::StacklessTask {
+ public:
+ SmartWriterTask(Balau::IO<Balau::Handle> h) : m_h(h) { m_name.set("SmartWriterTask(%s)", h->getName()); }
+ void queueWrite(const void * buf, size_t count) {
+ WriteCell * cell = new WriteCell();
+ uint8_t * copied = (uint8_t *) malloc(count);
+ memcpy(copied, buf, count);
+ cell->buffer = cell->ptr = copied;
+ cell->size = count;
+ m_queue.push(cell);
+ }
+ void stop(bool closeHandle) {
+ WriteCell * cell = new WriteCell();
+ cell->stop = true;
+ cell->close = closeHandle;
+ m_queue.push(cell);
+ }
+ bool gotError() { return m_gotError; }
+ private:
+ virtual ~SmartWriterTask() { empty(); }
+ virtual const char * getName() const override { return m_name.to_charp(); }
+ void empty() {
+ delete m_current;
+ m_current = NULL;
+ while (!m_queue.isEmpty())
+ delete m_queue.pop();
+ }
+ virtual void Do() override {
+ waitFor(m_queue.getEvent());
+ m_queue.getEvent()->resetMaybe();
+
+ bool gotQueue = !m_queue.isEmpty();
+
+ if (gotQueue && !m_current) {
+ m_current = m_queue.pop();
+ gotQueue = !m_queue.isEmpty();
+ }
+
+ if (!gotQueue && !m_current)
+ taskSwitch();
+
+ if (m_current->stop) {
+ if (m_current->close)
+ m_h->close();
+ delete m_current;
+ m_current = NULL;
+ return;
+ }
+
+ ssize_t r = 0;
+ try {
+ r = m_h->write(m_current->ptr, m_current->size);
+ }
+ catch (Balau::EAgain & e) {
+ waitFor(e.getEvent());
+ taskSwitch();
+ }
+
+ m_current->ptr += r;
+ m_current->size -= r;
+
+ if (m_current->size == 0) {
+ delete m_current;
+ m_current = NULL;
+ }
+
+ if (gotQueue || m_current)
+ yield();
+ else
+ taskSwitch();
+ }
+ Balau::IO<Balau::Handle> m_h;
+ Balau::String m_name;
+ std::atomic<bool> m_gotError = false;
+ Balau::TQueue<WriteCell> m_queue;
+ WriteCell * m_current = NULL;
+};
+
+void Balau::SmartWriter::close() {
+ if (m_writerTask) {
+ m_writerTask->stop(!isDetached());
+ m_writerTask = NULL;
+ } else {
+ Filter::close();
+ }
+}
+
+ssize_t Balau::SmartWriter::write(const void * _buf, size_t count) throw (Balau::GeneralException) {
+ const uint8_t * buf = (const uint8_t *) _buf;
+ const ssize_t r = count;
+ while (count) {
+ if (m_writerTask) {
+ if (m_writerTask->gotError())
+ return -1;
+ m_writerTask->queueWrite(buf, count);
+ }
+
+ ssize_t r = 0;
+ try {
+ r = Filter::write(buf, count);
+ }
+ catch (EAgain &) {
+ m_writerTask = TaskMan::registerTask(new SmartWriterTask(getIO()));
+ }
+ if (r < 0)
+ return r;
+ count -= r;
+ buf += r;
+ }
+
+ return r;
+}
diff --git a/win32/project/Balau.vcxproj b/win32/project/Balau.vcxproj
index 44ef4ad..493bb72 100644
--- a/win32/project/Balau.vcxproj
+++ b/win32/project/Balau.vcxproj
@@ -233,6 +233,7 @@
<ClCompile Include="..\..\src\Printer.cc" />
<ClCompile Include="..\..\src\Selectable.cc" />
<ClCompile Include="..\..\src\SimpleMustache.cc" />
+ <ClCompile Include="..\..\src\SmartWriter.cc" />
<ClCompile Include="..\..\src\Socket.cc" />
<ClCompile Include="..\..\src\StdIO.cc">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
@@ -294,6 +295,7 @@
<ClInclude Include="..\..\includes\Printer.h" />
<ClInclude Include="..\..\includes\Selectable.h" />
<ClInclude Include="..\..\includes\SimpleMustache.h" />
+ <ClInclude Include="..\..\includes\SmartWriter.h" />
<ClInclude Include="..\..\includes\Socket.h" />
<ClInclude Include="..\..\includes\StacklessTask.h" />
<ClInclude Include="..\..\includes\Task.h" />
diff --git a/win32/project/Balau.vcxproj.filters b/win32/project/Balau.vcxproj.filters
index cd115d4..06a4981 100644
--- a/win32/project/Balau.vcxproj.filters
+++ b/win32/project/Balau.vcxproj.filters
@@ -195,6 +195,9 @@
<ClCompile Include="..\getopt\getopt.c">
<Filter>Third Party\getopt</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\SmartWriter.cc">
+ <Filter>Source</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\includes\Async.h">
@@ -365,6 +368,9 @@
<ClInclude Include="..\getopt\getopt.h">
<Filter>Third Party\getopt</Filter>
</ClInclude>
+ <ClInclude Include="..\..\includes\SmartWriter.h">
+ <Filter>Headers</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\..\src\jsoncpp\src\json_internalarray.inl">