diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/BStream.cc | 1 | ||||
-rw-r--r-- | src/BWebSocket.cc | 96 | ||||
-rw-r--r-- | src/Task.cc | 3 |
3 files changed, 98 insertions, 2 deletions
diff --git a/src/BStream.cc b/src/BStream.cc index 3a3f972..b8f5433 100644 --- a/src/BStream.cc +++ b/src/BStream.cc @@ -14,6 +14,7 @@ void Balau::BStream::close() throw (Balau::GeneralException) { if (!m_detached) m_h->close(); free(m_buffer); + m_buffer = NULL; m_availBytes = 0; m_cursor = 0; } diff --git a/src/BWebSocket.cc b/src/BWebSocket.cc index 370b702..3e81e7d 100644 --- a/src/BWebSocket.cc +++ b/src/BWebSocket.cc @@ -5,11 +5,107 @@ #define rotate(value) (((value) << 8) | ((value) >> 24)) +Balau::WebSocketFrame::WebSocketFrame(const uint8_t * data, size_t len, uint8_t opcode, bool doMask) { + m_len = len; + m_headerSize = 2; + if (m_len >= 126) m_headerSize += 2; + if (m_len >= 65536) m_headerSize += 6; + if (doMask) m_headerSize += 4; + m_data = (uint8_t *)malloc(m_len + m_headerSize); + uint8_t * maskPtr; + + m_data[0] = 0x80 | opcode; + m_data[1] = doMask ? 0x80 : 0x00; + if (m_len < 125) { + m_data[1] |= m_len; + maskPtr = m_data + 2; + } else if (m_len < 65536) { + m_data[1] |= 126; + m_data[2] = m_len >> 8; + m_data[3] = m_len & 0xff; + maskPtr = m_data + 4; + } else { + m_data[1] |= 127; + uint8_t * lenPtr = maskPtr = m_data + 10; + size_t len = m_len; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + *(--lenPtr) = len & 0xff; len >>= 8; + } + + if (doMask) { + uint32_t mask = m_mask; + for (int i = 0; i < 4; i++) { + *(maskPtr++) = mask >> 24; + mask <<= 8; + } + } else { + m_mask = 0; + } + + if (data) memcpy(maskPtr, data, m_len); +} + +uint8_t & Balau::WebSocketFrame::operator[](size_t idx) { + static uint8_t dummy = 0; + if (idx >= m_len) return dummy; + return m_data[idx + m_headerSize]; +} + +void Balau::WebSocketFrame::send(Balau::IO<Balau::Handle> socket) { + size_t totalLen = m_headerSize + m_len; + + if (m_mask) { + for (int i = m_headerSize; i < totalLen; i++) { + m_data[i] ^= m_mask >> 24; + m_mask = rotate(m_mask); + } + m_mask = 0; + } + + while (m_bytesSent < totalLen) { + ssize_t r = socket->write(m_data + m_bytesSent, totalLen - m_bytesSent); + if (r < 0) + m_bytesSent = totalLen; + else + m_bytesSent += r; + } +} + +Balau::WebSocketWorker::~WebSocketWorker() { + free(m_payload); + delete m_sending; + while (!m_sendQueue.isEmpty()) + delete m_sendQueue.pop(); +} + void Balau::WebSocketWorker::Do() { uint8_t c; + waitFor(m_sendQueue.getEvent()); + m_sendQueue.getEvent()->resetMaybe(); + try { while (!m_socket->isClosed()) { + for (;;) { + if (m_sending) + m_sending->send(m_socket); + + if (!m_sendQueue.isEmpty()) + m_sending = m_sendQueue.pop(); + else + break; + if (m_socket->isClosed()) return; + } + + delete m_sending; + m_sending = NULL; + switch (m_state) { case READ_H: m_socket->read(&c, 1); diff --git a/src/Task.cc b/src/Task.cc index c97077b..9b15062 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -370,8 +370,7 @@ void * Balau::QueueBase::iPop(Events::Async * event, bool wait) { m_lock.leave(); Task::operationYield(event, Task::INTERRUPTIBLE); m_lock.enter(); - if (event->gotSignal()) - event->reset(); + event->resetMaybe(); } else { pthread_cond_wait(&m_cond, &m_lock.m_lock); } |