From c83a8a951cce9b7ed541c293993f708f720bf28d Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Thu, 19 Jun 2014 15:38:02 -0700 Subject: Finishing up basic curl integration and adding a basic DownloadTask. --- src/CurlTask.cc | 25 ++++++++++++++- src/Exceptions.cc | 9 ++++++ src/TaskMan.cc | 95 +++++++++++++++++++++++++++++++++---------------------- 3 files changed, 90 insertions(+), 39 deletions(-) (limited to 'src') diff --git a/src/CurlTask.cc b/src/CurlTask.cc index 5ff8b97..c43210f 100644 --- a/src/CurlTask.cc +++ b/src/CurlTask.cc @@ -10,6 +10,11 @@ Balau::CurlTask::CurlTask() { curl_easy_setopt(m_curlHandle, CURLOPT_DEBUGDATA, this); } +Balau::CurlTask::~CurlTask() { + unregisterCurlHandle(); + curl_easy_cleanup(m_curlHandle); +} + size_t Balau::CurlTask::writeFunctionStatic(char * ptr, size_t size, size_t nmemb, void * userdata) { CurlTask * curlTask = (CurlTask *) userdata; return curlTask->writeFunction(ptr, size, nmemb); @@ -26,6 +31,24 @@ int Balau::CurlTask::debugFunctionStatic(CURL * easy, curl_infotype info, char * return curlTask->debugFunction(info, str, str_len); } -void Balau::CurlTask::curlDone(CURLcode result) { +Balau::DownloadTask::DownloadTask(const Balau::String & url) { + curl_easy_setopt(m_curlHandle, CURLOPT_URL, url.to_charp()); + m_name.set("DownloadTask(%s)", url.to_charp()); +} + +void Balau::DownloadTask::Do() { + if (m_state) + return; + + m_state = 1; + registerCurlHandle(); + waitFor(&m_evt); + yield(); +} +void Balau::DownloadTask::curlDone(CURLcode result) { + m_curlResult = result; + curl_easy_getinfo(m_curlHandle, CURLINFO_RESPONSE_CODE, &m_responseCode); + m_evt.doSignal(); + m_done = true; } diff --git a/src/Exceptions.cc b/src/Exceptions.cc index 38fa005..44743fe 100644 --- a/src/Exceptions.cc +++ b/src/Exceptions.cc @@ -102,3 +102,12 @@ void Balau::ExitHelper(const String & msg, const char * fmt, ...) { ExitHelperInner(msg, NULL); } } + +void Balau::AssertHelperInner(const Balau::String & msg, const char * details) throw (Balau::GeneralException) { +#if defined(_MSC_VER) && defined(_DEBUG) + if (IsDebuggerPresent()) + __debugbreak(); + else +#endif + throw Balau::GeneralException(msg, details); +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 00ce0f2..07fff44 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -1,5 +1,6 @@ -#ifdef _MSC_VER -#include +#ifdef _WIN32 +#include +#include #endif #undef ERROR @@ -42,12 +43,13 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit { public: CurlSharedManager() : AtStart(0), AtExit(0) { } struct SharedLocks { - Balau::RWLock cookie, dns, ssl_session; + Balau::RWLock share, cookie, dns, ssl_session; }; static void lock_function(CURL *handle, curl_lock_data data, curl_lock_access access, void * userptr) { SharedLocks * locks = (SharedLocks *) userptr; Balau::RWLock * lock = NULL; switch (data) { + case CURL_LOCK_DATA_SHARE: lock = &locks->share; break; case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break; case CURL_LOCK_DATA_DNS: lock = &locks->dns; break; case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break; @@ -63,6 +65,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit { SharedLocks * locks = (SharedLocks *) userptr; Balau::RWLock * lock = NULL; switch (data) { + case CURL_LOCK_DATA_SHARE: lock = &locks->share; break; case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break; case CURL_LOCK_DATA_DNS: lock = &locks->dns; break; case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break; @@ -79,7 +82,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit { curl_share_setopt(s_curlShared, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); curl_share_setopt(s_curlShared, CURLSHOPT_USERDATA, &locks); curl_share_setopt(s_curlShared, CURLSHOPT_LOCKFUNC, lock_function); - curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, lock_function); + curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, unlock_function); } void doExit() { curl_share_cleanup(s_curlShared); @@ -246,21 +249,28 @@ Balau::TaskMan::TaskMan() { m_curlTimer.set(this); } +#ifdef _WIN32 +inline static int fromSocket(SOCKET s) { return _open_osfhandle(s, 0); } +inline static SOCKET toSocket(int fd) { return _get_osfhandle(fd); } +#else +inline static int fromSocket(int s) { return s; } +inline static int toSocket(int fd) { return fd; } +#endif + int Balau::TaskMan::curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp) { TaskMan * taskMan = (TaskMan *)userp; return taskMan->curlSocketCallback(easy, s, what, socketp); } int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp) { + int fd = fromSocket(s); ev::io * evt = (ev::io *) socketp; if (!evt) { if (what == CURL_POLL_REMOVE) return 0; evt = new ev::io; evt->set(this); - evt->set(s, ev::READ | ev::WRITE); evt->set(m_loop); - evt->start(); curl_multi_assign(m_curlMulti, s, evt); } @@ -270,17 +280,17 @@ int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, v break; case CURL_POLL_IN: evt->stop(); - evt->set(s, ev::READ); + evt->set(fd, ev::READ); evt->start(); break; case CURL_POLL_OUT: evt->stop(); - evt->set(s, ev::WRITE); + evt->set(fd, ev::WRITE); evt->start(); break; case CURL_POLL_INOUT: evt->stop(); - evt->set(s, ev::READ | ev::WRITE); + evt->set(fd, ev::READ | ev::WRITE); evt->start(); break; case CURL_POLL_REMOVE: @@ -300,7 +310,7 @@ void Balau::TaskMan::curlSocketEventCallback(ev::io & w, int revents) { bitmask |= CURL_CSELECT_OUT; if (revents & ev::ERROR) bitmask |= CURL_CSELECT_ERR; - curl_multi_socket_action(m_curlMulti, w.fd, bitmask, &m_curlStillRunning); + curl_multi_socket_action(m_curlMulti, toSocket(w.fd), bitmask, &m_curlStillRunning); } int Balau::TaskMan::curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp) { @@ -406,9 +416,9 @@ int Balau::TaskMan::mainLoop() { yielded.insert(t); } - // if we begin that loop with any pending task, just don't loop, so we can add them immediately. + // if we begin that loop with any pending task, just don't block, so we can add them immediately. bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty(); - bool curlNeedsSpin = !m_curlTimer.is_active() && m_curlStillRunning != 0; + bool curlNeedsSpin = (!m_curlTimer.is_active() && m_curlStillRunning != 0) || m_curlGotNewHandles; // libev's event "loop". We always runs it once though. m_allowedToSignal = true; @@ -419,6 +429,25 @@ int Balau::TaskMan::mainLoop() { // calling async's idle s_async.idle(); + // process curl events, and signal tasks + if (m_curlGotNewHandles || curlNeedsSpin) + curl_multi_socket_all(m_curlMulti, &m_curlStillRunning); + m_curlGotNewHandles = false; + + CURLMsg * curlMsg = NULL; + int curlMsgInQueue; + + while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) { + if (curlMsg->msg != CURLMSG_DONE) + continue; + Task * maybeCurlTask = NULL; + curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask); + IAssert(maybeCurlTask, "curl easy handle didn't have any private data..."); + CurlTask * curlTask = dynamic_cast(maybeCurlTask); + IAssert(curlTask, "curl easy handle had corrupted private data..."); + curlTask->curlDone(curlMsg->data.result); + } + // let's check what task got stopped, and signal them for (Task * t : stopped) { IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t); @@ -464,8 +493,6 @@ int Balau::TaskMan::mainLoop() { yielded = yielded2; yielded2.clear(); - bool curlGotHandle = false; - // Adding tasks that were added, maybe from other threads while (!m_pendingAdd.isEmpty()) { Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this); @@ -476,30 +503,6 @@ int Balau::TaskMan::mainLoop() { t->setup(this, t->isStackless() ? NULL : getStack()); m_tasks.insert(t); starting.insert(t); - CurlTask * curlTask = dynamic_cast(t); - if (curlTask) { - curlGotHandle = true; - curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared); - curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask); - curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle); - } - } - - if (curlGotHandle || curlNeedsSpin) - curl_multi_socket_all(m_curlMulti, &m_curlStillRunning); - - CURLMsg * curlMsg = NULL; - int curlMsgInQueue; - - while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) { - if (curlMsg->msg != CURLMSG_DONE) - continue; - Task * maybeCurlTask = NULL; - curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask); - IAssert(maybeCurlTask, "curl easy handle didn't have any private data..."); - CurlTask * curlTask = dynamic_cast(maybeCurlTask); - IAssert(curlTask, "curl easy handle had corrupted private data..."); - curlTask->curlDone(curlMsg->data.result); } // Finally, let's destroy tasks that no longer are necessary. @@ -533,6 +536,22 @@ int Balau::TaskMan::mainLoop() { return m_stopCode; } +void Balau::TaskMan::registerCurlHandle(Balau::CurlTask * curlTask) { + m_curlGotNewHandles = true; + curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared); + curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask); + curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle); +} + +void Balau::TaskMan::unregisterCurlHandle(Balau::CurlTask * curlTask) { + void * ptr = NULL; + curl_easy_getinfo(curlTask->m_curlHandle, CURLINFO_PRIVATE, &ptr); + if (!ptr) + return; + curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, static_cast(0)); + curl_multi_remove_handle(m_curlMulti, curlTask->m_curlHandle); +} + void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) { if (stick) { IAssert(!event, "inconsistent"); -- cgit v1.2.3