diff options
author | Nicolas Noble <nnoble@blizzard.com> | 2014-08-08 13:34:23 -0700 |
---|---|---|
committer | Nicolas Noble <nnoble@blizzard.com> | 2014-08-08 13:34:23 -0700 |
commit | daf6897fe24e62ae8cf8e42b151ed565563332fe (patch) | |
tree | d4f313b369e5b3b873f51c51525ed9816053aaea | |
parent | f67877e10216326b2230cd827d15aab0802d826d (diff) |
Finalizing cares integration.
-rw-r--r-- | includes/Handle.h | 2 | ||||
-rw-r--r-- | includes/HttpServer.h | 2 | ||||
-rw-r--r-- | includes/Socket.h | 2 | ||||
-rw-r--r-- | src/HttpServer.cc | 23 | ||||
-rw-r--r-- | src/Socket.cc | 86 | ||||
-rw-r--r-- | src/TaskMan.cc | 16 | ||||
-rw-r--r-- | tests/test-Http.cc | 3 |
7 files changed, 82 insertions, 52 deletions
diff --git a/includes/Handle.h b/includes/Handle.h index 40e52e5..6a087a7 100644 --- a/includes/Handle.h +++ b/includes/Handle.h @@ -160,7 +160,7 @@ class HPrinter : public Handle { class IOBase { private: IOBase() { } - ~IOBase() { if (m_h) m_h->delRef(); } + ~IOBase() { if (m_h) m_h->delRef(); m_h = NULL; } void setHandle(Handle * h) { m_h = h; if (m_h) m_h->addRef(); } Handle * m_h = NULL; template<class T> diff --git a/includes/HttpServer.h b/includes/HttpServer.h index 683390c..ac219e1 100644 --- a/includes/HttpServer.h +++ b/includes/HttpServer.h @@ -79,6 +79,7 @@ class HttpServer { }; ActionFound findAction(const char * uri, const char * host); String getServerName() { return "Balau/1.0"; } + bool started(); private: bool m_started; void * m_listenerPtr; @@ -87,6 +88,7 @@ class HttpServer { typedef std::list<Action *> ActionList; ActionList m_actions; RWLock m_actionsLock; + Events::TaskEvent m_listenerEvent; friend class HttpWorker; diff --git a/includes/Socket.h b/includes/Socket.h index 0909763..b1e59cb 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -62,6 +62,7 @@ class ListenerBase : public StacklessTask { virtual void Do(); void stop(); virtual const char * getName() const; + bool started() { return m_started; } protected: ListenerBase(int port, const char * local, void * opaque); virtual void factory(IO<Socket> & io, void * opaque) = 0; @@ -74,6 +75,7 @@ class ListenerBase : public StacklessTask { String m_local; int m_port = 0; void * m_opaque = NULL; + bool m_started = false; }; template<class Worker> diff --git a/src/HttpServer.cc b/src/HttpServer.cc index d6bc53d..fe95edb 100644 --- a/src/HttpServer.cc +++ b/src/HttpServer.cc @@ -633,20 +633,20 @@ typedef Balau::Listener<Balau::HttpWorker> HttpListener; void Balau::HttpServer::start() { AAssert(!m_started, "Don't start an HttpServer twice"); - m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this)); m_started = true; + m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this), &m_listenerEvent); } void Balau::HttpServer::stop() { AAssert(m_started, "Don't stop an HttpServer that hasn't been started"); + m_started = false; + IAssert(!m_listenerEvent.gotSignal(), "Our listener has stopped already!"); HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr); - Events::TaskEvent event(listener); - Task::prepare(&event); + Task::prepare(&m_listenerEvent); listener->stop(); - m_started = false; - Task::operationYield(&event); - IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop"); - event.ack(); + Task::operationYield(&m_listenerEvent); + IAssert(m_listenerEvent.gotSignal(), "HttpServer::stop didn't actually get the listener to stop"); + m_listenerEvent.ack(); } void Balau::HttpServer::registerAction(Action * action) { @@ -665,6 +665,15 @@ void Balau::HttpServer::flushAllActions() { } } +bool Balau::HttpServer::started() { + if (m_listenerEvent.gotSignal()) + return false; + HttpListener * listener = reinterpret_cast<HttpListener *>(m_listenerPtr); + if (!listener) + return false; + return listener->started(); +} + Balau::HttpServer::Action::ActionMatch Balau::HttpServer::Action::matches(const char * uri, const char * host) { ActionMatch r; diff --git a/src/Socket.cc b/src/Socket.cc index f27b85a..db4e23d 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -228,26 +228,26 @@ bool Balau::Socket::canWrite() { return true; } const char * Balau::Socket::getName() { return m_name.to_charp(); } void Balau::Socket::resolve(const char * hostname) { - if (!m_resolving) { + if (!m_resolving && !m_resolved) { m_resolving = 2; Task * t = Task::getCurrentTask(); - auto callback = [&](int status, int timeouts, struct hostent * hostent, int family, ptrdiff_t srcOffset, void * destAddr, size_t sizeofDest, bool & failed) { + IO<Socket> self(this); + auto callback = [self](int status, int timeouts, struct hostent * hostent, int family, void * destAddr, size_t sizeofDest, bool & failed) mutable { if (status == ARES_SUCCESS) { IAssert(hostent->h_addrtype == family, "We asked for socket family %i, but got %i instead", family, hostent->h_addrtype); - memcpy(destAddr, ((uint8_t *)hostent->h_addr_list[0]) + srcOffset, sizeofDest); - } - else { + uint8_t * srcPtr = (uint8_t *) hostent->h_addr_list[0]; + memcpy(destAddr, srcPtr, sizeofDest); + } else { failed = true; } - if (--m_resolving == 0) { - m_resolveEvent.doSignal(); - m_resolving = false; - m_resolved = true; + if (--self->m_resolving == 0) { + self->m_resolveEvent.doSignal(); + self->m_resolved = true; } }; - t->getTaskMan()->getHostByName(hostname, AF_INET, [&](int status, int timeouts, struct hostent * hostent) { callback(status, timeouts, hostent, AF_INET, offsetof(struct sockaddr_in, sin_addr), &m_resolvedAddr4, sizeof(m_resolvedAddr4), m_resolve4Failed); }); - t->getTaskMan()->getHostByName(hostname, AF_INET6, [&](int status, int timeouts, struct hostent * hostent) { callback(status, timeouts, hostent, AF_INET6, offsetof(struct sockaddr_in6, sin6_addr), &m_resolvedAddr6, sizeof(m_resolvedAddr6), m_resolve6Failed); }); + t->getTaskMan()->getHostByName(hostname, AF_INET, [callback, self](int status, int timeouts, struct hostent * hostent) mutable { callback(status, timeouts, hostent, AF_INET, &self->m_resolvedAddr4, sizeof(self->m_resolvedAddr4), self->m_resolve4Failed); }); + t->getTaskMan()->getHostByName(hostname, AF_INET6, [callback, self](int status, int timeouts, struct hostent * hostent) mutable { callback(status, timeouts, hostent, AF_INET6, &self->m_resolvedAddr6, sizeof(self->m_resolvedAddr6), self->m_resolve6Failed); }); Task::operationYield(&m_resolveEvent, Task::INTERRUPTIBLE); } @@ -263,16 +263,19 @@ void Balau::Socket::initAddr(sockaddr_in6 & out) { void Balau::Socket::resolved(sockaddr_in6 & out) { if (!m_resolve6Failed) { memcpy(&out.sin6_addr, &m_resolvedAddr6, sizeof(struct in6_addr)); + } else { + if (m_resolvedAddr4.s_addr == htonl(INADDR_LOOPBACK)) { + out.sin6_addr = in6addr_loopback; + } else { + memset(&out.sin6_addr, 0, sizeof(struct in6_addr)); + // v4 mapped IPv6 address + out.sin6_addr.s6_addr[10] = 0xff; + out.sin6_addr.s6_addr[11] = 0xff; + memcpy(out.sin6_addr.s6_addr + 12, &m_resolvedAddr4, sizeof(struct in_addr)); + } } - else { - memset(&out.sin6_addr, 0, sizeof(struct in6_addr)); - // v4 mapped IPv6 address - out.sin6_addr.s6_addr[10] = 0xff; - out.sin6_addr.s6_addr[11] = 0xff; - memcpy(out.sin6_addr.s6_addr + 12, &m_resolvedAddr4, sizeof(struct in_addr)); - } - m_resolving = false; m_resolved = false; + m_resolveEvent.reset(); } bool Balau::Socket::setLocal(const char * hostname, int port) { @@ -535,20 +538,35 @@ void Balau::ListenerBase::stop() { } void Balau::ListenerBase::Do() { - bool r; - IO<Socket> io; - while (!m_stop) { - StacklessBegin(); - StacklessOperation(r = m_listener->setLocal(m_local.to_charp(), m_port)); - EAssert(r, "Couldn't set the local IP/port to listen to"); - r = m_listener->listen(); - EAssert(r, "Couldn't listen on the given IP/port"); - setName(); - waitFor(&m_evt); - StacklessOperationOrCond(io = m_listener->accept(), m_stop); - if (m_stop) - return; - factory(io, m_opaque); - StacklessEnd(); + try { + while (!m_stop) { + bool r; + IO<Socket> io; + + switch (m_state) { + case 0: + waitFor(&m_evt); + m_state++; + case 1: + Printer::elog(E_SOCKET, "Listener task at %p (%s) is going to setLocal(%s, %i)", this, m_name.to_charp(), m_local.to_charp(), m_port); + r = m_listener->setLocal(m_local.to_charp(), m_port); + EAssert(r, "Couldn't set the local IP/port to listen to"); + Printer::elog(E_SOCKET, "Listener task at %p (%s) starts listening", this, m_name.to_charp()); + r = m_listener->listen(); + EAssert(r, "Couldn't listen on the given IP/port"); + setName(); + m_started = true; + m_state++; + default: + Printer::elog(E_SOCKET, "Listener task at %p (%s) starts accepting", this, m_name.to_charp()); + io = m_listener->accept(); + Printer::elog(E_SOCKET, "Listener task at %p (%s) accepted a connection: %s", this, m_name.to_charp(), io->getName()); + if (!m_stop) + factory(io, m_opaque); + } + } + } + catch (EAgain &) { + taskSwitch(); } } diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 5e73ba1..ba9bd60 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -40,9 +40,9 @@ class Stopper : public Balau::Task { int m_code; }; -class CurlAndCaresSharedManager : public Balau::AtStart, Balau::AtExit { +class CurlAndAresSharedManager : public Balau::AtStart, Balau::AtExit { public: - CurlAndCaresSharedManager() : AtStart(0), AtExit(0) { } + CurlAndAresSharedManager() : AtStart(0), AtExit(0) { } struct SharedLocks { Balau::RWLock share, cookie, dns, ssl_session; }; @@ -98,7 +98,7 @@ class CurlAndCaresSharedManager : public Balau::AtStart, Balau::AtExit { }; static AsyncStarter s_asyncStarter; -static CurlAndCaresSharedManager s_curlSharedmManager; +static CurlAndAresSharedManager s_curlAndAresSharedManager; void Stopper::Do() { getTaskMan()->stopMe(m_code); @@ -385,14 +385,12 @@ void Balau::TaskMan::aresSocketCallback(curl_socket_t s, int read, int write) { what = CURL_POLL_INOUT; } - struct timeval tv; - bool hasTimer = ares_timeout(m_aresChannel, NULL, &tv); + struct timeval tv = { 5, 0 }; + ares_timeout(m_aresChannel, &tv, &tv); m_aresTimer.stop(); - if (hasTimer) { - m_aresTimer.set((ev_tstamp)(tv.tv_sec * 1000 + tv.tv_usec / 1000 + 1)); - m_aresTimer.start(); - } + m_aresTimer.set((ev_tstamp)(tv.tv_sec * 1000 + tv.tv_usec / 1000 + 1)); + m_aresTimer.start(); ev::io * evt = m_aresSocketEvents[i]; if (!evt) { diff --git a/tests/test-Http.cc b/tests/test-Http.cc index 1ea040b..0dfcc74 100644 --- a/tests/test-Http.cc +++ b/tests/test-Http.cc @@ -137,7 +137,8 @@ void MainTask::Do() { s->setLocal("localhost"); s->start(); - sleep(1); + while (!s->started()) + sleep(0.1); Events::TaskEvent stopperEvent; Task * stopper = TaskMan::registerTask(new Stopper, &stopperEvent); |