summaryrefslogtreecommitdiff
path: root/src/SmartWriter.cc
blob: 9a323306f9630b9c0eec6cd61e37f7127841a884 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include <StacklessTask.h>
#include <TaskMan.h>
#include <SmartWriter.h>

namespace {

struct WriteCell {
      ~WriteCell() { delete buffer; }
    const void * buffer = NULL;
    const uint8_t * ptr;
    size_t size;
    bool stop = false;
    bool close = false;
};

}

class SmartWriterTask : public Balau::StacklessTask {
  public:
      SmartWriterTask(Balau::IO<Balau::Handle> h) : m_h(h) { m_name.set("SmartWriterTask(%s)", h->getName()); }
    void queueWrite(const void * buf, size_t count) {
        WriteCell * cell = new WriteCell();
        uint8_t * copied = (uint8_t *) malloc(count);
        memcpy(copied, buf, count);
        cell->buffer = cell->ptr = copied;
        cell->size = count;
        m_queue.push(cell);
    }
    void stop(bool closeHandle) {
        WriteCell * cell = new WriteCell();
        cell->stop = true;
        cell->close = closeHandle;
        m_queue.push(cell);
    }
    bool gotError() { return m_gotError; }
  private:
      virtual ~SmartWriterTask() { empty(); }
    virtual const char * getName() const override { return m_name.to_charp(); }
    void empty() {
        delete m_current;
        m_current = NULL;
        while (!m_queue.isEmpty())
            delete m_queue.pop();
    }
    virtual void Do() override {
        waitFor(m_queue.getEvent());
        m_queue.getEvent()->resetMaybe();

        bool gotQueue = !m_queue.isEmpty();

        if (gotQueue && !m_current) {
            m_current = m_queue.pop();
            gotQueue = !m_queue.isEmpty();
        }

        if (!gotQueue && !m_current)
            taskSwitch();

        if (m_current->stop) {
            if (m_current->close)
                m_h->close();
            delete m_current;
            m_current = NULL;
            return;
        }

        ssize_t r = 0;
        try {
            r = m_h->write(m_current->ptr, m_current->size);
        }
        catch (Balau::EAgain & e) {
            waitFor(e.getEvent());
            taskSwitch();
        }

        m_current->ptr += r;
        m_current->size -= r;

        if (m_current->size == 0) {
            delete m_current;
            m_current = NULL;
        }

        if (gotQueue || m_current)
            yield();
        else
            taskSwitch();
    }
    Balau::IO<Balau::Handle> m_h;
    Balau::String m_name;
    std::atomic<bool> m_gotError = false;
    Balau::TQueue<WriteCell> m_queue;
    WriteCell * m_current = NULL;
};

void Balau::SmartWriter::close() {
    if (m_writerTask) {
        m_writerTask->stop(!isDetached());
        m_writerTask = NULL;
    } else {
        Filter::close();
    }
}

ssize_t Balau::SmartWriter::write(const void * _buf, size_t count) throw (Balau::GeneralException) {
    const uint8_t * buf = (const uint8_t *) _buf;
    const ssize_t r = count;
    while (count) {
        if (m_writerTask) {
            if (m_writerTask->gotError())
                return -1;
            m_writerTask->queueWrite(buf, count);
        }

        ssize_t r = 0;
        try {
            r = Filter::write(buf, count);
        }
        catch (EAgain &) {
            m_writerTask = TaskMan::registerTask(new SmartWriterTask(getIO()));
        }
        if (r < 0)
            return r;
        count -= r;
        buf += r;
    }

    return r;
}