diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2014-08-10 22:07:42 -0700 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2014-08-10 22:07:42 -0700 |
commit | 24c84e9423db42563de21da76efd4637ca2abcce (patch) | |
tree | 48045eadcfb48dd200fbc731e3dccb487bf661ca | |
parent | d6fe842fdb0972ffe228022c5705b1b167e68467 (diff) |
First pass at the SmartWriter.
-rw-r--r-- | includes/SmartWriter.h | 20 | ||||
-rw-r--r-- | src/SmartWriter.cc | 129 | ||||
-rw-r--r-- | win32/project/Balau.vcxproj | 2 | ||||
-rw-r--r-- | win32/project/Balau.vcxproj.filters | 6 |
4 files changed, 157 insertions, 0 deletions
diff --git a/includes/SmartWriter.h b/includes/SmartWriter.h new file mode 100644 index 0000000..d8baf27 --- /dev/null +++ b/includes/SmartWriter.h @@ -0,0 +1,20 @@ +#pragma once + +#include <Handle.h> + +class SmartWriterTask; + +namespace Balau { + +class SmartWriter : public Filter { + public: + SmartWriter(IO<Handle> h) : Filter(h) { AAssert(h->canWrite(), "SmartWriter can't write"); m_name.set("SmartWriter(%s)", h->getName()); } + virtual ssize_t write(const void * buf, size_t count) throw (GeneralException) override; + virtual const char * getName() override { return m_name.to_charp(); } + virtual void close() override; + private: + SmartWriterTask * m_writerTask = NULL; + String m_name; +}; + +} diff --git a/src/SmartWriter.cc b/src/SmartWriter.cc new file mode 100644 index 0000000..9a32330 --- /dev/null +++ b/src/SmartWriter.cc @@ -0,0 +1,129 @@ +#include <StacklessTask.h> +#include <TaskMan.h> +#include <SmartWriter.h> + +namespace { + +struct WriteCell { + ~WriteCell() { delete buffer; } + const void * buffer = NULL; + const uint8_t * ptr; + size_t size; + bool stop = false; + bool close = false; +}; + +} + +class SmartWriterTask : public Balau::StacklessTask { + public: + SmartWriterTask(Balau::IO<Balau::Handle> h) : m_h(h) { m_name.set("SmartWriterTask(%s)", h->getName()); } + void queueWrite(const void * buf, size_t count) { + WriteCell * cell = new WriteCell(); + uint8_t * copied = (uint8_t *) malloc(count); + memcpy(copied, buf, count); + cell->buffer = cell->ptr = copied; + cell->size = count; + m_queue.push(cell); + } + void stop(bool closeHandle) { + WriteCell * cell = new WriteCell(); + cell->stop = true; + cell->close = closeHandle; + m_queue.push(cell); + } + bool gotError() { return m_gotError; } + private: + virtual ~SmartWriterTask() { empty(); } + virtual const char * getName() const override { return m_name.to_charp(); } + void empty() { + delete m_current; + m_current = NULL; + while (!m_queue.isEmpty()) + delete m_queue.pop(); + } + virtual void Do() override { + waitFor(m_queue.getEvent()); + m_queue.getEvent()->resetMaybe(); + + bool gotQueue = !m_queue.isEmpty(); + + if (gotQueue && !m_current) { + m_current = m_queue.pop(); + gotQueue = !m_queue.isEmpty(); + } + + if (!gotQueue && !m_current) + taskSwitch(); + + if (m_current->stop) { + if (m_current->close) + m_h->close(); + delete m_current; + m_current = NULL; + return; + } + + ssize_t r = 0; + try { + r = m_h->write(m_current->ptr, m_current->size); + } + catch (Balau::EAgain & e) { + waitFor(e.getEvent()); + taskSwitch(); + } + + m_current->ptr += r; + m_current->size -= r; + + if (m_current->size == 0) { + delete m_current; + m_current = NULL; + } + + if (gotQueue || m_current) + yield(); + else + taskSwitch(); + } + Balau::IO<Balau::Handle> m_h; + Balau::String m_name; + std::atomic<bool> m_gotError = false; + Balau::TQueue<WriteCell> m_queue; + WriteCell * m_current = NULL; +}; + +void Balau::SmartWriter::close() { + if (m_writerTask) { + m_writerTask->stop(!isDetached()); + m_writerTask = NULL; + } else { + Filter::close(); + } +} + +ssize_t Balau::SmartWriter::write(const void * _buf, size_t count) throw (Balau::GeneralException) { + const uint8_t * buf = (const uint8_t *) _buf; + const ssize_t r = count; + while (count) { + if (m_writerTask) { + if (m_writerTask->gotError()) + return -1; + m_writerTask->queueWrite(buf, count); + } + + ssize_t r = 0; + try { + r = Filter::write(buf, count); + } + catch (EAgain &) { + m_writerTask = TaskMan::registerTask(new SmartWriterTask(getIO())); + } + if (r < 0) + return r; + count -= r; + buf += r; + } + + return r; +} diff --git a/win32/project/Balau.vcxproj b/win32/project/Balau.vcxproj index 44ef4ad..493bb72 100644 --- a/win32/project/Balau.vcxproj +++ b/win32/project/Balau.vcxproj @@ -233,6 +233,7 @@ <ClCompile Include="..\..\src\Printer.cc" />
<ClCompile Include="..\..\src\Selectable.cc" />
<ClCompile Include="..\..\src\SimpleMustache.cc" />
+ <ClCompile Include="..\..\src\SmartWriter.cc" />
<ClCompile Include="..\..\src\Socket.cc" />
<ClCompile Include="..\..\src\StdIO.cc">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
@@ -294,6 +295,7 @@ <ClInclude Include="..\..\includes\Printer.h" />
<ClInclude Include="..\..\includes\Selectable.h" />
<ClInclude Include="..\..\includes\SimpleMustache.h" />
+ <ClInclude Include="..\..\includes\SmartWriter.h" />
<ClInclude Include="..\..\includes\Socket.h" />
<ClInclude Include="..\..\includes\StacklessTask.h" />
<ClInclude Include="..\..\includes\Task.h" />
diff --git a/win32/project/Balau.vcxproj.filters b/win32/project/Balau.vcxproj.filters index cd115d4..06a4981 100644 --- a/win32/project/Balau.vcxproj.filters +++ b/win32/project/Balau.vcxproj.filters @@ -195,6 +195,9 @@ <ClCompile Include="..\getopt\getopt.c">
<Filter>Third Party\getopt</Filter>
</ClCompile>
+ <ClCompile Include="..\..\src\SmartWriter.cc">
+ <Filter>Source</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\includes\Async.h">
@@ -365,6 +368,9 @@ <ClInclude Include="..\getopt\getopt.h">
<Filter>Third Party\getopt</Filter>
</ClInclude>
+ <ClInclude Include="..\..\includes\SmartWriter.h">
+ <Filter>Headers</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\..\src\jsoncpp\src\json_internalarray.inl">
|