summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas Noble <nnoble@blizzard.com>2014-08-08 13:34:23 -0700
committerNicolas Noble <nnoble@blizzard.com>2014-08-08 13:34:23 -0700
commitdaf6897fe24e62ae8cf8e42b151ed565563332fe (patch)
treed4f313b369e5b3b873f51c51525ed9816053aaea
parentf67877e10216326b2230cd827d15aab0802d826d (diff)
Finalizing cares integration.
-rw-r--r--includes/Handle.h2
-rw-r--r--includes/HttpServer.h2
-rw-r--r--includes/Socket.h2
-rw-r--r--src/HttpServer.cc23
-rw-r--r--src/Socket.cc86
-rw-r--r--src/TaskMan.cc16
-rw-r--r--tests/test-Http.cc3
7 files changed, 82 insertions, 52 deletions
diff --git a/includes/Handle.h b/includes/Handle.h
index 40e52e5..6a087a7 100644
--- a/includes/Handle.h
+++ b/includes/Handle.h
@@ -160,7 +160,7 @@ class HPrinter : public Handle {
class IOBase {
private:
IOBase() { }
- ~IOBase() { if (m_h) m_h->delRef(); }
+ ~IOBase() { if (m_h) m_h->delRef(); m_h = NULL; }
void setHandle(Handle * h) { m_h = h; if (m_h) m_h->addRef(); }
Handle * m_h = NULL;
template<class T>
diff --git a/includes/HttpServer.h b/includes/HttpServer.h
index 683390c..ac219e1 100644
--- a/includes/HttpServer.h
+++ b/includes/HttpServer.h
@@ -79,6 +79,7 @@ class HttpServer {
};
ActionFound findAction(const char * uri, const char * host);
String getServerName() { return "Balau/1.0"; }
+ bool started();
private:
bool m_started;
void * m_listenerPtr;
@@ -87,6 +88,7 @@ class HttpServer {
typedef std::list<Action *> ActionList;
ActionList m_actions;
RWLock m_actionsLock;
+ Events::TaskEvent m_listenerEvent;
friend class HttpWorker;
diff --git a/includes/Socket.h b/includes/Socket.h
index 0909763..b1e59cb 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -62,6 +62,7 @@ class ListenerBase : public StacklessTask {
virtual void Do();
void stop();
virtual const char * getName() const;
+ bool started() { return m_started; }
protected:
ListenerBase(int port, const char * local, void * opaque);
virtual void factory(IO<Socket> & io, void * opaque) = 0;
@@ -74,6 +75,7 @@ class ListenerBase : public StacklessTask {
String m_local;
int m_port = 0;
void * m_opaque = NULL;
+ bool m_started = false;
};
template<class Worker>
diff --git a/src/HttpServer.cc b/src/HttpServer.cc
index d6bc53d..fe95edb 100644
--- a/src/HttpServer.cc
+++ b/src/HttpServer.cc
@@ -633,20 +633,20 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener;
void Balau::HttpServer::start() {
AAssert(!m_started, "Don't start an HttpServer twice");
- m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this));
m_started = true;
+ m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this), &m_listenerEvent);
}
void Balau::HttpServer::stop() {
AAssert(m_started, "Don't stop an HttpServer that hasn't been started");
+ m_started = false;
+ IAssert(!m_listenerEvent.gotSignal(), "Our listener has stopped already!");
HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr);
- Events::TaskEvent event(listener);
- Task::prepare(&event);
+ Task::prepare(&m_listenerEvent);
listener->stop();
- m_started = false;
- Task::operationYield(&event);
- IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop");
- event.ack();
+ Task::operationYield(&m_listenerEvent);
+ IAssert(m_listenerEvent.gotSignal(), "HttpServer::stop didn't actually get the listener to stop");
+ m_listenerEvent.ack();
}
void Balau::HttpServer::registerAction(Action * action) {
@@ -665,6 +665,15 @@ void Balau::HttpServer::flushAllActions() {
}
}
+bool Balau::HttpServer::started() {
+ if (m_listenerEvent.gotSignal())
+ return false;
+ HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr);
+ if (!listener)
+ return false;
+ return listener->started();
+}
+
Balau::HttpServer::Action::ActionMatch Balau::HttpServer::Action::matches(const char * uri, const char * host) {
ActionMatch r;
diff --git a/src/Socket.cc b/src/Socket.cc
index f27b85a..db4e23d 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -228,26 +228,26 @@ bool Balau::Socket::canWrite() { return true; }
const char * Balau::Socket::getName() { return m_name.to_charp(); }
void Balau::Socket::resolve(const char * hostname) {
- if (!m_resolving) {
+ if (!m_resolving && !m_resolved) {
m_resolving = 2;
Task * t = Task::getCurrentTask();
- auto callback = [&](int status, int timeouts, struct hostent * hostent, int family, ptrdiff_t srcOffset, void * destAddr, size_t sizeofDest, bool & failed) {
+ IO<Socket> self(this);
+ auto callback = [self](int status, int timeouts, struct hostent * hostent, int family, void * destAddr, size_t sizeofDest, bool & failed) mutable {
if (status == ARES_SUCCESS) {
IAssert(hostent->h_addrtype == family, "We asked for socket family %i, but got %i instead", family, hostent->h_addrtype);
- memcpy(destAddr, ((uint8_t *)hostent->h_addr_list[0]) + srcOffset, sizeofDest);
- }
- else {
+ uint8_t * srcPtr = (uint8_t *) hostent->h_addr_list[0];
+ memcpy(destAddr, srcPtr, sizeofDest);
+ } else {
failed = true;
}
- if (--m_resolving == 0) {
- m_resolveEvent.doSignal();
- m_resolving = false;
- m_resolved = true;
+ if (--self->m_resolving == 0) {
+ self->m_resolveEvent.doSignal();
+ self->m_resolved = true;
}
};
- t->getTaskMan()->getHostByName(hostname, AF_INET, [&](int status, int timeouts, struct hostent * hostent) { callback(status, timeouts, hostent, AF_INET, offsetof(struct sockaddr_in, sin_addr), &m_resolvedAddr4, sizeof(m_resolvedAddr4), m_resolve4Failed); });
- t->getTaskMan()->getHostByName(hostname, AF_INET6, [&](int status, int timeouts, struct hostent * hostent) { callback(status, timeouts, hostent, AF_INET6, offsetof(struct sockaddr_in6, sin6_addr), &m_resolvedAddr6, sizeof(m_resolvedAddr6), m_resolve6Failed); });
+ t->getTaskMan()->getHostByName(hostname, AF_INET, [callback, self](int status, int timeouts, struct hostent * hostent) mutable { callback(status, timeouts, hostent, AF_INET, &self->m_resolvedAddr4, sizeof(self->m_resolvedAddr4), self->m_resolve4Failed); });
+ t->getTaskMan()->getHostByName(hostname, AF_INET6, [callback, self](int status, int timeouts, struct hostent * hostent) mutable { callback(status, timeouts, hostent, AF_INET6, &self->m_resolvedAddr6, sizeof(self->m_resolvedAddr6), self->m_resolve6Failed); });
Task::operationYield(&m_resolveEvent, Task::INTERRUPTIBLE);
}
@@ -263,16 +263,19 @@ void Balau::Socket::initAddr(sockaddr_in6 & out) {
void Balau::Socket::resolved(sockaddr_in6 & out) {
if (!m_resolve6Failed) {
memcpy(&out.sin6_addr, &m_resolvedAddr6, sizeof(struct in6_addr));
+ } else {
+ if (m_resolvedAddr4.s_addr == htonl(INADDR_LOOPBACK)) {
+ out.sin6_addr = in6addr_loopback;
+ } else {
+ memset(&out.sin6_addr, 0, sizeof(struct in6_addr));
+ // v4 mapped IPv6 address
+ out.sin6_addr.s6_addr[10] = 0xff;
+ out.sin6_addr.s6_addr[11] = 0xff;
+ memcpy(out.sin6_addr.s6_addr + 12, &m_resolvedAddr4, sizeof(struct in_addr));
+ }
}
- else {
- memset(&out.sin6_addr, 0, sizeof(struct in6_addr));
- // v4 mapped IPv6 address
- out.sin6_addr.s6_addr[10] = 0xff;
- out.sin6_addr.s6_addr[11] = 0xff;
- memcpy(out.sin6_addr.s6_addr + 12, &m_resolvedAddr4, sizeof(struct in_addr));
- }
- m_resolving = false;
m_resolved = false;
+ m_resolveEvent.reset();
}
bool Balau::Socket::setLocal(const char * hostname, int port) {
@@ -535,20 +538,35 @@ void Balau::ListenerBase::stop() {
}
void Balau::ListenerBase::Do() {
- bool r;
- IO<Socket> io;
- while (!m_stop) {
- StacklessBegin();
- StacklessOperation(r = m_listener->setLocal(m_local.to_charp(), m_port));
- EAssert(r, "Couldn't set the local IP/port to listen to");
- r = m_listener->listen();
- EAssert(r, "Couldn't listen on the given IP/port");
- setName();
- waitFor(&m_evt);
- StacklessOperationOrCond(io = m_listener->accept(), m_stop);
- if (m_stop)
- return;
- factory(io, m_opaque);
- StacklessEnd();
+ try {
+ while (!m_stop) {
+ bool r;
+ IO<Socket> io;
+
+ switch (m_state) {
+ case 0:
+ waitFor(&m_evt);
+ m_state++;
+ case 1:
+ Printer::elog(E_SOCKET, "Listener task at %p (%s) is going to setLocal(%s, %i)", this, m_name.to_charp(), m_local.to_charp(), m_port);
+ r = m_listener->setLocal(m_local.to_charp(), m_port);
+ EAssert(r, "Couldn't set the local IP/port to listen to");
+ Printer::elog(E_SOCKET, "Listener task at %p (%s) starts listening", this, m_name.to_charp());
+ r = m_listener->listen();
+ EAssert(r, "Couldn't listen on the given IP/port");
+ setName();
+ m_started = true;
+ m_state++;
+ default:
+ Printer::elog(E_SOCKET, "Listener task at %p (%s) starts accepting", this, m_name.to_charp());
+ io = m_listener->accept();
+ Printer::elog(E_SOCKET, "Listener task at %p (%s) accepted a connection: %s", this, m_name.to_charp(), io->getName());
+ if (!m_stop)
+ factory(io, m_opaque);
+ }
+ }
+ }
+ catch (EAgain &) {
+ taskSwitch();
}
}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 5e73ba1..ba9bd60 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -40,9 +40,9 @@ class Stopper : public Balau::Task {
int m_code;
};
-class CurlAndCaresSharedManager : public Balau::AtStart, Balau::AtExit {
+class CurlAndAresSharedManager : public Balau::AtStart, Balau::AtExit {
public:
- CurlAndCaresSharedManager() : AtStart(0), AtExit(0) { }
+ CurlAndAresSharedManager() : AtStart(0), AtExit(0) { }
struct SharedLocks {
Balau::RWLock share, cookie, dns, ssl_session;
};
@@ -98,7 +98,7 @@ class CurlAndCaresSharedManager : public Balau::AtStart, Balau::AtExit {
};
static AsyncStarter s_asyncStarter;
-static CurlAndCaresSharedManager s_curlSharedmManager;
+static CurlAndAresSharedManager s_curlAndAresSharedManager;
void Stopper::Do() {
getTaskMan()->stopMe(m_code);
@@ -385,14 +385,12 @@ void Balau::TaskMan::aresSocketCallback(curl_socket_t s, int read, int write) {
what = CURL_POLL_INOUT;
}
- struct timeval tv;
- bool hasTimer = ares_timeout(m_aresChannel, NULL, &tv);
+ struct timeval tv = { 5, 0 };
+ ares_timeout(m_aresChannel, &tv, &tv);
m_aresTimer.stop();
- if (hasTimer) {
- m_aresTimer.set((ev_tstamp)(tv.tv_sec * 1000 + tv.tv_usec / 1000 + 1));
- m_aresTimer.start();
- }
+ m_aresTimer.set((ev_tstamp)(tv.tv_sec * 1000 + tv.tv_usec / 1000 + 1));
+ m_aresTimer.start();
ev::io * evt = m_aresSocketEvents[i];
if (!evt) {
diff --git a/tests/test-Http.cc b/tests/test-Http.cc
index 1ea040b..0dfcc74 100644
--- a/tests/test-Http.cc
+++ b/tests/test-Http.cc
@@ -137,7 +137,8 @@ void MainTask::Do() {
s->setLocal("localhost");
s->start();
- sleep(1);
+ while (!s->started())
+ sleep(0.1);
Events::TaskEvent stopperEvent;
Task * stopper = TaskMan::registerTask(new Stopper, &stopperEvent);