diff options
-rw-r--r-- | includes/CurlTask.h | 39 | ||||
-rw-r--r-- | includes/Exceptions.h | 10 | ||||
-rw-r--r-- | includes/TaskMan.h | 5 | ||||
-rw-r--r-- | src/CurlTask.cc | 25 | ||||
-rw-r--r-- | src/Exceptions.cc | 9 | ||||
-rw-r--r-- | src/TaskMan.cc | 95 |
6 files changed, 128 insertions, 55 deletions
diff --git a/includes/CurlTask.h b/includes/CurlTask.h index 0aa62a0..41243ec 100644 --- a/includes/CurlTask.h +++ b/includes/CurlTask.h @@ -7,20 +7,45 @@ namespace Balau { class CurlTask : public StacklessTask { -public: - CurlTask(); + public: + CurlTask(); + ~CurlTask(); friend class TaskMan; protected: CURL * m_curlHandle; -private: - void curlDone(CURLcode result); + void registerCurlHandle() { getTaskMan()->registerCurlHandle(this); } + void unregisterCurlHandle() { getTaskMan()->unregisterCurlHandle(this); } + private: + virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) { return size * nmemb; } + virtual size_t readFunction(void * ptr, size_t size, size_t nmemb) { return CURL_READFUNC_ABORT; } + virtual int debugFunction(curl_infotype info, char * str, size_t str_len) { return 0; } + virtual void curlDone(CURLcode result) { } static size_t writeFunctionStatic(char * ptr, size_t size, size_t nmemb, void * userdata); - virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) { return size * nmemb; } static size_t readFunctionStatic(void * ptr, size_t size, size_t nmemb, void * userdata); - virtual size_t readFunction(void * ptr, size_t size, size_t nmemb) { return CURL_READFUNC_ABORT; } static int debugFunctionStatic(CURL * easy, curl_infotype info, char * str, size_t str_len, void * userdata); - virtual int debugFunction(curl_infotype info, char * str, size_t str_len) { return 0; } +}; + +class DownloadTask : public CurlTask { + public: + DownloadTask(const String & url); + const String & getData() const { return m_data; } + bool isDone() { return m_done; } + long responseCode() { return m_responseCode; } + + protected: + String m_data; + CURLcode m_curlResult; + long m_responseCode; + + private: + virtual const char * getName() const override { return m_name.to_charp(); } + virtual void Do() override; + virtual void curlDone(CURLcode result) override; + virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) override { m_data += ptr; return size * nmemb; } + String m_name; + Events::Custom m_evt; + bool m_done = false; }; }; diff --git a/includes/Exceptions.h b/includes/Exceptions.h index 1c66968..11d0d7b 100644 --- a/includes/Exceptions.h +++ b/includes/Exceptions.h @@ -97,15 +97,7 @@ static inline void * realloc(void * previous, size_t size) { }; -static inline void AssertHelperInner(const String & msg, const char * details = NULL) throw (GeneralException) { -#if defined(_MSC_VER) && defined(_DEBUG) - if (IsDebuggerPresent()) - __debugbreak(); - else -#endif - throw GeneralException(msg, details); -} - +void AssertHelperInner(const String & msg, const char * details = NULL) throw (GeneralException); static inline void AssertHelper(const String & msg, const char * fmt, ...) printfwarning(2, 3); static inline void AssertHelper(const String & msg, const char * fmt, ...) { diff --git a/includes/TaskMan.h b/includes/TaskMan.h index a51653b..1f35c1e 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -24,6 +24,7 @@ namespace gnu = __gnu_cxx; namespace Balau { class TaskScheduler; +class CurlTask; namespace Events { @@ -85,6 +86,7 @@ class TaskMan { void * m_fiber; #endif friend class Task; + friend class CurlTask; friend class TaskScheduler; template<class T> friend T * createAsyncOp(T * op); @@ -106,12 +108,15 @@ class TaskMan { ev::timer m_curlTimer; CURLM * m_curlMulti = false; int m_curlStillRunning = 0; + bool m_curlGotNewHandles = false; static int curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp); int curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp); void curlSocketEventCallback(ev::io & w, int revents); static int curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp); int curlMultiTimerCallback(CURLM * multi, long timeout_ms); void curlMultiTimerEventCallback(ev::timer & w, int revents); + void registerCurlHandle(CurlTask * curlTask); + void unregisterCurlHandle(CurlTask * curlTask); TaskMan(const TaskMan &) = delete; TaskMan & operator=(const TaskMan &) = delete; diff --git a/src/CurlTask.cc b/src/CurlTask.cc index 5ff8b97..c43210f 100644 --- a/src/CurlTask.cc +++ b/src/CurlTask.cc @@ -10,6 +10,11 @@ Balau::CurlTask::CurlTask() { curl_easy_setopt(m_curlHandle, CURLOPT_DEBUGDATA, this); } +Balau::CurlTask::~CurlTask() { + unregisterCurlHandle(); + curl_easy_cleanup(m_curlHandle); +} + size_t Balau::CurlTask::writeFunctionStatic(char * ptr, size_t size, size_t nmemb, void * userdata) { CurlTask * curlTask = (CurlTask *) userdata; return curlTask->writeFunction(ptr, size, nmemb); @@ -26,6 +31,24 @@ int Balau::CurlTask::debugFunctionStatic(CURL * easy, curl_infotype info, char * return curlTask->debugFunction(info, str, str_len); } -void Balau::CurlTask::curlDone(CURLcode result) { +Balau::DownloadTask::DownloadTask(const Balau::String & url) { + curl_easy_setopt(m_curlHandle, CURLOPT_URL, url.to_charp()); + m_name.set("DownloadTask(%s)", url.to_charp()); +} + +void Balau::DownloadTask::Do() { + if (m_state) + return; + + m_state = 1; + registerCurlHandle(); + waitFor(&m_evt); + yield(); +} +void Balau::DownloadTask::curlDone(CURLcode result) { + m_curlResult = result; + curl_easy_getinfo(m_curlHandle, CURLINFO_RESPONSE_CODE, &m_responseCode); + m_evt.doSignal(); + m_done = true; } diff --git a/src/Exceptions.cc b/src/Exceptions.cc index 38fa005..44743fe 100644 --- a/src/Exceptions.cc +++ b/src/Exceptions.cc @@ -102,3 +102,12 @@ void Balau::ExitHelper(const String & msg, const char * fmt, ...) { ExitHelperInner(msg, NULL); } } + +void Balau::AssertHelperInner(const Balau::String & msg, const char * details) throw (Balau::GeneralException) { +#if defined(_MSC_VER) && defined(_DEBUG) + if (IsDebuggerPresent()) + __debugbreak(); + else +#endif + throw Balau::GeneralException(msg, details); +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 00ce0f2..07fff44 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -1,5 +1,6 @@ -#ifdef _MSC_VER -#include <Windows.h> +#ifdef _WIN32 +#include <windows.h> +#include <io.h> #endif #undef ERROR @@ -42,12 +43,13 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit { public: CurlSharedManager() : AtStart(0), AtExit(0) { } struct SharedLocks { - Balau::RWLock cookie, dns, ssl_session; + Balau::RWLock share, cookie, dns, ssl_session; }; static void lock_function(CURL *handle, curl_lock_data data, curl_lock_access access, void * userptr) { SharedLocks * locks = (SharedLocks *) userptr; Balau::RWLock * lock = NULL; switch (data) { + case CURL_LOCK_DATA_SHARE: lock = &locks->share; break; case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break; case CURL_LOCK_DATA_DNS: lock = &locks->dns; break; case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break; @@ -63,6 +65,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit { SharedLocks * locks = (SharedLocks *) userptr; Balau::RWLock * lock = NULL; switch (data) { + case CURL_LOCK_DATA_SHARE: lock = &locks->share; break; case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break; case CURL_LOCK_DATA_DNS: lock = &locks->dns; break; case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break; @@ -79,7 +82,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit { curl_share_setopt(s_curlShared, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); curl_share_setopt(s_curlShared, CURLSHOPT_USERDATA, &locks); curl_share_setopt(s_curlShared, CURLSHOPT_LOCKFUNC, lock_function); - curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, lock_function); + curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, unlock_function); } void doExit() { curl_share_cleanup(s_curlShared); @@ -246,21 +249,28 @@ Balau::TaskMan::TaskMan() { m_curlTimer.set<TaskMan, &TaskMan::curlMultiTimerEventCallback>(this); } +#ifdef _WIN32 +inline static int fromSocket(SOCKET s) { return _open_osfhandle(s, 0); } +inline static SOCKET toSocket(int fd) { return _get_osfhandle(fd); } +#else +inline static int fromSocket(int s) { return s; } +inline static int toSocket(int fd) { return fd; } +#endif + int Balau::TaskMan::curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp) { TaskMan * taskMan = (TaskMan *)userp; return taskMan->curlSocketCallback(easy, s, what, socketp); } int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp) { + int fd = fromSocket(s); ev::io * evt = (ev::io *) socketp; if (!evt) { if (what == CURL_POLL_REMOVE) return 0; evt = new ev::io; evt->set<TaskMan, &TaskMan::curlSocketEventCallback>(this); - evt->set(s, ev::READ | ev::WRITE); evt->set(m_loop); - evt->start(); curl_multi_assign(m_curlMulti, s, evt); } @@ -270,17 +280,17 @@ int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, v break; case CURL_POLL_IN: evt->stop(); - evt->set(s, ev::READ); + evt->set(fd, ev::READ); evt->start(); break; case CURL_POLL_OUT: evt->stop(); - evt->set(s, ev::WRITE); + evt->set(fd, ev::WRITE); evt->start(); break; case CURL_POLL_INOUT: evt->stop(); - evt->set(s, ev::READ | ev::WRITE); + evt->set(fd, ev::READ | ev::WRITE); evt->start(); break; case CURL_POLL_REMOVE: @@ -300,7 +310,7 @@ void Balau::TaskMan::curlSocketEventCallback(ev::io & w, int revents) { bitmask |= CURL_CSELECT_OUT; if (revents & ev::ERROR) bitmask |= CURL_CSELECT_ERR; - curl_multi_socket_action(m_curlMulti, w.fd, bitmask, &m_curlStillRunning); + curl_multi_socket_action(m_curlMulti, toSocket(w.fd), bitmask, &m_curlStillRunning); } int Balau::TaskMan::curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp) { @@ -406,9 +416,9 @@ int Balau::TaskMan::mainLoop() { yielded.insert(t); } - // if we begin that loop with any pending task, just don't loop, so we can add them immediately. + // if we begin that loop with any pending task, just don't block, so we can add them immediately. bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty(); - bool curlNeedsSpin = !m_curlTimer.is_active() && m_curlStillRunning != 0; + bool curlNeedsSpin = (!m_curlTimer.is_active() && m_curlStillRunning != 0) || m_curlGotNewHandles; // libev's event "loop". We always runs it once though. m_allowedToSignal = true; @@ -419,6 +429,25 @@ int Balau::TaskMan::mainLoop() { // calling async's idle s_async.idle(); + // process curl events, and signal tasks + if (m_curlGotNewHandles || curlNeedsSpin) + curl_multi_socket_all(m_curlMulti, &m_curlStillRunning); + m_curlGotNewHandles = false; + + CURLMsg * curlMsg = NULL; + int curlMsgInQueue; + + while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) { + if (curlMsg->msg != CURLMSG_DONE) + continue; + Task * maybeCurlTask = NULL; + curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask); + IAssert(maybeCurlTask, "curl easy handle didn't have any private data..."); + CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask); + IAssert(curlTask, "curl easy handle had corrupted private data..."); + curlTask->curlDone(curlMsg->data.result); + } + // let's check what task got stopped, and signal them for (Task * t : stopped) { IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t); @@ -464,8 +493,6 @@ int Balau::TaskMan::mainLoop() { yielded = yielded2; yielded2.clear(); - bool curlGotHandle = false; - // Adding tasks that were added, maybe from other threads while (!m_pendingAdd.isEmpty()) { Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this); @@ -476,30 +503,6 @@ int Balau::TaskMan::mainLoop() { t->setup(this, t->isStackless() ? NULL : getStack()); m_tasks.insert(t); starting.insert(t); - CurlTask * curlTask = dynamic_cast<CurlTask *>(t); - if (curlTask) { - curlGotHandle = true; - curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared); - curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask); - curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle); - } - } - - if (curlGotHandle || curlNeedsSpin) - curl_multi_socket_all(m_curlMulti, &m_curlStillRunning); - - CURLMsg * curlMsg = NULL; - int curlMsgInQueue; - - while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) { - if (curlMsg->msg != CURLMSG_DONE) - continue; - Task * maybeCurlTask = NULL; - curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask); - IAssert(maybeCurlTask, "curl easy handle didn't have any private data..."); - CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask); - IAssert(curlTask, "curl easy handle had corrupted private data..."); - curlTask->curlDone(curlMsg->data.result); } // Finally, let's destroy tasks that no longer are necessary. @@ -533,6 +536,22 @@ int Balau::TaskMan::mainLoop() { return m_stopCode; } +void Balau::TaskMan::registerCurlHandle(Balau::CurlTask * curlTask) { + m_curlGotNewHandles = true; + curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared); + curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask); + curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle); +} + +void Balau::TaskMan::unregisterCurlHandle(Balau::CurlTask * curlTask) { + void * ptr = NULL; + curl_easy_getinfo(curlTask->m_curlHandle, CURLINFO_PRIVATE, &ptr); + if (!ptr) + return; + curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, static_cast<void *>(0)); + curl_multi_remove_handle(m_curlMulti, curlTask->m_curlHandle); +} + void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) { if (stick) { IAssert(!event, "inconsistent"); |