From f8dbc120c055fb61fa79f901cc2974d049d04f4f Mon Sep 17 00:00:00 2001 From: Nicolas Noble Date: Tue, 16 Jul 2013 11:14:34 -0700 Subject: Split the Socket class into Selectable, in order to let it work with other non-socket file descriptors. --- Makefile | 1 + includes/Printer.h | 2 + includes/Selectable.h | 49 ++++++++++++++ includes/Socket.h | 24 ++----- src/Selectable.cc | 137 +++++++++++++++++++++++++++++++++++++++ src/Socket.cc | 176 +++++++++++--------------------------------------- 6 files changed, 233 insertions(+), 156 deletions(-) create mode 100644 includes/Selectable.h create mode 100644 src/Selectable.cc diff --git a/Makefile b/Makefile index 0f01d95..cbbaeed 100644 --- a/Makefile +++ b/Makefile @@ -46,6 +46,7 @@ Handle.cc \ Input.cc \ Output.cc \ Socket.cc \ +Selectable.cc \ Buffer.cc \ BStream.cc \ ZHandle.cc \ diff --git a/includes/Printer.h b/includes/Printer.h index cc9be2c..8494e49 100644 --- a/includes/Printer.h +++ b/includes/Printer.h @@ -27,6 +27,7 @@ enum { #undef E_INPUT #undef E_SOCKET #undef E_THREAD +#undef E_SELECT enum { E_STRING = 1, @@ -39,6 +40,7 @@ enum { E_OUTPUT = 128, E_HTTPSERVER = 256, E_ASYNC = 512, + E_SELECT = 1024, }; class Printer { diff --git a/includes/Selectable.h b/includes/Selectable.h new file mode 100644 index 0000000..0cfae0c --- /dev/null +++ b/includes/Selectable.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace Balau { + +class Selectable : public Handle { + public: + ~Selectable(); + virtual ssize_t read(void * buf, size_t count) throw (GeneralException); + virtual ssize_t write(const void * buf, size_t count) throw (GeneralException); + virtual bool isClosed(); + virtual bool isEOF(); + + bool gotR() { return m_evtR->gotSignal(); } + bool gotW() { return m_evtW->gotSignal(); } + + class SelectableEvent : public Events::BaseEvent { + public: + SelectableEvent(int fd, int evt = ev::READ | ev::WRITE) : m_task(NULL) { Printer::elog(E_SELECT, "Got a new SelectableEvent at %p", this); m_evt.set(this); m_evt.set(fd, evt); } + virtual ~SelectableEvent() { Printer::elog(E_SELECT, "Destroying a SelectableEvent at %p", this); m_evt.stop(); } + void stop() { Printer::elog(E_SELECT, "Stopping a SelectableEvent at %p", this); reset(); m_evt.stop(); } + private: + void evt_cb(ev::io & w, int revents) { Printer::elog(E_SELECT, "Got a libev callback on a SelectableEvent at %p", this); doSignal(); } + virtual void gotOwner(Task * task); + + ev::io m_evt; + Task * m_task = NULL; + }; + + protected: + Selectable() { } + void setFD(int fd) throw (GeneralException); + void internalClose() { m_fd = -1; } + int getFD() { return m_fd; } + virtual ssize_t recv(int sockfd, void *buf, size_t len, int flags) = 0; + virtual ssize_t send(int sockfd, const void *buf, size_t len, int flags) = 0; + + SelectableEvent * m_evtR = NULL, * m_evtW = NULL; + + private: + int m_fd = -1; +}; + +}; diff --git a/includes/Socket.h b/includes/Socket.h index 00dc2b4..cfa8218 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -8,6 +8,7 @@ #include #endif #include +#include #include #include #include @@ -17,47 +18,32 @@ namespace Balau { struct DNSRequest; -class Socket : public Handle { +class Socket : public Selectable { public: Socket() throw (GeneralException); - virtual void close() throw (GeneralException); virtual ssize_t read(void * buf, size_t count) throw (GeneralException); virtual ssize_t write(const void * buf, size_t count) throw (GeneralException); - virtual bool isClosed(); - virtual bool isEOF(); + virtual void close() throw (GeneralException); virtual bool canRead(); virtual bool canWrite(); virtual const char * getName(); bool setLocal(const char * hostname = NULL, int port = 0); bool connect(const char * hostname, int port); - bool gotR() { return m_evtR->gotSignal(); } - bool gotW() { return m_evtW->gotSignal(); } IO accept() throw (GeneralException); bool listen(); bool resolved(); private: Socket(int fd); - class SocketEvent : public Events::BaseEvent { - public: - SocketEvent(int fd, int evt = ev::READ | ev::WRITE) : m_task(NULL) { Printer::elog(E_SOCKET, "Got a new SocketEvent at %p", this); m_evt.set(this); m_evt.set(fd, evt); } - virtual ~SocketEvent() { Printer::elog(E_SOCKET, "Destroying a SocketEvent at %p", this); m_evt.stop(); } - void stop() { Printer::elog(E_SOCKET, "Stopping a SocketEvent at %p", this); reset(); m_evt.stop(); } - private: - void evt_cb(ev::io & w, int revents) { Printer::elog(E_SOCKET, "Got a libev callback on a SocketEvent at %p", this); doSignal(); } - virtual void gotOwner(Task * task); - ev::io m_evt; - Task * m_task = NULL; - }; + virtual ssize_t recv(int sockfd, void *buf, size_t len, int flags); + virtual ssize_t send(int sockfd, const void *buf, size_t len, int flags); - int m_fd; String m_name; bool m_connected = false; bool m_connecting = false; bool m_listening = false; sockaddr_in6 m_localAddr, m_remoteAddr; - SocketEvent * m_evtR, * m_evtW; DNSRequest * m_req = NULL; }; diff --git a/src/Selectable.cc b/src/Selectable.cc new file mode 100644 index 0000000..f716bb1 --- /dev/null +++ b/src/Selectable.cc @@ -0,0 +1,137 @@ +#include +#include +#include +#include +#include +#include +#include "Selectable.h" +#include "Threads.h" +#include "Printer.h" +#include "Main.h" +#include "Atomic.h" +#include "Async.h" +#include "Task.h" +#include "TaskMan.h" + +#ifndef _WIN32 +namespace { + +class SigpipeBlocker : public Balau::AtStart { + public: + SigpipeBlocker() : AtStart(5) { } + virtual void doStart() { + struct sigaction new_actn, old_actn; + new_actn.sa_handler = SIG_IGN; + sigemptyset(&new_actn.sa_mask); + new_actn.sa_flags = 0; + sigaction(SIGPIPE, &new_actn, &old_actn); + } +}; + +static SigpipeBlocker sigpipeBlocker; + +}; +#endif + +void Balau::Selectable::SelectableEvent::gotOwner(Task * task) { + Printer::elog(E_SELECT, "Arming SelectableEvent at %p", this); + if (!m_task) { + Printer::elog(E_SELECT, "...with a new task (%p)", task); + } else if (task == m_task) { + Printer::elog(E_SELECT, "...with the same task, doing nothing."); + return; + } else { + Printer::elog(E_SELECT, "...with a new task (%p -> %p); stopping first", m_task, task); + m_evt.stop(); + } + m_task = task; + m_evt.set(task->getLoop()); + m_evt.start(); +} + +void Balau::Selectable::setFD(int fd) throw (GeneralException) { + if (m_fd >= 0) + throw GeneralException("FD already set."); + m_fd = fd; + + m_evtR = new SelectableEvent(m_fd, ev::READ); + m_evtW = new SelectableEvent(m_fd, ev::WRITE); +#ifdef _WIN32 + u_long iMode = 1; + ioctlsocket(m_fd, FIONBIO, &iMode); +#else + fcntl(m_fd, F_SETFL, O_NONBLOCK); +#endif +} + +Balau::Selectable::~Selectable() { + m_fd = -1; + delete m_evtR; + delete m_evtW; + m_evtR = m_evtW = NULL; +} + +bool Balau::Selectable::isClosed() { return m_fd < 0; } +bool Balau::Selectable::isEOF() { return isClosed(); } + +ssize_t Balau::Selectable::read(void * buf, size_t count) throw (GeneralException) { + if (count == 0) + return 0; + + AAssert(m_fd >= 0, "You can't call read() on a closed selectable"); + + int spins = 0; + + do { + ssize_t r = recv(m_fd, (char *) buf, count, 0); + + if (r >= 0) { + if (r == 0) + close(); + return r; + } + + if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); + } else { + m_evtR->stop(); + return r; + } + } while (spins++ < 2); + + return -1; +} + +ssize_t Balau::Selectable::write(const void * buf, size_t count) throw (GeneralException) { + if (count == 0) + return 0; + + AAssert(m_fd >= 0, "You can't call write() on a closed selectable"); + + int spins = 0; + + do { + ssize_t r = send(m_fd, (const char *) buf, count, 0); + + EAssert(r != 0, "send() returned 0 (broken pipe ?)"); + + if (r > 0) + return r; + +#ifndef _WIN32 + if (errno == EPIPE) { + close(); + return 0; + } +#endif + + if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); + } else { + m_evtW->stop(); + return r; + } + } while (spins++ < 2); + + return -1; +} diff --git a/src/Socket.cc b/src/Socket.cc index 8989a72..b22539f 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -17,42 +17,6 @@ #include "Task.h" #include "TaskMan.h" -#ifndef _WIN32 -namespace { - -class SigpipeBlocker : public Balau::AtStart { - public: - SigpipeBlocker() : AtStart(5) { } - virtual void doStart() { - struct sigaction new_actn, old_actn; - new_actn.sa_handler = SIG_IGN; - sigemptyset(&new_actn.sa_mask); - new_actn.sa_flags = 0; - sigaction(SIGPIPE, &new_actn, &old_actn); - } -}; - -static SigpipeBlocker sigpipeBlocker; - -}; -#endif - -void Balau::Socket::SocketEvent::gotOwner(Task * task) { - Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this); - if (!m_task) { - Printer::elog(E_SOCKET, "...with a new task (%p)", task); - } else if (task == m_task) { - Printer::elog(E_SOCKET, "...with the same task, doing nothing."); - return; - } else { - Printer::elog(E_SOCKET, "...with a new task (%p -> %p); stopping first", m_task, task); - m_evt.stop(); - } - m_task = task; - m_evt.set(task->getLoop()); - m_evt.start(); -} - static Balau::String getErrorMessage() { Balau::String msg; #ifdef _WIN32 @@ -231,20 +195,16 @@ static Balau::DNSRequest * resolveName(const char * name, const char * service = return req; } -Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) { +Balau::Socket::Socket() throw (GeneralException) { + int fd = socket(AF_INET6, SOCK_STREAM, 0); + m_name = "Socket(nonconnected)"; - RAssert(m_fd >= 0, "socket() returned %i", m_fd); - m_evtR = new SocketEvent(m_fd, ev::READ); - m_evtW = new SocketEvent(m_fd, ev::WRITE); -#ifdef _WIN32 - u_long iMode = 1; - ioctlsocket(m_fd, FIONBIO, &iMode); -#else - fcntl(m_fd, F_SETFL, O_NONBLOCK); -#endif + RAssert(fd >= 0, "socket() returned %i", fd); + + setFD(fd); int on = 0; - int r = setsockopt(m_fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on)); + int r = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on)); EAssert(r == 0, "setsockopt returned %i", r); memset(&m_localAddr, 0, sizeof(m_localAddr)); @@ -252,15 +212,15 @@ Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_ST Printer::elog(E_SOCKET, "Creating a socket at %p", this); } -Balau::Socket::Socket(int fd) : m_fd(fd) { +Balau::Socket::Socket(int fd) { socklen_t len; m_connected = true; len = sizeof(m_localAddr); - getsockname(m_fd, (sockaddr *) &m_localAddr, &len); + getsockname(fd, (sockaddr *) &m_localAddr, &len); len = sizeof(m_remoteAddr); - getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len); + getpeername(fd, (sockaddr *) &m_remoteAddr, &len); char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN]; const char * rLocal, * rRemote; @@ -272,40 +232,28 @@ Balau::Socket::Socket(int fd) : m_fd(fd) { EAssert(rLocal, "inet_ntop returned NULL"); EAssert(rRemote, "inet_ntop returned NULL"); - m_evtR = new SocketEvent(m_fd, ev::READ); - m_evtW = new SocketEvent(m_fd, ev::WRITE); -#ifdef _WIN32 - u_long iMode = 1; - ioctlsocket(m_fd, FIONBIO, &iMode); -#else - fcntl(m_fd, F_SETFL, O_NONBLOCK); -#endif + setFD(fd); m_name.set("Socket(Connected - [%s]:%i <- [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port), rRemote, ntohs(m_remoteAddr.sin6_port)); Printer::elog(E_SOCKET, "Created a new socket from listener at %p; %s", this, m_name.to_charp()); } void Balau::Socket::close() throw (GeneralException) { - if (m_fd < 0) + if (isClosed()) return; #ifdef _WIN32 - closesocket(m_fd); + closesocket(getFD()); WSACleanup(); #else - ::close(m_fd); + ::close(getFD()); #endif Printer::elog(E_SOCKET, "Closing socket at %p", this); m_connected = false; m_connecting = false; m_listening = false; - m_fd = -1; - delete m_evtR; - delete m_evtW; - m_evtR = m_evtW = NULL; + internalClose(); } -bool Balau::Socket::isClosed() { return m_fd < 0; } -bool Balau::Socket::isEOF() { return isClosed(); } bool Balau::Socket::canRead() { return true; } bool Balau::Socket::canWrite() { return true; } const char * Balau::Socket::getName() { return m_name.to_charp(); } @@ -358,9 +306,9 @@ bool Balau::Socket::setLocal(const char * hostname, int port) { m_localAddr.sin6_family = AF_INET6; #ifndef _WIN32 int enable = 1; - setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); #endif - return bind(m_fd, (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0; + return bind(getFD(), (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0; } #if defined(_WIN32) && !defined(EISCONN) @@ -371,7 +319,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { AAssert(!m_listening, "You can't call Socket::connect() on a listening socket"); AAssert(!m_connected, "You can't call Socket::connect() on an already connected socket"); AAssert(hostname, "You can't call Socket::connect() without a hostname"); - AAssert(m_fd >= 0, "You can't call Socket::connect() on a closed socket"); + AAssert(!isClosed(), "You can't call Socket::connect() on a closed socket"); if (!m_connecting && !m_req) { Printer::elog(E_SOCKET, "Resolving %s", hostname); @@ -416,7 +364,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { m_req = NULL; } else { // if we end up there, it means our yield earlier threw an EAgain exception. - AAssert(m_evtR->gotSignal(), "Please don't call connect after a EAgain without checking its signal first."); + AAssert(gotR(), "Please don't call connect after a EAgain without checking its signal first."); } int spins = 0; @@ -426,7 +374,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { int r; int err; if (spins == 0) { - r = ::connect(m_fd, (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr)); + r = ::connect(getFD(), (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr)); #ifdef _WIN32 err = WSAGetLastError(); #else @@ -434,7 +382,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { #endif } else { socklen_t sLen = sizeof(err); - int g = getsockopt(m_fd, SOL_SOCKET, SO_ERROR, (char *) &err, &sLen); + int g = getsockopt(getFD(), SOL_SOCKET, SO_ERROR, (char *) &err, &sLen); EAssert(g == 0, "getsockopt failed; g = %i", g); r = err != 0 ? -1 : 0; } @@ -445,10 +393,10 @@ bool Balau::Socket::connect(const char * hostname, int port) { socklen_t len; len = sizeof(m_localAddr); - getsockname(m_fd, (sockaddr *) &m_localAddr, &len); + getsockname(getFD(), (sockaddr *) &m_localAddr, &len); len = sizeof(m_remoteAddr); - getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len); + getpeername(getFD(), (sockaddr *) &m_remoteAddr, &len); char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN]; const char * rLocal, * rRemote; @@ -480,7 +428,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { Task::operationYield(m_evtW, Task::INTERRUPTIBLE); // if we're still here, it means the parent task doesn't want to be thrown an exception - IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled"); + IAssert(gotW(), "We shouldn't have been awoken without getting our event signalled"); } while (spins++ < 2); @@ -491,15 +439,15 @@ bool Balau::Socket::listen() { AAssert(!m_listening, "You can't call Socket::listen() on an already listening socket"); AAssert(!m_connecting, "You can't call Socket::listen() on a connecting socket"); AAssert(!m_connected, "You can't call Socket::listen() on a connected socket"); - AAssert(m_fd >= 0, "You can't call Socket::listen() on a closed socket"); + AAssert(!isClosed(), "You can't call Socket::listen() on a closed socket"); - if (::listen(m_fd, 16) == 0) { + if (::listen(getFD(), 16) == 0) { m_listening = true; socklen_t len; len = sizeof(m_localAddr); - getsockname(m_fd, (sockaddr *) &m_localAddr, &len); + getsockname(getFD(), (sockaddr *) &m_localAddr, &len); char prtLocal[INET6_ADDRSTRLEN]; const char * rLocal; @@ -510,7 +458,7 @@ bool Balau::Socket::listen() { EAssert(rLocal, "inet_ntop() returned NULL"); m_name.set("Socket(Listener - [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port)); - Printer::elog(E_SOCKET, "Socket %i started to listen: %s", m_fd, m_name.to_charp()); + Printer::elog(E_SOCKET, "Socket %i started to listen: %s", getFD(), m_name.to_charp()); } else { String msg = getErrorMessage(); Printer::elog(E_SOCKET, "listen() failed with error %i (%s)", errno, msg.to_charp()); @@ -527,13 +475,13 @@ bool Balau::Socket::listen() { Balau::IO Balau::Socket::accept() throw (GeneralException) { AAssert(m_listening, "You can't call accept() on a non-listening socket"); - AAssert(m_fd >= 0, "You can't call accept() on a closed socket"); + AAssert(!isClosed(), "You can't call accept() on a closed socket"); while(true) { sockaddr_in6 remoteAddr; socklen_t len = sizeof(sockaddr_in6); - Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", m_fd, m_name.to_charp()); - int s = ::accept(m_fd, (sockaddr *) &remoteAddr, &len); + Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", getFD(), m_name.to_charp()); + int s = ::accept(getFD(), (sockaddr *) &remoteAddr, &len); if (s < 0) { if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { @@ -551,67 +499,21 @@ Balau::IO Balau::Socket::accept() throw (GeneralException) { } ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) { - if (count == 0) - return 0; - AAssert(m_connected, "You can't call read() on a non-connected socket"); - AAssert(m_fd >= 0, "You can't call read() on a closed socket"); - - int spins = 0; - - do { - ssize_t r = ::recv(m_fd, (char *) buf, count, 0); - - if (r >= 0) { - if (r == 0) - close(); - return r; - } - - if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::operationYield(m_evtR, Task::INTERRUPTIBLE); - } else { - m_evtR->stop(); - return r; - } - } while (spins++ < 2); - - return -1; + return Selectable::read(buf, count); } ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralException) { - if (count == 0) - return 0; - AAssert(m_connected, "You can't call write() on a non-connected socket"); - AAssert(m_fd >= 0, "You can't call write() on a closed socket"); - - int spins = 0; - - do { - ssize_t r = ::send(m_fd, (const char *) buf, count, 0); - - EAssert(r != 0, "send() returned 0 (broken pipe ?)"); - - if (r > 0) - return r; - -#ifndef _WIN32 - if (errno == EPIPE) { - close(); - return 0; - } -#endif + return Selectable::write(buf, count); +} - if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::operationYield(m_evtW, Task::INTERRUPTIBLE); - } else { - m_evtW->stop(); - return r; - } - } while (spins++ < 2); +ssize_t Balau::Socket::recv(int sockfd, void *buf, size_t len, int flags) { + return ::recv(sockfd, buf, len, flags); +} - return -1; +ssize_t Balau::Socket::send(int sockfd, const void *buf, size_t len, int flags) { + return ::send(sockfd, buf, len, flags); } Balau::ListenerBase::ListenerBase(int port, const char * local, void * opaque) : m_listener(new Socket()), m_stop(false), m_local(local), m_port(port), m_opaque(opaque) { -- cgit v1.2.3