diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2014-08-10 22:07:42 -0700 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2014-08-10 22:07:42 -0700 |
commit | 24c84e9423db42563de21da76efd4637ca2abcce (patch) | |
tree | 48045eadcfb48dd200fbc731e3dccb487bf661ca /src | |
parent | d6fe842fdb0972ffe228022c5705b1b167e68467 (diff) |
First pass at the SmartWriter.
Diffstat (limited to 'src')
-rw-r--r-- | src/SmartWriter.cc | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/src/SmartWriter.cc b/src/SmartWriter.cc new file mode 100644 index 0000000..9a32330 --- /dev/null +++ b/src/SmartWriter.cc @@ -0,0 +1,129 @@ +#include <StacklessTask.h> +#include <TaskMan.h> +#include <SmartWriter.h> + +namespace { + +struct WriteCell { + ~WriteCell() { delete buffer; } + const void * buffer = NULL; + const uint8_t * ptr; + size_t size; + bool stop = false; + bool close = false; +}; + +} + +class SmartWriterTask : public Balau::StacklessTask { + public: + SmartWriterTask(Balau::IO<Balau::Handle> h) : m_h(h) { m_name.set("SmartWriterTask(%s)", h->getName()); } + void queueWrite(const void * buf, size_t count) { + WriteCell * cell = new WriteCell(); + uint8_t * copied = (uint8_t *) malloc(count); + memcpy(copied, buf, count); + cell->buffer = cell->ptr = copied; + cell->size = count; + m_queue.push(cell); + } + void stop(bool closeHandle) { + WriteCell * cell = new WriteCell(); + cell->stop = true; + cell->close = closeHandle; + m_queue.push(cell); + } + bool gotError() { return m_gotError; } + private: + virtual ~SmartWriterTask() { empty(); } + virtual const char * getName() const override { return m_name.to_charp(); } + void empty() { + delete m_current; + m_current = NULL; + while (!m_queue.isEmpty()) + delete m_queue.pop(); + } + virtual void Do() override { + waitFor(m_queue.getEvent()); + m_queue.getEvent()->resetMaybe(); + + bool gotQueue = !m_queue.isEmpty(); + + if (gotQueue && !m_current) { + m_current = m_queue.pop(); + gotQueue = !m_queue.isEmpty(); + } + + if (!gotQueue && !m_current) + taskSwitch(); + + if (m_current->stop) { + if (m_current->close) + m_h->close(); + delete m_current; + m_current = NULL; + return; + } + + ssize_t r = 0; + try { + r = m_h->write(m_current->ptr, m_current->size); + } + catch (Balau::EAgain & e) { + waitFor(e.getEvent()); + taskSwitch(); + } + + m_current->ptr += r; + m_current->size -= r; + + if (m_current->size == 0) { + delete m_current; + m_current = NULL; + } + + if (gotQueue || m_current) + yield(); + else + taskSwitch(); + } + Balau::IO<Balau::Handle> m_h; + Balau::String m_name; + std::atomic<bool> m_gotError = false; + Balau::TQueue<WriteCell> m_queue; + WriteCell * m_current = NULL; +}; + +void Balau::SmartWriter::close() { + if (m_writerTask) { + m_writerTask->stop(!isDetached()); + m_writerTask = NULL; + } else { + Filter::close(); + } +} + +ssize_t Balau::SmartWriter::write(const void * _buf, size_t count) throw (Balau::GeneralException) { + const uint8_t * buf = (const uint8_t *) _buf; + const ssize_t r = count; + while (count) { + if (m_writerTask) { + if (m_writerTask->gotError()) + return -1; + m_writerTask->queueWrite(buf, count); + } + + ssize_t r = 0; + try { + r = Filter::write(buf, count); + } + catch (EAgain &) { + m_writerTask = TaskMan::registerTask(new SmartWriterTask(getIO())); + } + if (r < 0) + return r; + count -= r; + buf += r; + } + + return r; +} |