diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Selectable.cc | 137 | ||||
-rw-r--r-- | src/Socket.cc | 176 |
2 files changed, 176 insertions, 137 deletions
diff --git a/src/Selectable.cc b/src/Selectable.cc new file mode 100644 index 0000000..f716bb1 --- /dev/null +++ b/src/Selectable.cc @@ -0,0 +1,137 @@ +#include <sys/types.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/stat.h> +#include <stdio.h> +#include <errno.h> +#include "Selectable.h" +#include "Threads.h" +#include "Printer.h" +#include "Main.h" +#include "Atomic.h" +#include "Async.h" +#include "Task.h" +#include "TaskMan.h" + +#ifndef _WIN32 +namespace { + +class SigpipeBlocker : public Balau::AtStart { + public: + SigpipeBlocker() : AtStart(5) { } + virtual void doStart() { + struct sigaction new_actn, old_actn; + new_actn.sa_handler = SIG_IGN; + sigemptyset(&new_actn.sa_mask); + new_actn.sa_flags = 0; + sigaction(SIGPIPE, &new_actn, &old_actn); + } +}; + +static SigpipeBlocker sigpipeBlocker; + +}; +#endif + +void Balau::Selectable::SelectableEvent::gotOwner(Task * task) { + Printer::elog(E_SELECT, "Arming SelectableEvent at %p", this); + if (!m_task) { + Printer::elog(E_SELECT, "...with a new task (%p)", task); + } else if (task == m_task) { + Printer::elog(E_SELECT, "...with the same task, doing nothing."); + return; + } else { + Printer::elog(E_SELECT, "...with a new task (%p -> %p); stopping first", m_task, task); + m_evt.stop(); + } + m_task = task; + m_evt.set(task->getLoop()); + m_evt.start(); +} + +void Balau::Selectable::setFD(int fd) throw (GeneralException) { + if (m_fd >= 0) + throw GeneralException("FD already set."); + m_fd = fd; + + m_evtR = new SelectableEvent(m_fd, ev::READ); + m_evtW = new SelectableEvent(m_fd, ev::WRITE); +#ifdef _WIN32 + u_long iMode = 1; + ioctlsocket(m_fd, FIONBIO, &iMode); +#else + fcntl(m_fd, F_SETFL, O_NONBLOCK); +#endif +} + +Balau::Selectable::~Selectable() { + m_fd = -1; + delete m_evtR; + delete m_evtW; + m_evtR = m_evtW = NULL; +} + +bool Balau::Selectable::isClosed() { return m_fd < 0; } +bool Balau::Selectable::isEOF() { return isClosed(); } + +ssize_t Balau::Selectable::read(void * buf, size_t count) throw (GeneralException) { + if (count == 0) + return 0; + + AAssert(m_fd >= 0, "You can't call read() on a closed selectable"); + + int spins = 0; + + do { + ssize_t r = recv(m_fd, (char *) buf, count, 0); + + if (r >= 0) { + if (r == 0) + close(); + return r; + } + + if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { + Task::operationYield(m_evtR, Task::INTERRUPTIBLE); + } else { + m_evtR->stop(); + return r; + } + } while (spins++ < 2); + + return -1; +} + +ssize_t Balau::Selectable::write(const void * buf, size_t count) throw (GeneralException) { + if (count == 0) + return 0; + + AAssert(m_fd >= 0, "You can't call write() on a closed selectable"); + + int spins = 0; + + do { + ssize_t r = send(m_fd, (const char *) buf, count, 0); + + EAssert(r != 0, "send() returned 0 (broken pipe ?)"); + + if (r > 0) + return r; + +#ifndef _WIN32 + if (errno == EPIPE) { + close(); + return 0; + } +#endif + + if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { + Task::operationYield(m_evtW, Task::INTERRUPTIBLE); + } else { + m_evtW->stop(); + return r; + } + } while (spins++ < 2); + + return -1; +} diff --git a/src/Socket.cc b/src/Socket.cc index 8989a72..b22539f 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -17,42 +17,6 @@ #include "Task.h" #include "TaskMan.h" -#ifndef _WIN32 -namespace { - -class SigpipeBlocker : public Balau::AtStart { - public: - SigpipeBlocker() : AtStart(5) { } - virtual void doStart() { - struct sigaction new_actn, old_actn; - new_actn.sa_handler = SIG_IGN; - sigemptyset(&new_actn.sa_mask); - new_actn.sa_flags = 0; - sigaction(SIGPIPE, &new_actn, &old_actn); - } -}; - -static SigpipeBlocker sigpipeBlocker; - -}; -#endif - -void Balau::Socket::SocketEvent::gotOwner(Task * task) { - Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this); - if (!m_task) { - Printer::elog(E_SOCKET, "...with a new task (%p)", task); - } else if (task == m_task) { - Printer::elog(E_SOCKET, "...with the same task, doing nothing."); - return; - } else { - Printer::elog(E_SOCKET, "...with a new task (%p -> %p); stopping first", m_task, task); - m_evt.stop(); - } - m_task = task; - m_evt.set(task->getLoop()); - m_evt.start(); -} - static Balau::String getErrorMessage() { Balau::String msg; #ifdef _WIN32 @@ -231,20 +195,16 @@ static Balau::DNSRequest * resolveName(const char * name, const char * service = return req; } -Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) { +Balau::Socket::Socket() throw (GeneralException) { + int fd = socket(AF_INET6, SOCK_STREAM, 0); + m_name = "Socket(nonconnected)"; - RAssert(m_fd >= 0, "socket() returned %i", m_fd); - m_evtR = new SocketEvent(m_fd, ev::READ); - m_evtW = new SocketEvent(m_fd, ev::WRITE); -#ifdef _WIN32 - u_long iMode = 1; - ioctlsocket(m_fd, FIONBIO, &iMode); -#else - fcntl(m_fd, F_SETFL, O_NONBLOCK); -#endif + RAssert(fd >= 0, "socket() returned %i", fd); + + setFD(fd); int on = 0; - int r = setsockopt(m_fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on)); + int r = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on)); EAssert(r == 0, "setsockopt returned %i", r); memset(&m_localAddr, 0, sizeof(m_localAddr)); @@ -252,15 +212,15 @@ Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_ST Printer::elog(E_SOCKET, "Creating a socket at %p", this); } -Balau::Socket::Socket(int fd) : m_fd(fd) { +Balau::Socket::Socket(int fd) { socklen_t len; m_connected = true; len = sizeof(m_localAddr); - getsockname(m_fd, (sockaddr *) &m_localAddr, &len); + getsockname(fd, (sockaddr *) &m_localAddr, &len); len = sizeof(m_remoteAddr); - getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len); + getpeername(fd, (sockaddr *) &m_remoteAddr, &len); char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN]; const char * rLocal, * rRemote; @@ -272,40 +232,28 @@ Balau::Socket::Socket(int fd) : m_fd(fd) { EAssert(rLocal, "inet_ntop returned NULL"); EAssert(rRemote, "inet_ntop returned NULL"); - m_evtR = new SocketEvent(m_fd, ev::READ); - m_evtW = new SocketEvent(m_fd, ev::WRITE); -#ifdef _WIN32 - u_long iMode = 1; - ioctlsocket(m_fd, FIONBIO, &iMode); -#else - fcntl(m_fd, F_SETFL, O_NONBLOCK); -#endif + setFD(fd); m_name.set("Socket(Connected - [%s]:%i <- [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port), rRemote, ntohs(m_remoteAddr.sin6_port)); Printer::elog(E_SOCKET, "Created a new socket from listener at %p; %s", this, m_name.to_charp()); } void Balau::Socket::close() throw (GeneralException) { - if (m_fd < 0) + if (isClosed()) return; #ifdef _WIN32 - closesocket(m_fd); + closesocket(getFD()); WSACleanup(); #else - ::close(m_fd); + ::close(getFD()); #endif Printer::elog(E_SOCKET, "Closing socket at %p", this); m_connected = false; m_connecting = false; m_listening = false; - m_fd = -1; - delete m_evtR; - delete m_evtW; - m_evtR = m_evtW = NULL; + internalClose(); } -bool Balau::Socket::isClosed() { return m_fd < 0; } -bool Balau::Socket::isEOF() { return isClosed(); } bool Balau::Socket::canRead() { return true; } bool Balau::Socket::canWrite() { return true; } const char * Balau::Socket::getName() { return m_name.to_charp(); } @@ -358,9 +306,9 @@ bool Balau::Socket::setLocal(const char * hostname, int port) { m_localAddr.sin6_family = AF_INET6; #ifndef _WIN32 int enable = 1; - setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); + setsockopt(getFD(), SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)); #endif - return bind(m_fd, (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0; + return bind(getFD(), (struct sockaddr *) &m_localAddr, sizeof(m_localAddr)) == 0; } #if defined(_WIN32) && !defined(EISCONN) @@ -371,7 +319,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { AAssert(!m_listening, "You can't call Socket::connect() on a listening socket"); AAssert(!m_connected, "You can't call Socket::connect() on an already connected socket"); AAssert(hostname, "You can't call Socket::connect() without a hostname"); - AAssert(m_fd >= 0, "You can't call Socket::connect() on a closed socket"); + AAssert(!isClosed(), "You can't call Socket::connect() on a closed socket"); if (!m_connecting && !m_req) { Printer::elog(E_SOCKET, "Resolving %s", hostname); @@ -416,7 +364,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { m_req = NULL; } else { // if we end up there, it means our yield earlier threw an EAgain exception. - AAssert(m_evtR->gotSignal(), "Please don't call connect after a EAgain without checking its signal first."); + AAssert(gotR(), "Please don't call connect after a EAgain without checking its signal first."); } int spins = 0; @@ -426,7 +374,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { int r; int err; if (spins == 0) { - r = ::connect(m_fd, (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr)); + r = ::connect(getFD(), (sockaddr *) &m_remoteAddr, sizeof(m_remoteAddr)); #ifdef _WIN32 err = WSAGetLastError(); #else @@ -434,7 +382,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { #endif } else { socklen_t sLen = sizeof(err); - int g = getsockopt(m_fd, SOL_SOCKET, SO_ERROR, (char *) &err, &sLen); + int g = getsockopt(getFD(), SOL_SOCKET, SO_ERROR, (char *) &err, &sLen); EAssert(g == 0, "getsockopt failed; g = %i", g); r = err != 0 ? -1 : 0; } @@ -445,10 +393,10 @@ bool Balau::Socket::connect(const char * hostname, int port) { socklen_t len; len = sizeof(m_localAddr); - getsockname(m_fd, (sockaddr *) &m_localAddr, &len); + getsockname(getFD(), (sockaddr *) &m_localAddr, &len); len = sizeof(m_remoteAddr); - getpeername(m_fd, (sockaddr *) &m_remoteAddr, &len); + getpeername(getFD(), (sockaddr *) &m_remoteAddr, &len); char prtLocal[INET6_ADDRSTRLEN], prtRemote[INET6_ADDRSTRLEN]; const char * rLocal, * rRemote; @@ -480,7 +428,7 @@ bool Balau::Socket::connect(const char * hostname, int port) { Task::operationYield(m_evtW, Task::INTERRUPTIBLE); // if we're still here, it means the parent task doesn't want to be thrown an exception - IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled"); + IAssert(gotW(), "We shouldn't have been awoken without getting our event signalled"); } while (spins++ < 2); @@ -491,15 +439,15 @@ bool Balau::Socket::listen() { AAssert(!m_listening, "You can't call Socket::listen() on an already listening socket"); AAssert(!m_connecting, "You can't call Socket::listen() on a connecting socket"); AAssert(!m_connected, "You can't call Socket::listen() on a connected socket"); - AAssert(m_fd >= 0, "You can't call Socket::listen() on a closed socket"); + AAssert(!isClosed(), "You can't call Socket::listen() on a closed socket"); - if (::listen(m_fd, 16) == 0) { + if (::listen(getFD(), 16) == 0) { m_listening = true; socklen_t len; len = sizeof(m_localAddr); - getsockname(m_fd, (sockaddr *) &m_localAddr, &len); + getsockname(getFD(), (sockaddr *) &m_localAddr, &len); char prtLocal[INET6_ADDRSTRLEN]; const char * rLocal; @@ -510,7 +458,7 @@ bool Balau::Socket::listen() { EAssert(rLocal, "inet_ntop() returned NULL"); m_name.set("Socket(Listener - [%s]:%i)", rLocal, ntohs(m_localAddr.sin6_port)); - Printer::elog(E_SOCKET, "Socket %i started to listen: %s", m_fd, m_name.to_charp()); + Printer::elog(E_SOCKET, "Socket %i started to listen: %s", getFD(), m_name.to_charp()); } else { String msg = getErrorMessage(); Printer::elog(E_SOCKET, "listen() failed with error %i (%s)", errno, msg.to_charp()); @@ -527,13 +475,13 @@ bool Balau::Socket::listen() { Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) { AAssert(m_listening, "You can't call accept() on a non-listening socket"); - AAssert(m_fd >= 0, "You can't call accept() on a closed socket"); + AAssert(!isClosed(), "You can't call accept() on a closed socket"); while(true) { sockaddr_in6 remoteAddr; socklen_t len = sizeof(sockaddr_in6); - Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", m_fd, m_name.to_charp()); - int s = ::accept(m_fd, (sockaddr *) &remoteAddr, &len); + Printer::elog(E_SOCKET, "Socket %i (%s) is going to accept()", getFD(), m_name.to_charp()); + int s = ::accept(getFD(), (sockaddr *) &remoteAddr, &len); if (s < 0) { if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { @@ -551,67 +499,21 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) { } ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) { - if (count == 0) - return 0; - AAssert(m_connected, "You can't call read() on a non-connected socket"); - AAssert(m_fd >= 0, "You can't call read() on a closed socket"); - - int spins = 0; - - do { - ssize_t r = ::recv(m_fd, (char *) buf, count, 0); - - if (r >= 0) { - if (r == 0) - close(); - return r; - } - - if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::operationYield(m_evtR, Task::INTERRUPTIBLE); - } else { - m_evtR->stop(); - return r; - } - } while (spins++ < 2); - - return -1; + return Selectable::read(buf, count); } ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralException) { - if (count == 0) - return 0; - AAssert(m_connected, "You can't call write() on a non-connected socket"); - AAssert(m_fd >= 0, "You can't call write() on a closed socket"); - - int spins = 0; - - do { - ssize_t r = ::send(m_fd, (const char *) buf, count, 0); - - EAssert(r != 0, "send() returned 0 (broken pipe ?)"); - - if (r > 0) - return r; - -#ifndef _WIN32 - if (errno == EPIPE) { - close(); - return 0; - } -#endif + return Selectable::write(buf, count); +} - if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) { - Task::operationYield(m_evtW, Task::INTERRUPTIBLE); - } else { - m_evtW->stop(); - return r; - } - } while (spins++ < 2); +ssize_t Balau::Socket::recv(int sockfd, void *buf, size_t len, int flags) { + return ::recv(sockfd, buf, len, flags); +} - return -1; +ssize_t Balau::Socket::send(int sockfd, const void *buf, size_t len, int flags) { + return ::send(sockfd, buf, len, flags); } Balau::ListenerBase::ListenerBase(int port, const char * local, void * opaque) : m_listener(new Socket()), m_stop(false), m_local(local), m_port(port), m_opaque(opaque) { |