diff options
Diffstat (limited to 'includes')
-rw-r--r-- | includes/BStream.h | 4 | ||||
-rw-r--r-- | includes/BWebSocket.h | 26 | ||||
-rw-r--r-- | includes/Task.h | 1 |
3 files changed, 27 insertions, 4 deletions
diff --git a/includes/BStream.h b/includes/BStream.h index fd00b83..f39765d 100644 --- a/includes/BStream.h +++ b/includes/BStream.h @@ -11,9 +11,11 @@ class BStream : public Handle { virtual bool isClosed(); virtual bool isEOF(); virtual bool canRead(); + virtual bool canWrite() { return m_h->canWrite(); } virtual const char * getName(); virtual ssize_t read(void * buf, size_t count) throw (GeneralException); - virtual off64_t getSize(); + virtual ssize_t write(const void * buf, size_t count) throw (GeneralException) { return m_h->write(buf, count); } + virtual off64_t getSize(); int peekNextByte(); String readString(bool putNL = false); bool isEmpty() { return m_availBytes == 0; } diff --git a/includes/BWebSocket.h b/includes/BWebSocket.h index a230a89..49f8cd1 100644 --- a/includes/BWebSocket.h +++ b/includes/BWebSocket.h @@ -9,15 +9,33 @@ namespace Balau { class WebSocketActionBase; +class WebSocketFrame { + public: + WebSocketFrame(const String & str, uint8_t opcode, bool mask = false) : WebSocketFrame((uint8_t *) str.to_charp(), str.strlen(), opcode, mask) { } + WebSocketFrame(size_t len, uint8_t opcode, bool mask = false) : WebSocketFrame(NULL, len, opcode, mask) { } + WebSocketFrame(const uint8_t * data, size_t len, uint8_t opcode, bool mask = false); + ~WebSocketFrame() { free(m_data); } + uint8_t & operator[](size_t idx); + uint8_t * getPtr() { return m_data + m_headerSize; } + void send(IO<Handle> socket); + private: + uint8_t * m_data = NULL; + size_t m_len = 0; + size_t m_headerSize = 0; + uint32_t m_mask = 'BLAH'; + size_t m_bytesSent = 0; +}; + class WebSocketWorker : public StacklessTask { public: virtual bool parse(Http::Request & req) { return true; } + void sendFrame(WebSocketFrame * frame) { m_sendQueue.push(frame); } protected: - WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + "/" + m_socket->getName(); } - ~WebSocketWorker() { free(m_payload); } + WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + ":" + m_socket->getName(); } + ~WebSocketWorker(); void disconnect() { m_socket->close(); } virtual void receiveMessage(const uint8_t * msg, size_t len, bool binary) = 0; -private: + private: void processMessage(); void processPing(); void processPong(); @@ -34,6 +52,8 @@ private: } m_status = READ_H; enum { MAX_WEBSOCKET_LIMIT = 4 * 1024 * 1024 }; uint8_t * m_payload = NULL; + WebSocketFrame * m_sending = NULL; + TQueue<WebSocketFrame> m_sendQueue; uint64_t m_payloadLen; uint64_t m_totalLen; uint64_t m_remainingBytes; diff --git a/includes/Task.h b/includes/Task.h index 7521b9f..0f04b32 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -344,6 +344,7 @@ class TQueue : public QueueBase { public: void push(T * t) { iPush(t, &m_event); } T * pop() { return (T *) iPop(&m_event, true); } + Events::Async * getEvent() { return &m_event; } private: Events::Async m_event; }; |