summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/BWebSocket.h14
-rw-r--r--src/BWebSocket.cc48
2 files changed, 51 insertions, 11 deletions
diff --git a/includes/BWebSocket.h b/includes/BWebSocket.h
index 41e5a7d..a230a89 100644
--- a/includes/BWebSocket.h
+++ b/includes/BWebSocket.h
@@ -15,8 +15,12 @@ class WebSocketWorker : public StacklessTask {
protected:
WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + "/" + m_socket->getName(); }
~WebSocketWorker() { free(m_payload); }
- private:
+ void disconnect() { m_socket->close(); }
+ virtual void receiveMessage(const uint8_t * msg, size_t len, bool binary) = 0;
+private:
void processMessage();
+ void processPing();
+ void processPong();
const char * getName() const { return m_name.to_charp(); }
void Do();
String m_name;
@@ -40,6 +44,14 @@ class WebSocketWorker : public StacklessTask {
bool m_firstFragment = true;
bool m_enforceServer = false;
bool m_enforceClient = false;
+ enum {
+ OPCODE_CONT = 0,
+ OPCODE_TEXT = 1,
+ OPCODE_BIN = 2,
+ OPCODE_CLOSE = 8,
+ OPCODE_PING = 9,
+ OPCODE_PONG = 10,
+ };
friend class WebSocketActionBase;
};
diff --git a/src/BWebSocket.cc b/src/BWebSocket.cc
index 0f94728..370b702 100644
--- a/src/BWebSocket.cc
+++ b/src/BWebSocket.cc
@@ -12,7 +12,8 @@ void Balau::WebSocketWorker::Do() {
while (!m_socket->isClosed()) {
switch (m_state) {
case READ_H:
- c = m_socket->readU8().get();
+ m_socket->read(&c, 1);
+ if (m_socket->isClosed()) return;
m_fin = c & 0x80;
if ((c >> 4) & 7) goto error;
c &= 15;
@@ -21,7 +22,8 @@ void Balau::WebSocketWorker::Do() {
m_opcode = c;
m_state = READ_PLB;
case READ_PLB:
- c = m_socket->readU8().get();
+ m_socket->read(&c, 1);
+ if (m_socket->isClosed()) return;
m_hasMask = c & 0x80;
if (m_enforceServer && !m_hasMask)
goto error;
@@ -32,17 +34,16 @@ void Balau::WebSocketWorker::Do() {
if (m_payloadLen == 126) {
m_payloadLen = 0;
m_remainingBytes = 2;
- }
- else if (m_payloadLen == 127) {
+ } else if (m_payloadLen == 127) {
m_payloadLen = 0;
m_remainingBytes = 8;
- }
- else {
+ } else {
m_remainingBytes = 0;
}
case READ_PLL:
while (m_remainingBytes) {
- c = m_socket->readU8().get();
+ m_socket->read(&c, 1);
+ if (m_socket->isClosed()) return;
m_payloadLen <<= 8;
m_payloadLen += c;
m_remainingBytes--;
@@ -55,7 +56,8 @@ void Balau::WebSocketWorker::Do() {
if (m_hasMask) m_remainingBytes = 4;
case READ_MK:
while (m_remainingBytes) {
- c = m_socket->readU8().get();
+ m_socket->read(&c, 1);
+ if (m_socket->isClosed()) return;
m_mask <<= 8;
m_mask += c;
m_remainingBytes--;
@@ -64,10 +66,11 @@ void Balau::WebSocketWorker::Do() {
m_remainingBytes = m_payloadLen;
if (m_totalLen >= MAX_WEBSOCKET_LIMIT)
goto error;
- m_payload = (uint8_t *)realloc(m_payload, m_totalLen);
+ m_payload = (uint8_t *)realloc(m_payload, m_totalLen + (m_opcode == OPCODE_TEXT ? 1 : 0));
case READ_PL:
while (m_remainingBytes) {
int r = m_socket->read(m_payload + m_totalLen - m_remainingBytes, m_remainingBytes);
+ if (m_socket->isClosed()) return;
if (r < 0)
goto error;
m_remainingBytes -= r;
@@ -82,13 +85,17 @@ void Balau::WebSocketWorker::Do() {
m_mask = rotate(m_mask);
}
}
+ if (m_opcode == OPCODE_TEXT)
+ m_payload[m_payloadLen] = 0;
processMessage();
}
+
+ m_state = READ_H;
}
}
error:
- m_socket->close();
+ disconnect();
}
catch (Balau::EAgain & e) {
taskSwitch();
@@ -96,6 +103,27 @@ void Balau::WebSocketWorker::Do() {
}
void Balau::WebSocketWorker::processMessage() {
+ switch (m_opcode) {
+ case OPCODE_PING:
+ processPing();
+ break;
+ case OPCODE_PONG:
+ processPong();
+ break;
+ case OPCODE_TEXT:
+ case OPCODE_BIN:
+ receiveMessage(m_payload, m_payloadLen, m_opcode == OPCODE_BIN);
+ break;
+ default:
+ disconnect();
+ }
+}
+
+void Balau::WebSocketWorker::processPing() {
+
+}
+
+void Balau::WebSocketWorker::processPong() {
}