From daf6897fe24e62ae8cf8e42b151ed565563332fe Mon Sep 17 00:00:00 2001 From: Nicolas Noble Date: Fri, 8 Aug 2014 13:34:23 -0700 Subject: Finalizing cares integration. --- src/HttpServer.cc | 23 ++++++++++----- src/Socket.cc | 86 +++++++++++++++++++++++++++++++++---------------------- src/TaskMan.cc | 16 +++++------ 3 files changed, 75 insertions(+), 50 deletions(-) (limited to 'src') diff --git a/src/HttpServer.cc b/src/HttpServer.cc index d6bc53d..fe95edb 100644 --- a/src/HttpServer.cc +++ b/src/HttpServer.cc @@ -633,20 +633,20 @@ typedef Balau::Listener HttpListener; void Balau::HttpServer::start() { AAssert(!m_started, "Don't start an HttpServer twice"); - m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this)); m_started = true; + m_listenerPtr = TaskMan::registerTask(new HttpListener(m_port, m_local.to_charp(), this), &m_listenerEvent); } void Balau::HttpServer::stop() { AAssert(m_started, "Don't stop an HttpServer that hasn't been started"); + m_started = false; + IAssert(!m_listenerEvent.gotSignal(), "Our listener has stopped already!"); HttpListener * listener = reinterpret_cast(m_listenerPtr); - Events::TaskEvent event(listener); - Task::prepare(&event); + Task::prepare(&m_listenerEvent); listener->stop(); - m_started = false; - Task::operationYield(&event); - IAssert(event.gotSignal(), "HttpServer::stop didn't actually get the listener to stop"); - event.ack(); + Task::operationYield(&m_listenerEvent); + IAssert(m_listenerEvent.gotSignal(), "HttpServer::stop didn't actually get the listener to stop"); + m_listenerEvent.ack(); } void Balau::HttpServer::registerAction(Action * action) { @@ -665,6 +665,15 @@ void Balau::HttpServer::flushAllActions() { } } +bool Balau::HttpServer::started() { + if (m_listenerEvent.gotSignal()) + return false; + HttpListener * listener = reinterpret_cast(m_listenerPtr); + if (!listener) + return false; + return listener->started(); +} + Balau::HttpServer::Action::ActionMatch Balau::HttpServer::Action::matches(const char * uri, const char * host) { ActionMatch r; diff --git a/src/Socket.cc b/src/Socket.cc index f27b85a..db4e23d 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -228,26 +228,26 @@ bool Balau::Socket::canWrite() { return true; } const char * Balau::Socket::getName() { return m_name.to_charp(); } void Balau::Socket::resolve(const char * hostname) { - if (!m_resolving) { + if (!m_resolving && !m_resolved) { m_resolving = 2; Task * t = Task::getCurrentTask(); - auto callback = [&](int status, int timeouts, struct hostent * hostent, int family, ptrdiff_t srcOffset, void * destAddr, size_t sizeofDest, bool & failed) { + IO self(this); + auto callback = [self](int status, int timeouts, struct hostent * hostent, int family, void * destAddr, size_t sizeofDest, bool & failed) mutable { if (status == ARES_SUCCESS) { IAssert(hostent->h_addrtype == family, "We asked for socket family %i, but got %i instead", family, hostent->h_addrtype); - memcpy(destAddr, ((uint8_t *)hostent->h_addr_list[0]) + srcOffset, sizeofDest); - } - else { + uint8_t * srcPtr = (uint8_t *) hostent->h_addr_list[0]; + memcpy(destAddr, srcPtr, sizeofDest); + } else { failed = true; } - if (--m_resolving == 0) { - m_resolveEvent.doSignal(); - m_resolving = false; - m_resolved = true; + if (--self->m_resolving == 0) { + self->m_resolveEvent.doSignal(); + self->m_resolved = true; } }; - t->getTaskMan()->getHostByName(hostname, AF_INET, [&](int status, int timeouts, struct hostent * hostent) { callback(status, timeouts, hostent, AF_INET, offsetof(struct sockaddr_in, sin_addr), &m_resolvedAddr4, sizeof(m_resolvedAddr4), m_resolve4Failed); }); - t->getTaskMan()->getHostByName(hostname, AF_INET6, [&](int status, int timeouts, struct hostent * hostent) { callback(status, timeouts, hostent, AF_INET6, offsetof(struct sockaddr_in6, sin6_addr), &m_resolvedAddr6, sizeof(m_resolvedAddr6), m_resolve6Failed); }); + t->getTaskMan()->getHostByName(hostname, AF_INET, [callback, self](int status, int timeouts, struct hostent * hostent) mutable { callback(status, timeouts, hostent, AF_INET, &self->m_resolvedAddr4, sizeof(self->m_resolvedAddr4), self->m_resolve4Failed); }); + t->getTaskMan()->getHostByName(hostname, AF_INET6, [callback, self](int status, int timeouts, struct hostent * hostent) mutable { callback(status, timeouts, hostent, AF_INET6, &self->m_resolvedAddr6, sizeof(self->m_resolvedAddr6), self->m_resolve6Failed); }); Task::operationYield(&m_resolveEvent, Task::INTERRUPTIBLE); } @@ -263,16 +263,19 @@ void Balau::Socket::initAddr(sockaddr_in6 & out) { void Balau::Socket::resolved(sockaddr_in6 & out) { if (!m_resolve6Failed) { memcpy(&out.sin6_addr, &m_resolvedAddr6, sizeof(struct in6_addr)); + } else { + if (m_resolvedAddr4.s_addr == htonl(INADDR_LOOPBACK)) { + out.sin6_addr = in6addr_loopback; + } else { + memset(&out.sin6_addr, 0, sizeof(struct in6_addr)); + // v4 mapped IPv6 address + out.sin6_addr.s6_addr[10] = 0xff; + out.sin6_addr.s6_addr[11] = 0xff; + memcpy(out.sin6_addr.s6_addr + 12, &m_resolvedAddr4, sizeof(struct in_addr)); + } } - else { - memset(&out.sin6_addr, 0, sizeof(struct in6_addr)); - // v4 mapped IPv6 address - out.sin6_addr.s6_addr[10] = 0xff; - out.sin6_addr.s6_addr[11] = 0xff; - memcpy(out.sin6_addr.s6_addr + 12, &m_resolvedAddr4, sizeof(struct in_addr)); - } - m_resolving = false; m_resolved = false; + m_resolveEvent.reset(); } bool Balau::Socket::setLocal(const char * hostname, int port) { @@ -535,20 +538,35 @@ void Balau::ListenerBase::stop() { } void Balau::ListenerBase::Do() { - bool r; - IO io; - while (!m_stop) { - StacklessBegin(); - StacklessOperation(r = m_listener->setLocal(m_local.to_charp(), m_port)); - EAssert(r, "Couldn't set the local IP/port to listen to"); - r = m_listener->listen(); - EAssert(r, "Couldn't listen on the given IP/port"); - setName(); - waitFor(&m_evt); - StacklessOperationOrCond(io = m_listener->accept(), m_stop); - if (m_stop) - return; - factory(io, m_opaque); - StacklessEnd(); + try { + while (!m_stop) { + bool r; + IO io; + + switch (m_state) { + case 0: + waitFor(&m_evt); + m_state++; + case 1: + Printer::elog(E_SOCKET, "Listener task at %p (%s) is going to setLocal(%s, %i)", this, m_name.to_charp(), m_local.to_charp(), m_port); + r = m_listener->setLocal(m_local.to_charp(), m_port); + EAssert(r, "Couldn't set the local IP/port to listen to"); + Printer::elog(E_SOCKET, "Listener task at %p (%s) starts listening", this, m_name.to_charp()); + r = m_listener->listen(); + EAssert(r, "Couldn't listen on the given IP/port"); + setName(); + m_started = true; + m_state++; + default: + Printer::elog(E_SOCKET, "Listener task at %p (%s) starts accepting", this, m_name.to_charp()); + io = m_listener->accept(); + Printer::elog(E_SOCKET, "Listener task at %p (%s) accepted a connection: %s", this, m_name.to_charp(), io->getName()); + if (!m_stop) + factory(io, m_opaque); + } + } + } + catch (EAgain &) { + taskSwitch(); } } diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 5e73ba1..ba9bd60 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -40,9 +40,9 @@ class Stopper : public Balau::Task { int m_code; }; -class CurlAndCaresSharedManager : public Balau::AtStart, Balau::AtExit { +class CurlAndAresSharedManager : public Balau::AtStart, Balau::AtExit { public: - CurlAndCaresSharedManager() : AtStart(0), AtExit(0) { } + CurlAndAresSharedManager() : AtStart(0), AtExit(0) { } struct SharedLocks { Balau::RWLock share, cookie, dns, ssl_session; }; @@ -98,7 +98,7 @@ class CurlAndCaresSharedManager : public Balau::AtStart, Balau::AtExit { }; static AsyncStarter s_asyncStarter; -static CurlAndCaresSharedManager s_curlSharedmManager; +static CurlAndAresSharedManager s_curlAndAresSharedManager; void Stopper::Do() { getTaskMan()->stopMe(m_code); @@ -385,14 +385,12 @@ void Balau::TaskMan::aresSocketCallback(curl_socket_t s, int read, int write) { what = CURL_POLL_INOUT; } - struct timeval tv; - bool hasTimer = ares_timeout(m_aresChannel, NULL, &tv); + struct timeval tv = { 5, 0 }; + ares_timeout(m_aresChannel, &tv, &tv); m_aresTimer.stop(); - if (hasTimer) { - m_aresTimer.set((ev_tstamp)(tv.tv_sec * 1000 + tv.tv_usec / 1000 + 1)); - m_aresTimer.start(); - } + m_aresTimer.set((ev_tstamp)(tv.tv_sec * 1000 + tv.tv_usec / 1000 + 1)); + m_aresTimer.start(); ev::io * evt = m_aresSocketEvents[i]; if (!evt) { -- cgit v1.2.3