diff options
m--------- | LuaJIT | 0 | ||||
-rw-r--r-- | includes/BStream.h | 4 | ||||
-rw-r--r-- | includes/BWebSocket.h | 26 | ||||
-rw-r--r-- | includes/Task.h | 1 | ||||
-rw-r--r-- | src/BStream.cc | 1 | ||||
-rw-r--r-- | src/BWebSocket.cc | 96 | ||||
-rw-r--r-- | src/Task.cc | 3 |
7 files changed, 125 insertions, 6 deletions
diff --git a/LuaJIT b/LuaJIT -Subproject ec96d8b494f0fa87cb8a31a38e7c9241f4f414d +Subproject 21af151af28d4b3524684b106bd19b02484f67f diff --git a/includes/BStream.h b/includes/BStream.h index fd00b83..f39765d 100644 --- a/includes/BStream.h +++ b/includes/BStream.h @@ -11,9 +11,11 @@ class BStream : public Handle { virtual bool isClosed(); virtual bool isEOF(); virtual bool canRead(); + virtual bool canWrite() { return m_h->canWrite(); } virtual const char * getName(); virtual ssize_t read(void * buf, size_t count) throw (GeneralException); - virtual off64_t getSize(); + virtual ssize_t write(const void * buf, size_t count) throw (GeneralException) { return m_h->write(buf, count); } + virtual off64_t getSize(); int peekNextByte(); String readString(bool putNL = false); bool isEmpty() { return m_availBytes == 0; } diff --git a/includes/BWebSocket.h b/includes/BWebSocket.h index a230a89..49f8cd1 100644 --- a/includes/BWebSocket.h +++ b/includes/BWebSocket.h @@ -9,15 +9,33 @@ namespace Balau { class WebSocketActionBase; +class WebSocketFrame { + public: + WebSocketFrame(const String & str, uint8_t opcode, bool mask = false) : WebSocketFrame((uint8_t *) str.to_charp(), str.strlen(), opcode, mask) { } + WebSocketFrame(size_t len, uint8_t opcode, bool mask = false) : WebSocketFrame(NULL, len, opcode, mask) { } + WebSocketFrame(const uint8_t * data, size_t len, uint8_t opcode, bool mask = false); + ~WebSocketFrame() { free(m_data); } + uint8_t & operator[](size_t idx); + uint8_t * getPtr() { return m_data + m_headerSize; } + void send(IO<Handle> socket); + private: + uint8_t * m_data = NULL; + size_t m_len = 0; + size_t m_headerSize = 0; + uint32_t m_mask = 'BLAH'; + size_t m_bytesSent = 0; +}; + class WebSocketWorker : public StacklessTask { public: virtual bool parse(Http::Request & req) { return true; } + void sendFrame(WebSocketFrame * frame) { m_sendQueue.push(frame); } protected: - WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + "/" + m_socket->getName(); } - ~WebSocketWorker() { free(m_payload); } + WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + ":" + m_socket->getName(); } + ~WebSocketWorker(); void disconnect() { m_socket->close(); } virtual void receiveMessage(const uint8_t * msg, size_t len, bool binary) = 0; -private: + private: void processMessage(); void processPing(); void processPong(); @@ -34,6 +52,8 @@ private: } m_status = READ_H; enum { MAX_WEBSOCKET_LIMIT = 4 * 1024 * 1024 }; uint8_t * m_payload = NULL; + WebSocketFrame * m_sending = NULL; + TQueue<WebSocketFrame> m_sendQueue; uint64_t m_payloadLen; uint64_t m_totalLen; uint64_t m_remainingBytes; diff --git a/includes/Task.h b/includes/Task.h index 7521b9f..0f04b32 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -344,6 +344,7 @@ class TQueue : public QueueBase { public: void push(T * t) { iPush(t, &m_event); } T * pop() { return (T *) iPop(&m_event, true); } + Events::Async * getEvent() { return &m_event; } private: Events::Async m_event; }; diff --git a/src/BStream.cc b/src/BStream.cc index 3a3f972..b8f5433 100644 --- a/src/BStream.cc +++ b/src/BStream.cc @@ -14,6 +14,7 @@ void Balau::BStream::close() throw (Balau::GeneralException) { if (!m_detached) m_h->close(); free(m_buffer); + m_buffer = NULL; m_availBytes = 0; m_cursor = 0; } diff --git a/src/BWebSocket.cc b/src/BWebSocket.cc index 370b702..3e81e7d 100644 --- a/src/BWebSocket.cc +++ b/src/BWebSocket.cc @@ -5,11 +5,107 @@ #define rotate(value) (((value) << 8) | ((value) >> 24)) +Balau::WebSocketFrame::WebSocketFrame(const uint8_t * data, size_t len, uint8_t opcode, bool doMask) { + m_len = len; + m_headerSize = 2; + if (m_len >= 126) m_headerSize += 2; + if (m_len >= 65536) m_headerSize += 6; + if (doMask) m_headerSize += 4; + m_data = (uint8_t *)malloc(m_len + m_headerSize); + uint8_t * maskPtr; + + m_data[0] = 0x80 | opcode; + m_data[1] = doMask ? 0x80 : 0x00; + if (m_len < 125) { + m_data[1] |= m_len; + maskPtr = m_data + 2; + } else if (m_len < 65536) { + m_data[1] |= 126; + m_data[2] = m_len >> 8; + m_data[3] = m_len & 0xff; + maskPtr = m_data + 4; + } else { + m_data[1] |= 127; + uint8_t * lenPtr = maskPtr = m_data + 10; + size_t len = m_len; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + } + + if (doMask) { + uint32_t mask = m_mask; + for (int i = 0; i < 4; i++) { + *(maskPtr++) = mask >> 24; + mask <<= 8; + } + } else { + m_mask = 0; + } + + if (data) memcpy(maskPtr, data, m_len); +} + +uint8_t & Balau::WebSocketFrame::operator[](size_t idx) { + static uint8_t dummy = 0; + if (idx >= m_len) return dummy; + return m_data[idx + m_headerSize]; +} + +void Balau::WebSocketFrame::send(Balau::IO<Balau::Handle> socket) { + size_t totalLen = m_headerSize + m_len; + + if (m_mask) { + for (int i = m_headerSize; i < totalLen; i++) { + m_data[i] ^= m_mask >> 24; + m_mask = rotate(m_mask); + } + m_mask = 0; + } + + while (m_bytesSent < totalLen) { + ssize_t r = socket->write(m_data + m_bytesSent, totalLen - m_bytesSent); + if (r < 0) + m_bytesSent = totalLen; + else + m_bytesSent += r; + } +} + +Balau::WebSocketWorker::~WebSocketWorker() { + free(m_payload); + delete m_sending; + while (!m_sendQueue.isEmpty()) + delete m_sendQueue.pop(); +} + void Balau::WebSocketWorker::Do() { uint8_t c; + waitFor(m_sendQueue.getEvent()); + m_sendQueue.getEvent()->resetMaybe(); + try { while (!m_socket->isClosed()) { + for (;;) { + if (m_sending) + m_sending->send(m_socket); + + if (!m_sendQueue.isEmpty()) + m_sending = m_sendQueue.pop(); + else + break; + if (m_socket->isClosed()) return; + } + + delete m_sending; + m_sending = NULL; + switch (m_state) { case READ_H: m_socket->read(&c, 1); diff --git a/src/Task.cc b/src/Task.cc index c97077b..9b15062 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -370,8 +370,7 @@ void * Balau::QueueBase::iPop(Events::Async * event, bool wait) { m_lock.leave(); Task::operationYield(event, Task::INTERRUPTIBLE); m_lock.enter(); - if (event->gotSignal()) - event->reset(); + event->resetMaybe(); } else { pthread_cond_wait(&m_cond, &m_lock.m_lock); } |