summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile1
-rw-r--r--includes/Printer.h2
-rw-r--r--includes/Selectable.h49
-rw-r--r--includes/Socket.h24
-rw-r--r--src/Selectable.cc137
-rw-r--r--src/Socket.cc176
6 files changed, 233 insertions, 156 deletions
diff --git a/Makefile b/Makefile
index 0f01d95..cbbaeed 100644
--- a/Makefile
+++ b/Makefile
@@ -46,6 +46,7 @@ Handle.cc \
Input.cc \
Output.cc \
Socket.cc \
+Selectable.cc \
Buffer.cc \
BStream.cc \
ZHandle.cc \
diff --git a/includes/Printer.h b/includes/Printer.h
index cc9be2c..8494e49 100644
--- a/includes/Printer.h
+++ b/includes/Printer.h
@@ -27,6 +27,7 @@ enum {
#undef E_INPUT
#undef E_SOCKET
#undef E_THREAD
+#undef E_SELECT
enum {
E_STRING = 1,
@@ -39,6 +40,7 @@ enum {
E_OUTPUT = 128,
E_HTTPSERVER = 256,
E_ASYNC = 512,
+ E_SELECT = 1024,
};
class Printer {
diff --git a/includes/Selectable.h b/includes/Selectable.h
new file mode 100644
index 0000000..0cfae0c
--- /dev/null
+++ b/includes/Selectable.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include <Handle.h>
+#include <TaskMan.h>
+#include <Task.h>
+#include <StacklessTask.h>
+#include <Printer.h>
+
+namespace Balau {
+
+class Selectable : public Handle {
+ public:
+ ~Selectable();
+ virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
+ virtual ssize_t write(const void * buf, size_t count) throw (GeneralException);
+ virtual bool isClosed();
+ virtual bool isEOF();
+
+ bool gotR() { return m_evtR->gotSignal(); }
+ bool gotW() { return m_evtW->gotSignal(); }
+
+ class SelectableEvent : public Events::BaseEvent {
+ public:
+ SelectableEvent(int fd, int evt = ev::READ | ev::WRITE) : m_task(NULL) { Printer::elog(E_SELECT, "Got a new SelectableEvent at %p", this); m_evt.set<SelectableEvent, &SelectableEvent::evt_cb>(this); m_evt.set(fd, evt); }
+ virtual ~SelectableEvent() { Printer::elog(E_SELECT, "Destroying a SelectableEvent at %p", this); m_evt.stop(); }
+ void stop() { Printer::elog(E_SELECT, "Stopping a SelectableEvent at %p", this); reset(); m_evt.stop(); }
+ private:
+ void evt_cb(ev::io & w, int revents) { Printer::elog(E_SELECT, "Got a libev callback on a SelectableEvent at %p", this); doSignal(); }
+ virtual void gotOwner(Task * task);
+
+ ev::io m_evt;
+ Task * m_task = NULL;
+ };
+
+ protected:
+ Selectable() { }
+ void setFD(int fd) throw (GeneralException);
+ void internalClose() { m_fd = -1; }
+ int getFD() { return m_fd; }
+ virtual ssize_t recv(int sockfd, void *buf, size_t len, int flags) = 0;
+ virtual ssize_t send(int sockfd, const void *buf, size_t len, int flags) = 0;
+
+ SelectableEvent * m_evtR = NULL, * m_evtW = NULL;
+
+ private:
+ int m_fd = -1;
+};
+
+};
diff --git a/includes/Socket.h b/includes/Socket.h
index 00dc2b4..cfa8218 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -8,6 +8,7 @@
#include <netdb.h>
#endif
#include <Handle.h>
+#include <Selectable.h>
#include <TaskMan.h>
#include <Task.h>
#include <StacklessTask.h>
@@ -17,47 +18,32 @@ namespace Balau {
struct DNSRequest;
-class Socket : public Handle {
+class Socket : public Selectable {
public:
Socket() throw (GeneralException);
- virtual void close() throw (GeneralException);
virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
virtual ssize_t write(const void * buf, size_t count) throw (GeneralException);
- virtual bool isClosed();
- virtual bool isEOF();
+ virtual void close() throw (GeneralException);
virtual bool canRead();
virtual bool canWrite();
virtual const char * getName();
bool setLocal(const char * hostname = NULL, int port = 0);
bool connect(const char * hostname, int port);
- bool gotR() { return m_evtR->gotSignal(); }
- bool gotW() { return m_evtW->gotSignal(); }
IO<Socket> accept() throw (GeneralException);
bool listen();
bool resolved();
private:
Socket(int fd);
- class SocketEvent : public Events::BaseEvent {
- public:
- SocketEvent(int fd, int evt = ev::READ | ev::WRITE) : m_task(NULL) { Printer::elog(E_SOCKET, "Got a new SocketEvent at %p", this); m_evt.set<SocketEvent, &SocketEvent::evt_cb>(this); m_evt.set(fd, evt); }
- virtual ~SocketEvent() { Printer::elog(E_SOCKET, "Destroying a SocketEvent at %p", this); m_evt.stop(); }
- void stop() { Printer::elog(E_SOCKET, "Stopping a SocketEvent at %p", this); reset(); m_evt.stop(); }
- private:
- void evt_cb(ev::io & w, int revents) { Printer::elog(E_SOCKET, "Got a libev callback on a SocketEvent at %p", this); doSignal(); }
- virtual void gotOwner(Task * task);
- ev::io m_evt;
- Task * m_task = NULL;
- };
+ virtual ssize_t recv(int sockfd, void *buf, size_t len, int flags);
+ virtual ssize_t send(int sockfd, const void *buf, size_t len, int flags);
- int m_fd;
String m_name;
bool m_connected = false;
bool m_connecting = false;
bool m_listening = false;
sockaddr_in6 m_localAddr, m_remoteAddr;
- SocketEvent * m_evtR, * m_evtW;
DNSRequest * m_req = NULL;
};
diff --git a/src/Selectable.cc b/src/Selectable.cc
new file mode 100644
index 0000000..f716bb1
--- /dev/null
+++ b/src/Selectable.cc
@@ -0,0 +1,137 @@
+#include <sys/types.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <stdio.h>
+#include <errno.h>
+#include "Selectable.h"
+#include "Threads.h"
+#include "Printer.h"
+#include "Main.h"
+#include "Atomic.h"
+#include "Async.h"
+#include "Task.h"
+#include "TaskMan.h"
+
+#ifndef _WIN32
+namespace {
+
+class SigpipeBlocker : public Balau::AtStart {
+ public:
+ SigpipeBlocker() : AtStart(5) { }
+ virtual void doStart() {
+ struct sigaction new_actn, old_actn;
+ new_actn.sa_handler = SIG_IGN;
+ sigemptyset(&new_actn.sa_mask);
+ new_actn.sa_flags = 0;
+ sigaction(SIGPIPE, &new_actn, &old_actn);
+ }
+};
+
+static SigpipeBlocker sigpipeBlocker;
+
+};
+#endif
+
+void Balau::Selectable::SelectableEvent::gotOwner(Task * task) {
+ Printer::elog(E_SELECT, "Arming SelectableEvent at %p", this);
+ if (!m_task) {
+ Printer::elog(E_SELECT, "...with a new task (%p)", task);
+ } else if (task == m_task) {
+ Printer::elog(E_SELECT, "...with the same task, doing nothing.");
+ return;
+ } else {
+ Printer::elog(E_SELECT, "...with a new task (%p -> %p); stopping first", m_task, task);
+ m_evt.stop();
+ }
+ m_task = task;
+ m_evt.set(task->getLoop());
+ m_evt.start();
+}
+
+void Balau::Selectable::setFD(int fd) throw (GeneralException) {
+ if (m_fd >= 0)
+ throw GeneralException("FD already set.");
+ m_fd = fd;
+
+ m_evtR = new SelectableEvent(m_fd, ev::READ);
+ m_evtW = new SelectableEvent(m_fd, ev::WRITE);
+#ifdef _WIN32
+ u_long iMode = 1;
+ ioctlsocket(m_fd, FIONBIO, &iMode);
+#else
+ fcntl(m_fd, F_SETFL, O_NONBLOCK);
+#endif
+}
+
+Balau::Selectable::~Selectable() {
+ m_fd = -1;
+ delete m_evtR;
+ delete m_evtW;
+ m_evtR = m_evtW = NULL;
+}
+
+bool Balau::Selectable::isClosed() { return m_fd < 0; }
+bool Balau::Selectable::isEOF() { return isClosed(); }
+
+ssize_t Balau::Selectable::read(void * buf, size_t count) throw (GeneralException) {
+ if (count == 0)
+ return 0;
+
+ AAssert(m_fd >= 0, "You can't call read() on a closed selectable");
+
+ int spins = 0;
+
+ do {
+ ssize_t r = recv(m_fd, (char *) buf, count, 0);
+
+ if (r >= 0) {
+ if (r == 0)
+ close();
+ return r;
+ }
+
+ if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
+ } else {
+ m_evtR->stop();
+ return r;
+ }
+ } while (spins++ < 2);
+
+ return -1;
+}
+
+ssize_t Balau::Selectable::write(const void * buf, size_t count) throw (GeneralException) {
+ if (count == 0)
+ return 0;
+
+ AAssert(m_fd >= 0, "You can't call write() on a closed selectable");
+
+ int spins = 0;
+
+ do {
+ ssize_t r = send(m_fd, (const char *) buf, count, 0);
+
+ EAssert(r != 0, "send() returned 0 (broken pipe ?)");
+
+ if (r > 0)
+ return r;
+
+#ifndef _WIN32
+ if (errno == EPIPE) {
+ close();
+ return 0;
+ }
+#endif
+
+ if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
+ } else {
+ m_evtW->stop();
+ return r;
+ }
+ } while (spins++ < 2);
+
+ return -1;
+}
diff --git a/src/Socket.cc b/src/Socket.cc
index 8989a72..b22539f 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -17,42 +17,6 @@
#include "Task.h"
#include "TaskMan.h"
-#ifndef _WIN32
-namespace {
-
-class SigpipeBlocker : public Balau::AtStart {
- public:
- SigpipeBlocker() : AtStart(5) { }
- virtual void doStart() {
- struct sigaction new_actn, old_actn;
- new_actn.sa_handler = SIG_IGN;
- sigemptyset(&new_actn.sa_mask);
- new_actn.sa_flags = 0;
- sigaction(SIGPIPE, &new_actn, &old_actn);
- }
-};
-
-static SigpipeBlocker sigpipeBlocker;
-
-};
-#endif
-
-void Balau::Socket::SocketEvent::gotOwner(Task * task) {
- Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this);
- if (!m_task) {
- Printer::elog(E_SOCKET, "...with a new task (%p)", task);
- } else if (task == m_task) {
- Printer::elog(E_SOCKET, "...with the same task, doing nothing.");
- return;
- } else {
- Printer::elog(E_SOCKET, "...with a new task (%p -> %p); stopping first", m_task, task);
- m_evt.stop();
- }
- m_task = task;
- m_evt.set(task->getLoop());
- m_evt.start();
-}
-
static Balau::String getErrorMessage() {
Balau::String msg;
#ifdef _WIN32
@@ -231,20 +195,16 @@ static Balau::DNSRequest * resolveName(const char * name, const char * service =
return req;
}
-Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) {
+Balau::Socket::Socket() throw (GeneralException) {
+ int fd = socket(AF_INET6, SOCK_STREAM, 0);
+
m_name = "Socket(nonconnected)";
- RAssert(m_fd >= 0, "socket() returned %i", m_fd);
- m_evtR = new SocketEvent(m_fd, ev::READ);
- m_evtW = new SocketEvent(m_fd, ev::WRITE);
-#ifdef _WIN32
- u_long iMode = 1;
- ioctlsocket(m_fd, FIONBIO, &iMode);
-#else
- fcntl(m_fd, F_SETFL, O_NONBLOCK);
-#endif
+ RAssert(fd >= 0, "socket() returned %i", fd);
+
+ setFD(fd);
int on = 0;
- int r = setsockopt(m_fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on));
+ int r = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on));
EAssert(r == 0, "setsockopt returned %i", r);
memset(&m_localAddr, 0, sizeof(m_localAddr));
@@ -252,15 +212,15 @@ Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_ST
Printer::elog(E_SOCKET, "Creating a socket at %p", this);
}
-Balau::Socket::Socket(int fd) : m_fd(fd) {
+Balau::Socket::Socket(int fd) {
socklen_t len;
m_connected = true;
len = sizeof(m_localAddr);
- getsockname(m_fd, (sockaddr *) &m_localAddr, &len);
+ getsockname(fd, (sockaddr *) &m_localAddr, &len);
len = sizeof(m_remoteAddr);
- getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len);
+ getpeername(fd, (sockaddr *) &m_remoteAddr, &len);
char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN];
const char * rLocal, * rRemote;
@@ -272,40 +232,28 @@ Balau::Socket::Socket(int fd) : m_fd(fd) {
EAssert(rLocal, "inet_ntop returned NULL");
EAssert(rRemote, "inet_ntop returned NULL");
- m_evtR = new SocketEvent(m_fd, ev::READ);
- m_evtW = new SocketEvent(m_fd, ev::WRITE);
-#ifdef _WIN32
- u_long iMode = 1;
- ioctlsocket(m_fd, FIONBIO, &iMode);
-#else
- fcntl(m_fd, F_SETFL, O_NONBLOCK);
-#endif
+ setFD(fd);
m_name.set("Socket(Connected - [%s]:%i <- [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port), rRemote, ntohs(m_remoteAddr.sin6_port));
Printer::elog(E_SOCKET, "Created a new socket from listener at %p; %s", this, m_name.to_charp());
}
void Balau::Socket::close() throw (GeneralException) {
- if (m_fd < 0)
+ if (isClosed())
return;
#ifdef _WIN32
- closesocket(m_fd);
+ closesocket(getFD());
WSACleanup();
#else
- ::close(m_fd);
+ ::close(getFD());
#endif
Printer::elog(E_SOCKET, "Closing socket at %p", this);
m_connected = false;
m_connecting = false;
m_listening = false;
- m_fd = -1;
- delete m_evtR;
- delete m_evtW;
- m_evtR = m_evtW = NULL;
+ internalClose();
}
-bool Balau::Socket::isClosed() { return m_fd < 0; }
-bool Balau::Socket::isEOF() { return isClosed(); }
bool Balau::Socket::canRead() { return true; }
bool Balau::Socket::canWrite() { return true; }
const char * Balau::Socket::getName() { return m_name.to_charp(); }
@@ -358,9 +306,9 @@ bool Balau::Socket::setLocal(const char * hostname, int port) {
m_localAddr.sin6_family = AF_INET6;
#ifndef _WIN32
int enable = 1;
- setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
+ setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
#endif
- return bind(m_fd, (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0;
+ return bind(getFD(), (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0;
}
#if defined(_WIN32) && !defined(EISCONN)
@@ -371,7 +319,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
AAssert(!m_listening, "You can't call Socket::connect() on a listening socket");
AAssert(!m_connected, "You can't call Socket::connect() on an already connected socket");
AAssert(hostname, "You can't call Socket::connect() without a hostname");
- AAssert(m_fd >= 0, "You can't call Socket::connect() on a closed socket");
+ AAssert(!isClosed(), "You can't call Socket::connect() on a closed socket");
if (!m_connecting && !m_req) {
Printer::elog(E_SOCKET, "Resolving %s", hostname);
@@ -416,7 +364,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
m_req = NULL;
} else {
// if we end up there, it means our yield earlier threw an EAgain exception.
- AAssert(m_evtR->gotSignal(), "Please don't call connect after a EAgain without checking its signal first.");
+ AAssert(gotR(), "Please don't call connect after a EAgain without checking its signal first.");
}
int spins = 0;
@@ -426,7 +374,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
int r;
int err;
if (spins == 0) {
- r = ::connect(m_fd, (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr));
+ r = ::connect(getFD(), (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr));
#ifdef _WIN32
err = WSAGetLastError();
#else
@@ -434,7 +382,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
#endif
} else {
socklen_t sLen = sizeof(err);
- int g = getsockopt(m_fd, SOL_SOCKET, SO_ERROR, (char *) &err, &sLen);
+ int g = getsockopt(getFD(), SOL_SOCKET, SO_ERROR, (char *) &err, &sLen);
EAssert(g == 0, "getsockopt failed; g = %i", g);
r = err != 0 ? -1 : 0;
}
@@ -445,10 +393,10 @@ bool Balau::Socket::connect(const char * hostname, int port) {
socklen_t len;
len = sizeof(m_localAddr);
- getsockname(m_fd, (sockaddr *) &m_localAddr, &len);
+ getsockname(getFD(), (sockaddr *) &m_localAddr, &len);
len = sizeof(m_remoteAddr);
- getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len);
+ getpeername(getFD(), (sockaddr *) &m_remoteAddr, &len);
char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN];
const char * rLocal, * rRemote;
@@ -480,7 +428,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
// if we're still here, it means the parent task doesn't want to be thrown an exception
- IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled");
+ IAssert(gotW(), "We shouldn't have been awoken without getting our event signalled");
} while (spins++ < 2);
@@ -491,15 +439,15 @@ bool Balau::Socket::listen() {
AAssert(!m_listening, "You can't call Socket::listen() on an already listening socket");
AAssert(!m_connecting, "You can't call Socket::listen() on a connecting socket");
AAssert(!m_connected, "You can't call Socket::listen() on a connected socket");
- AAssert(m_fd >= 0, "You can't call Socket::listen() on a closed socket");
+ AAssert(!isClosed(), "You can't call Socket::listen() on a closed socket");
- if (::listen(m_fd, 16) == 0) {
+ if (::listen(getFD(), 16) == 0) {
m_listening = true;
socklen_t len;
len = sizeof(m_localAddr);
- getsockname(m_fd, (sockaddr *) &m_localAddr, &len);
+ getsockname(getFD(), (sockaddr *) &m_localAddr, &len);
char prtLocal[INET6_ADDRSTRLEN];
const char * rLocal;
@@ -510,7 +458,7 @@ bool Balau::Socket::listen() {
EAssert(rLocal, "inet_ntop() returned NULL");
m_name.set("Socket(Listener - [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port));
- Printer::elog(E_SOCKET, "Socket %i started to listen: %s", m_fd, m_name.to_charp());
+ Printer::elog(E_SOCKET, "Socket %i started to listen: %s", getFD(), m_name.to_charp());
} else {
String msg = getErrorMessage();
Printer::elog(E_SOCKET, "listen() failed with error %i (%s)", errno, msg.to_charp());
@@ -527,13 +475,13 @@ bool Balau::Socket::listen() {
Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
AAssert(m_listening, "You can't call accept() on a non-listening socket");
- AAssert(m_fd >= 0, "You can't call accept() on a closed socket");
+ AAssert(!isClosed(), "You can't call accept() on a closed socket");
while(true) {
sockaddr_in6 remoteAddr;
socklen_t len = sizeof(sockaddr_in6);
- Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", m_fd, m_name.to_charp());
- int s = ::accept(m_fd, (sockaddr *) &remoteAddr, &len);
+ Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", getFD(), m_name.to_charp());
+ int s = ::accept(getFD(), (sockaddr *) &remoteAddr, &len);
if (s < 0) {
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
@@ -551,67 +499,21 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
}
ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) {
- if (count == 0)
- return 0;
-
AAssert(m_connected, "You can't call read() on a non-connected socket");
- AAssert(m_fd >= 0, "You can't call read() on a closed socket");
-
- int spins = 0;
-
- do {
- ssize_t r = ::recv(m_fd, (char *) buf, count, 0);
-
- if (r >= 0) {
- if (r == 0)
- close();
- return r;
- }
-
- if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
- } else {
- m_evtR->stop();
- return r;
- }
- } while (spins++ < 2);
-
- return -1;
+ return Selectable::read(buf, count);
}
ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralException) {
- if (count == 0)
- return 0;
-
AAssert(m_connected, "You can't call write() on a non-connected socket");
- AAssert(m_fd >= 0, "You can't call write() on a closed socket");
-
- int spins = 0;
-
- do {
- ssize_t r = ::send(m_fd, (const char *) buf, count, 0);
-
- EAssert(r != 0, "send() returned 0 (broken pipe ?)");
-
- if (r > 0)
- return r;
-
-#ifndef _WIN32
- if (errno == EPIPE) {
- close();
- return 0;
- }
-#endif
+ return Selectable::write(buf, count);
+}
- if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
- } else {
- m_evtW->stop();
- return r;
- }
- } while (spins++ < 2);
+ssize_t Balau::Socket::recv(int sockfd, void *buf, size_t len, int flags) {
+ return ::recv(sockfd, buf, len, flags);
+}
- return -1;
+ssize_t Balau::Socket::send(int sockfd, const void *buf, size_t len, int flags) {
+ return ::send(sockfd, buf, len, flags);
}
Balau::ListenerBase::ListenerBase(int port, const char * local, void * opaque) : m_listener(new Socket()), m_stop(false), m_local(local), m_port(port), m_opaque(opaque) {