summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------LuaJIT0
-rw-r--r--includes/BStream.h4
-rw-r--r--includes/BWebSocket.h26
-rw-r--r--includes/Task.h1
-rw-r--r--src/BStream.cc1
-rw-r--r--src/BWebSocket.cc96
-rw-r--r--src/Task.cc3
7 files changed, 125 insertions, 6 deletions
diff --git a/LuaJIT b/LuaJIT
-Subproject ec96d8b494f0fa87cb8a31a38e7c9241f4f414d
+Subproject 21af151af28d4b3524684b106bd19b02484f67f
diff --git a/includes/BStream.h b/includes/BStream.h
index fd00b83..f39765d 100644
--- a/includes/BStream.h
+++ b/includes/BStream.h
@@ -11,9 +11,11 @@ class BStream : public Handle {
virtual bool isClosed();
virtual bool isEOF();
virtual bool canRead();
+ virtual bool canWrite() { return m_h->canWrite(); }
virtual const char * getName();
virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
- virtual off64_t getSize();
+ virtual ssize_t write(const void * buf, size_t count) throw (GeneralException) { return m_h->write(buf, count); }
+ virtual off64_t getSize();
int peekNextByte();
String readString(bool putNL = false);
bool isEmpty() { return m_availBytes == 0; }
diff --git a/includes/BWebSocket.h b/includes/BWebSocket.h
index a230a89..49f8cd1 100644
--- a/includes/BWebSocket.h
+++ b/includes/BWebSocket.h
@@ -9,15 +9,33 @@ namespace Balau {
class WebSocketActionBase;
+class WebSocketFrame {
+ public:
+ WebSocketFrame(const String & str, uint8_t opcode, bool mask = false) : WebSocketFrame((uint8_t *) str.to_charp(), str.strlen(), opcode, mask) { }
+ WebSocketFrame(size_t len, uint8_t opcode, bool mask = false) : WebSocketFrame(NULL, len, opcode, mask) { }
+ WebSocketFrame(const uint8_t * data, size_t len, uint8_t opcode, bool mask = false);
+ ~WebSocketFrame() { free(m_data); }
+ uint8_t & operator[](size_t idx);
+ uint8_t * getPtr() { return m_data + m_headerSize; }
+ void send(IO<Handle> socket);
+ private:
+ uint8_t * m_data = NULL;
+ size_t m_len = 0;
+ size_t m_headerSize = 0;
+ uint32_t m_mask = 'BLAH';
+ size_t m_bytesSent = 0;
+};
+
class WebSocketWorker : public StacklessTask {
public:
virtual bool parse(Http::Request & req) { return true; }
+ void sendFrame(WebSocketFrame * frame) { m_sendQueue.push(frame); }
protected:
- WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + "/" + m_socket->getName(); }
- ~WebSocketWorker() { free(m_payload); }
+ WebSocketWorker(IO<Handle> socket, const String & url) : m_socket(new BStream(socket)) { m_name = String("WebSocket:") + url + ":" + m_socket->getName(); }
+ ~WebSocketWorker();
void disconnect() { m_socket->close(); }
virtual void receiveMessage(const uint8_t * msg, size_t len, bool binary) = 0;
-private:
+ private:
void processMessage();
void processPing();
void processPong();
@@ -34,6 +52,8 @@ private:
} m_status = READ_H;
enum { MAX_WEBSOCKET_LIMIT = 4 * 1024 * 1024 };
uint8_t * m_payload = NULL;
+ WebSocketFrame * m_sending = NULL;
+ TQueue<WebSocketFrame> m_sendQueue;
uint64_t m_payloadLen;
uint64_t m_totalLen;
uint64_t m_remainingBytes;
diff --git a/includes/Task.h b/includes/Task.h
index 7521b9f..0f04b32 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -344,6 +344,7 @@ class TQueue : public QueueBase {
public:
void push(T * t) { iPush(t, &m_event); }
T * pop() { return (T *) iPop(&m_event, true); }
+ Events::Async * getEvent() { return &m_event; }
private:
Events::Async m_event;
};
diff --git a/src/BStream.cc b/src/BStream.cc
index 3a3f972..b8f5433 100644
--- a/src/BStream.cc
+++ b/src/BStream.cc
@@ -14,6 +14,7 @@ void Balau::BStream::close() throw (Balau::GeneralException) {
if (!m_detached)
m_h->close();
free(m_buffer);
+ m_buffer = NULL;
m_availBytes = 0;
m_cursor = 0;
}
diff --git a/src/BWebSocket.cc b/src/BWebSocket.cc
index 370b702..3e81e7d 100644
--- a/src/BWebSocket.cc
+++ b/src/BWebSocket.cc
@@ -5,11 +5,107 @@
#define rotate(value) (((value) << 8) | ((value) >> 24))
+Balau::WebSocketFrame::WebSocketFrame(const uint8_t * data, size_t len, uint8_t opcode, bool doMask) {
+ m_len = len;
+ m_headerSize = 2;
+ if (m_len >= 126) m_headerSize += 2;
+ if (m_len >= 65536) m_headerSize += 6;
+ if (doMask) m_headerSize += 4;
+ m_data = (uint8_t *)malloc(m_len + m_headerSize);
+ uint8_t * maskPtr;
+
+ m_data[0] = 0x80 | opcode;
+ m_data[1] = doMask ? 0x80 : 0x00;
+ if (m_len < 125) {
+ m_data[1] |= m_len;
+ maskPtr = m_data + 2;
+ } else if (m_len < 65536) {
+ m_data[1] |= 126;
+ m_data[2] = m_len >> 8;
+ m_data[3] = m_len & 0xff;
+ maskPtr = m_data + 4;
+ } else {
+ m_data[1] |= 127;
+ uint8_t * lenPtr = maskPtr = m_data + 10;
+ size_t len = m_len;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ *(--lenPtr) = len & 0xff; len >>= 8;
+ }
+
+ if (doMask) {
+ uint32_t mask = m_mask;
+ for (int i = 0; i < 4; i++) {
+ *(maskPtr++) = mask >> 24;
+ mask <<= 8;
+ }
+ } else {
+ m_mask = 0;
+ }
+
+ if (data) memcpy(maskPtr, data, m_len);
+}
+
+uint8_t & Balau::WebSocketFrame::operator[](size_t idx) {
+ static uint8_t dummy = 0;
+ if (idx >= m_len) return dummy;
+ return m_data[idx + m_headerSize];
+}
+
+void Balau::WebSocketFrame::send(Balau::IO<Balau::Handle> socket) {
+ size_t totalLen = m_headerSize + m_len;
+
+ if (m_mask) {
+ for (int i = m_headerSize; i < totalLen; i++) {
+ m_data[i] ^= m_mask >> 24;
+ m_mask = rotate(m_mask);
+ }
+ m_mask = 0;
+ }
+
+ while (m_bytesSent < totalLen) {
+ ssize_t r = socket->write(m_data + m_bytesSent, totalLen - m_bytesSent);
+ if (r < 0)
+ m_bytesSent = totalLen;
+ else
+ m_bytesSent += r;
+ }
+}
+
+Balau::WebSocketWorker::~WebSocketWorker() {
+ free(m_payload);
+ delete m_sending;
+ while (!m_sendQueue.isEmpty())
+ delete m_sendQueue.pop();
+}
+
void Balau::WebSocketWorker::Do() {
uint8_t c;
+ waitFor(m_sendQueue.getEvent());
+ m_sendQueue.getEvent()->resetMaybe();
+
try {
while (!m_socket->isClosed()) {
+ for (;;) {
+ if (m_sending)
+ m_sending->send(m_socket);
+
+ if (!m_sendQueue.isEmpty())
+ m_sending = m_sendQueue.pop();
+ else
+ break;
+ if (m_socket->isClosed()) return;
+ }
+
+ delete m_sending;
+ m_sending = NULL;
+
switch (m_state) {
case READ_H:
m_socket->read(&c, 1);
diff --git a/src/Task.cc b/src/Task.cc
index c97077b..9b15062 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -370,8 +370,7 @@ void * Balau::QueueBase::iPop(Events::Async * event, bool wait) {
m_lock.leave();
Task::operationYield(event, Task::INTERRUPTIBLE);
m_lock.enter();
- if (event->gotSignal())
- event->reset();
+ event->resetMaybe();
} else {
pthread_cond_wait(&m_cond, &m_lock.m_lock);
}