summaryrefslogtreecommitdiff
path: root/src/Socket.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/Socket.cc')
-rw-r--r--src/Socket.cc45
1 files changed, 36 insertions, 9 deletions
diff --git a/src/Socket.cc b/src/Socket.cc
index 148d0a4..7511bde 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -14,6 +14,26 @@
#include "Main.h"
#include "Atomic.h"
+#ifndef _WIN32
+namespace {
+
+class SigpipeBlocker : public Balau::AtStart {
+ public:
+ SigpipeBlocker() : AtStart(5) { }
+ virtual void doStart() {
+ struct sigaction new_actn, old_actn;
+ new_actn.sa_handler = SIG_IGN;
+ sigemptyset(&new_actn.sa_mask);
+ new_actn.sa_flags = 0;
+ sigaction(SIGPIPE, &new_actn, &old_actn);
+ }
+};
+
+static SigpipeBlocker sigpipeBlocker;
+
+};
+#endif
+
void Balau::Socket::SocketEvent::gotOwner(Task * task) {
Printer::elog(E_SOCKET, "Arming SocketEvent at %p", this);
if (!m_task) {
@@ -172,9 +192,8 @@ static const char * inet_ntop(int af, const void * src, char * dst, socklen_t si
#endif
-#if 0
-// TODO: use getaddrinfo_a, if available.
-#else
+namespace {
+
class ResolverThread : public Balau::GlobalThread {
public:
ResolverThread() : GlobalThread(8), m_stopping(false) { }
@@ -186,6 +205,8 @@ class ResolverThread : public Balau::GlobalThread {
volatile bool m_stopping;
};
+};
+
void ResolverThread::threadExit() {
m_stopping = true;
DNSRequest req;
@@ -222,13 +243,12 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st
Balau::Printer::elog(Balau::E_SOCKET, "Sending a request to the resolver thread");
Balau::Task::prepare(&evt);
resolverThread.pushRequest(&req);
- Balau::Task::yield(&evt);
+ Balau::Task::operationYield(&evt);
Balau::Atomic::MemoryFence();
return req;
}
-#endif
Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) {
m_name = "Socket(unconnected)";
@@ -457,7 +477,7 @@ bool Balau::Socket::connect(const char * hostname, int port) {
IAssert(spins == 0, "We shouldn't have spinned...");
}
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
// if we're still here, it means the parent task doesn't want to be thrown an exception
IAssert(m_evtW->gotSignal(), "We shouldn't have been awoken without getting our event signalled");
@@ -516,7 +536,7 @@ Balau::IO<Balau::Socket> Balau::Socket::accept() throw (GeneralException) {
if (s < 0) {
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
String msg = getErrorMessage();
throw GeneralException(String("Unexpected error accepting a connection: #") + errno + "(" + msg + ")");
@@ -548,7 +568,7 @@ ssize_t Balau::Socket::read(void * buf, size_t count) throw (GeneralException) {
}
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtR, true);
+ Task::operationYield(m_evtR, Task::INTERRUPTIBLE);
} else {
m_evtR->stop();
return r;
@@ -575,8 +595,15 @@ ssize_t Balau::Socket::write(const void * buf, size_t count) throw (GeneralExcep
if (r > 0)
return r;
+#ifndef _WIN32
+ if (errno == EPIPE) {
+ close();
+ return 0;
+ }
+#endif
+
if ((errno == EAGAIN) || (errno == EINTR) || (errno == EWOULDBLOCK)) {
- Task::yield(m_evtW, true);
+ Task::operationYield(m_evtW, Task::INTERRUPTIBLE);
} else {
m_evtW->stop();
return r;