diff options
-rw-r--r-- | includes/BWebSocket.h | 14 | ||||
-rw-r--r-- | src/BWebSocket.cc | 48 |
2 files changed, 51 insertions, 11 deletions
diff --git a/includes/BWebSocket.h b/includes/BWebSocket.h index 41e5a7d..a230a89 100644 --- a/includes/BWebSocket.h +++ b/includes/BWebSocket.h @@ -15,8 +15,12 @@ class WebSocketWorker : public StacklessTask { protected: WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + "/" + m_socket->getName(); } ~WebSocketWorker() { free(m_payload); } - private: + void disconnect() { m_socket->close(); } + virtual void receiveMessage(const uint8_t * msg, size_t len, bool binary) = 0; +private: void processMessage(); + void processPing(); + void processPong(); const char * getName() const { return m_name.to_charp(); } void Do(); String m_name; @@ -40,6 +44,14 @@ class WebSocketWorker : public StacklessTask { bool m_firstFragment = true; bool m_enforceServer = false; bool m_enforceClient = false; + enum { + OPCODE_CONT = 0, + OPCODE_TEXT = 1, + OPCODE_BIN = 2, + OPCODE_CLOSE = 8, + OPCODE_PING = 9, + OPCODE_PONG = 10, + }; friend class WebSocketActionBase; }; diff --git a/src/BWebSocket.cc b/src/BWebSocket.cc index 0f94728..370b702 100644 --- a/src/BWebSocket.cc +++ b/src/BWebSocket.cc @@ -12,7 +12,8 @@ void Balau::WebSocketWorker::Do() { while (!m_socket->isClosed()) { switch (m_state) { case READ_H: - c = m_socket->readU8().get(); + m_socket->read(&c, 1); + if (m_socket->isClosed()) return; m_fin = c & 0x80; if ((c >> 4) & 7) goto error; c &= 15; @@ -21,7 +22,8 @@ void Balau::WebSocketWorker::Do() { m_opcode = c; m_state = READ_PLB; case READ_PLB: - c = m_socket->readU8().get(); + m_socket->read(&c, 1); + if (m_socket->isClosed()) return; m_hasMask = c & 0x80; if (m_enforceServer && !m_hasMask) goto error; @@ -32,17 +34,16 @@ void Balau::WebSocketWorker::Do() { if (m_payloadLen == 126) { m_payloadLen = 0; m_remainingBytes = 2; - } - else if (m_payloadLen == 127) { + } else if (m_payloadLen == 127) { m_payloadLen = 0; m_remainingBytes = 8; - } - else { + } else { m_remainingBytes = 0; } case READ_PLL: while (m_remainingBytes) { - c = m_socket->readU8().get(); + m_socket->read(&c, 1); + if (m_socket->isClosed()) return; m_payloadLen <<= 8; m_payloadLen += c; m_remainingBytes--; @@ -55,7 +56,8 @@ void Balau::WebSocketWorker::Do() { if (m_hasMask) m_remainingBytes = 4; case READ_MK: while (m_remainingBytes) { - c = m_socket->readU8().get(); + m_socket->read(&c, 1); + if (m_socket->isClosed()) return; m_mask <<= 8; m_mask += c; m_remainingBytes--; @@ -64,10 +66,11 @@ void Balau::WebSocketWorker::Do() { m_remainingBytes = m_payloadLen; if (m_totalLen >= MAX_WEBSOCKET_LIMIT) goto error; - m_payload = (uint8_t *)realloc(m_payload, m_totalLen); + m_payload = (uint8_t *)realloc(m_payload, m_totalLen + (m_opcode == OPCODE_TEXT ? 1 : 0)); case READ_PL: while (m_remainingBytes) { int r = m_socket->read(m_payload + m_totalLen - m_remainingBytes, m_remainingBytes); + if (m_socket->isClosed()) return; if (r < 0) goto error; m_remainingBytes -= r; @@ -82,13 +85,17 @@ void Balau::WebSocketWorker::Do() { m_mask = rotate(m_mask); } } + if (m_opcode == OPCODE_TEXT) + m_payload[m_payloadLen] = 0; processMessage(); } + + m_state = READ_H; } } error: - m_socket->close(); + disconnect(); } catch (Balau::EAgain & e) { taskSwitch(); @@ -96,6 +103,27 @@ void Balau::WebSocketWorker::Do() { } void Balau::WebSocketWorker::processMessage() { + switch (m_opcode) { + case OPCODE_PING: + processPing(); + break; + case OPCODE_PONG: + processPong(); + break; + case OPCODE_TEXT: + case OPCODE_BIN: + receiveMessage(m_payload, m_payloadLen, m_opcode == OPCODE_BIN); + break; + default: + disconnect(); + } +} + +void Balau::WebSocketWorker::processPing() { + +} + +void Balau::WebSocketWorker::processPong() { } |