summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNicolas Noble <nnoble@blizzard.com>2013-07-16 11:14:34 -0700
committerNicolas Noble <nnoble@blizzard.com>2013-07-16 11:14:34 -0700
commitf8dbc120c055fb61fa79f901cc2974d049d04f4f (patch)
tree65241936fe02d4041e82508bfbd5f45d84523eb3 /src
parentd35f4eb97c77438af67466be97516aef76bee9f0 (diff)
Split the Socket class into Selectable, in order to let it work with other non-socket file descriptors.
Diffstat (limited to 'src')
-rw-r--r--src/Selectable.cc137
-rw-r--r--src/Socket.cc176
2 files changed, 176 insertions, 137 deletions
diff --git a/src/Selectable.cc b/src/Selectable.cc
new file mode 100644
index 0000000..f716bb1
--- /dev/null
+++ b/src/Selectable.cc
@@ -0,0 +1,137 @@
+#include <sys/types.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <stdio.h>
+#include <errno.h>
+#include "Selectable.h"
+#include "Threads.h"
+#include "Printer.h"
+#include "Main.h"
+#include "Atomic.h"
+#include "Async.h"
+#include "Task.h"
+#include "TaskMan.h"
+
+#ifndef _WIN32
+namespace {
+
+class SigpipeBlocker : public Balau::AtStart {
+ public:
+ SigpipeBlocker() : AtStart(5) { }
+ virtual void doStart() {
+ struct sigaction new_actn, old_actn;
+ new_actn.sa_handler = SIG_IGN;
+ sigemptyset(&new_actn.sa_mask);
+ new_actn.sa_flags = 0;
+ sigaction(SIGPIPE, &new_actn, &old_actn);
+ }
+};
+
+static SigpipeBlocker sigpipeBlocker;
+
+};
+#endif
+
+void Balau::Selectable::SelectableEvent::gotOwner(Task * task) {
+ Printer::elog(E_SELECT, "Arming SelectableEvent at %p", this);
+ if (!m_task) {
+ Printer::elog(E_SELECT, "...with a new task (%p)", task);
+ } else if (task == m_task) {
+ Printer::elog(E_SELECT, "...with the same task, doing nothing.");
+ return;
+ } else {
+ Printer::elog(E_SELECT, "...with a new task (%p -> %p); stopping first", m_task, task);
+ m_evt.stop();
+ }
+ m_task = task;
+ m_evt.set(task->getLoop());
+ m_evt.start();
+}
+
+void Balau::Selectable::setFD(int fd) throw (GeneralException) {
+ if (m_fd >= 0)
+ throw GeneralException("FD already set.");
+ m_fd = fd;
+
+ m_evtR = new SelectableEvent(m_fd, ev::READ);
+ m_evtW = new SelectableEvent(m_fd, ev::WRITE);
+#ifdef _WIN32
+ u_long iMode = 1;
+ ioctlsocket(m_fd, FIONBIO, &iMode);
+#else
+ fcntl(m_fd, F_SETFL, O_NONBLOCK);
+#endif
+}
+
+Balau::Selectable::~Selectable() {
+ m_fd = -1;
+ delete m_evtR;
+ delete m_evtW;
+ m_evtR = m_evtW = NULL;
+}
+
+bool Balau::Selectable::isClosed() { return m_fd < 0; }
+bool Balau::Selectable::isEOF() { return isClosed(); }
+
+ssize_t Balau::Selectable::read(void * buf, size_t count) throw (GeneralException) {
+ if (count == 0)
+ return 0;
+
+ AAssert(m_fd >= 0, "You can't call read() on a closed selectable");
+
+ int spins = 0;
+
+ do {
+ ssize_t r = recv(m_fd, (char *) buf, count, 0);
+
+ if (r >= 0) {
+ if (r == 0)
+ close();
+ return r;
+ }
+
+ if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
+ } else {
+ m_evtR->stop();
+ return r;
+ }
+ } while (spins++ < 2);
+
+ return -1;
+}
+
+ssize_t Balau::Selectable::write(const void * buf, size_t count) throw (GeneralException) {
+ if (count == 0)
+ return 0;
+
+ AAssert(m_fd >= 0, "You can't call write() on a closed selectable");
+
+ int spins = 0;
+
+ do {
+ ssize_t r = send(m_fd, (const char *) buf, count, 0);
+
+ EAssert(r != 0, "send() returned 0 (broken pipe ?)");
+
+ if (r > 0)
+ return r;
+
+#ifndef _WIN32
+ if (errno == EPIPE) {
+ close();
+ return 0;
+ }
+#endif
+
+ if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
+ } else {
+ m_evtW->stop();
+ return r;
+ }
+ } while (spins++ < 2);
+
+ return -1;
+}
diff --git a/src/Socket.cc b/src/Socket.cc
index 8989a72..b22539f 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -17,42 +17,6 @@
#include "Task.h"
#include "TaskMan.h"
-#ifndef _WIN32
-namespace {
-
-class SigpipeBlocker : public Balau::AtStart {
- public:
- SigpipeBlocker() : AtStart(5) { }
- virtual void doStart() {
- struct sigaction new_actn, old_actn;
- new_actn.sa_handler = SIG_IGN;
- sigemptyset(&new_actn.sa_mask);
- new_actn.sa_flags = 0;
- sigaction(SIGPIPE, &new_actn, &old_actn);
- }
-};
-
-static SigpipeBlocker sigpipeBlocker;
-
-};
-#endif
-
-void Balau::Socket::SocketEvent::gotOwner(Task * task) {
- Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this);
- if (!m_task) {
- Printer::elog(E_SOCKET, "...with a new task (%p)", task);
- } else if (task == m_task) {
- Printer::elog(E_SOCKET, "...with the same task, doing nothing.");
- return;
- } else {
- Printer::elog(E_SOCKET, "...with a new task (%p -> %p); stopping first", m_task, task);
- m_evt.stop();
- }
- m_task = task;
- m_evt.set(task->getLoop());
- m_evt.start();
-}
-
static Balau::String getErrorMessage() {
Balau::String msg;
#ifdef _WIN32
@@ -231,20 +195,16 @@ static Balau::DNSRequest * resolveName(const char * name, const char * service =
return req;
}
-Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) {
+Balau::Socket::Socket() throw (GeneralException) {
+ int fd = socket(AF_INET6, SOCK_STREAM, 0);
+
m_name = "Socket(nonconnected)";
- RAssert(m_fd >= 0, "socket() returned %i", m_fd);
- m_evtR = new SocketEvent(m_fd, ev::READ);
- m_evtW = new SocketEvent(m_fd, ev::WRITE);
-#ifdef _WIN32
- u_long iMode = 1;
- ioctlsocket(m_fd, FIONBIO, &iMode);
-#else
- fcntl(m_fd, F_SETFL, O_NONBLOCK);
-#endif
+ RAssert(fd >= 0, "socket() returned %i", fd);
+
+ setFD(fd);
int on = 0;
- int r = setsockopt(m_fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on));
+ int r = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on));
EAssert(r == 0, "setsockopt returned %i", r);
memset(&m_localAddr, 0, sizeof(m_localAddr));
@@ -252,15 +212,15 @@ Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_ST
Printer::elog(E_SOCKET, "Creating a socket at %p", this);
}
-Balau::Socket::Socket(int fd) : m_fd(fd) {
+Balau::Socket::Socket(int fd) {
socklen_t len;
m_connected = true;
len = sizeof(m_localAddr);
- getsockname(m_fd, (sockaddr *) &m_localAddr, &len);
+ getsockname(fd, (sockaddr *) &m_localAddr, &len);
len = sizeof(m_remoteAddr);
- getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len);
+ getpeername(fd, (sockaddr *) &m_remoteAddr, &len);
char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN];
const char * rLocal, * rRemote;
@@ -272,40 +232,28 @@ Balau::Socket::Socket(int fd) : m_fd(fd) {
EAssert(rLocal, "inet_ntop returned NULL");
EAssert(rRemote, "inet_ntop returned NULL");
- m_evtR = new SocketEvent(m_fd, ev::READ);
- m_evtW = new SocketEvent(m_fd, ev::WRITE);
-#ifdef _WIN32
- u_long iMode = 1;
- ioctlsocket(m_fd, FIONBIO, &iMode);
-#else
- fcntl(m_fd, F_SETFL, O_NONBLOCK);
-#endif
+ setFD(fd);
m_name.set("Socket(Connected - [%s]:%i <- [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port), rRemote, ntohs(m_remoteAddr.sin6_port));
Printer::elog(E_SOCKET, "Created a new socket from listener at %p; %s", this, m_name.to_charp());
}
void Balau::Socket::close() throw (GeneralException) {
- if (m_fd < 0)
+ if (isClosed())
return;
#ifdef _WIN32
- closesocket(m_fd);
+ closesocket(getFD());
WSACleanup();
#else
- ::close(m_fd);
+ ::close(getFD());
#endif
Printer::elog(E_SOCKET, "Closing socket at %p", this);
m_connected = false;
m_connecting = false;
m_listening = false;
- m_fd = -1;
- delete m_evtR;
- delete m_evtW;
- m_evtR = m_evtW = NULL;
+ internalClose();
}
-bool Balau::Socket::isClosed() { return m_fd < 0; }
-bool Balau::Socket::isEOF() { return isClosed(); }
bool Balau::Socket::canRead() { return true; }
bool Balau::Socket::canWrite() { return true; }
const char * Balau::Socket::getName() { return m_name.to_charp(); }
@@ -358,9 +306,9 @@ bool Balau::Socket::setLocal(const char * hostname, int port) {
m_localAddr.sin6_family = AF_INET6;
#ifndef _WIN32
int enable = 1;
- setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
+ setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable));
#endif
- return bind(m_fd, (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0;
+ return bind(getFD(), (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0;
}
#if defined(_WIN32) && !defined(EISCONN)
@@ -371,7 +319,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
AAssert(!m_listening, "You can't call Socket::connect() on a listening socket");
AAssert(!m_connected, "You can't call Socket::connect() on an already connected socket");
AAssert(hostname, "You can't call Socket::connect() without a hostname");
- AAssert(m_fd >= 0, "You can't call Socket::connect() on a closed socket");
+ AAssert(!isClosed(), "You can't call Socket::connect() on a closed socket");
if (!m_connecting && !m_req) {
Printer::elog(E_SOCKET, "Resolving %s", hostname);
@@ -416,7 +364,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
m_req = NULL;
} else {
// if we end up there, it means our yield earlier threw an EAgain exception.
- AAssert(m_evtR->gotSignal(), "Please don't call connect after a EAgain without checking its signal first.");
+ AAssert(gotR(), "Please don't call connect after a EAgain without checking its signal first.");
}
int spins = 0;
@@ -426,7 +374,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
int r;
int err;
if (spins == 0) {
- r = ::connect(m_fd, (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr));
+ r = ::connect(getFD(), (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr));
#ifdef _WIN32
err = WSAGetLastError();
#else
@@ -434,7 +382,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
#endif
} else {
socklen_t sLen = sizeof(err);
- int g = getsockopt(m_fd, SOL_SOCKET, SO_ERROR, (char *) &err, &sLen);
+ int g = getsockopt(getFD(), SOL_SOCKET, SO_ERROR, (char *) &err, &sLen);
EAssert(g == 0, "getsockopt failed; g = %i", g);
r = err != 0 ? -1 : 0;
}
@@ -445,10 +393,10 @@ bool Balau::Socket::connect(const char * hostname, int port) {
socklen_t len;
len = sizeof(m_localAddr);
- getsockname(m_fd, (sockaddr *) &m_localAddr, &len);
+ getsockname(getFD(), (sockaddr *) &m_localAddr, &len);
len = sizeof(m_remoteAddr);
- getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len);
+ getpeername(getFD(), (sockaddr *) &m_remoteAddr, &len);
char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN];
const char * rLocal, * rRemote;
@@ -480,7 +428,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
// if we're still here, it means the parent task doesn't want to be thrown an exception
- IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled");
+ IAssert(gotW(), "We shouldn't have been awoken without getting our event signalled");
} while (spins++ < 2);
@@ -491,15 +439,15 @@ bool Balau::Socket::listen() {
AAssert(!m_listening, "You can't call Socket::listen() on an already listening socket");
AAssert(!m_connecting, "You can't call Socket::listen() on a connecting socket");
AAssert(!m_connected, "You can't call Socket::listen() on a connected socket");
- AAssert(m_fd >= 0, "You can't call Socket::listen() on a closed socket");
+ AAssert(!isClosed(), "You can't call Socket::listen() on a closed socket");
- if (::listen(m_fd, 16) == 0) {
+ if (::listen(getFD(), 16) == 0) {
m_listening = true;
socklen_t len;
len = sizeof(m_localAddr);
- getsockname(m_fd, (sockaddr *) &m_localAddr, &len);
+ getsockname(getFD(), (sockaddr *) &m_localAddr, &len);
char prtLocal[INET6_ADDRSTRLEN];
const char * rLocal;
@@ -510,7 +458,7 @@ bool Balau::Socket::listen() {
EAssert(rLocal, "inet_ntop() returned NULL");
m_name.set("Socket(Listener - [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port));
- Printer::elog(E_SOCKET, "Socket %i started to listen: %s", m_fd, m_name.to_charp());
+ Printer::elog(E_SOCKET, "Socket %i started to listen: %s", getFD(), m_name.to_charp());
} else {
String msg = getErrorMessage();
Printer::elog(E_SOCKET, "listen() failed with error %i (%s)", errno, msg.to_charp());
@@ -527,13 +475,13 @@ bool Balau::Socket::listen() {
Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
AAssert(m_listening, "You can't call accept() on a non-listening socket");
- AAssert(m_fd >= 0, "You can't call accept() on a closed socket");
+ AAssert(!isClosed(), "You can't call accept() on a closed socket");
while(true) {
sockaddr_in6 remoteAddr;
socklen_t len = sizeof(sockaddr_in6);
- Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", m_fd, m_name.to_charp());
- int s = ::accept(m_fd, (sockaddr *) &remoteAddr, &len);
+ Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", getFD(), m_name.to_charp());
+ int s = ::accept(getFD(), (sockaddr *) &remoteAddr, &len);
if (s < 0) {
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
@@ -551,67 +499,21 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
}
ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) {
- if (count == 0)
- return 0;
-
AAssert(m_connected, "You can't call read() on a non-connected socket");
- AAssert(m_fd >= 0, "You can't call read() on a closed socket");
-
- int spins = 0;
-
- do {
- ssize_t r = ::recv(m_fd, (char *) buf, count, 0);
-
- if (r >= 0) {
- if (r == 0)
- close();
- return r;
- }
-
- if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
- } else {
- m_evtR->stop();
- return r;
- }
- } while (spins++ < 2);
-
- return -1;
+ return Selectable::read(buf, count);
}
ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralException) {
- if (count == 0)
- return 0;
-
AAssert(m_connected, "You can't call write() on a non-connected socket");
- AAssert(m_fd >= 0, "You can't call write() on a closed socket");
-
- int spins = 0;
-
- do {
- ssize_t r = ::send(m_fd, (const char *) buf, count, 0);
-
- EAssert(r != 0, "send() returned 0 (broken pipe ?)");
-
- if (r > 0)
- return r;
-
-#ifndef _WIN32
- if (errno == EPIPE) {
- close();
- return 0;
- }
-#endif
+ return Selectable::write(buf, count);
+}
- if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
- } else {
- m_evtW->stop();
- return r;
- }
- } while (spins++ < 2);
+ssize_t Balau::Socket::recv(int sockfd, void *buf, size_t len, int flags) {
+ return ::recv(sockfd, buf, len, flags);
+}
- return -1;
+ssize_t Balau::Socket::send(int sockfd, const void *buf, size_t len, int flags) {
+ return ::send(sockfd, buf, len, flags);
}
Balau::ListenerBase::ListenerBase(int port, const char * local, void * opaque) : m_listener(new Socket()), m_stop(false), m_local(local), m_port(port), m_opaque(opaque) {