From c83a8a951cce9b7ed541c293993f708f720bf28d Mon Sep 17 00:00:00 2001
From: "Nicolas \"Pixel\" Noble" <pixel@nobis-crew.org>
Date: Thu, 19 Jun 2014 15:38:02 -0700
Subject: Finishing up basic curl integration and adding a basic DownloadTask.

---
 includes/CurlTask.h   | 39 +++++++++++++++++----
 includes/Exceptions.h | 10 +-----
 includes/TaskMan.h    |  5 +++
 src/CurlTask.cc       | 25 +++++++++++++-
 src/Exceptions.cc     |  9 +++++
 src/TaskMan.cc        | 95 ++++++++++++++++++++++++++++++---------------------
 6 files changed, 128 insertions(+), 55 deletions(-)

diff --git a/includes/CurlTask.h b/includes/CurlTask.h
index 0aa62a0..41243ec 100644
--- a/includes/CurlTask.h
+++ b/includes/CurlTask.h
@@ -7,20 +7,45 @@
 namespace Balau {
 
 class CurlTask : public StacklessTask {
-public:
-    CurlTask();
+  public:
+      CurlTask();
+      ~CurlTask();
     friend class TaskMan;
   protected:
     CURL * m_curlHandle;
-private:
-    void curlDone(CURLcode result);
+    void registerCurlHandle() { getTaskMan()->registerCurlHandle(this); }
+    void unregisterCurlHandle() { getTaskMan()->unregisterCurlHandle(this); }
+  private:
+    virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) { return size * nmemb; }
+    virtual size_t readFunction(void * ptr, size_t size, size_t nmemb) { return CURL_READFUNC_ABORT; }
+    virtual int    debugFunction(curl_infotype info, char * str, size_t str_len) { return 0; }
+    virtual void   curlDone(CURLcode result) { }
 
     static  size_t writeFunctionStatic(char * ptr, size_t size, size_t nmemb, void * userdata);
-    virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) { return size * nmemb; }
     static  size_t readFunctionStatic(void * ptr, size_t size, size_t nmemb, void * userdata);
-    virtual size_t readFunction(void * ptr, size_t size, size_t nmemb) { return CURL_READFUNC_ABORT; }
     static  int    debugFunctionStatic(CURL * easy, curl_infotype info, char * str, size_t str_len, void * userdata);
-    virtual int    debugFunction(curl_infotype info, char * str, size_t str_len) { return 0; }
+};
+
+class DownloadTask : public CurlTask {
+  public:
+      DownloadTask(const String & url);
+    const String & getData() const { return m_data; }
+    bool isDone() { return m_done; }
+    long responseCode() { return m_responseCode; }
+
+  protected:
+    String m_data;
+    CURLcode m_curlResult;
+    long m_responseCode;
+
+  private:
+    virtual const char * getName() const override { return m_name.to_charp(); }
+    virtual void Do() override;
+    virtual void curlDone(CURLcode result) override;
+    virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) override { m_data += ptr; return size * nmemb; }
+    String m_name;
+    Events::Custom m_evt;
+    bool m_done = false;
 };
 
 };
diff --git a/includes/Exceptions.h b/includes/Exceptions.h
index 1c66968..11d0d7b 100644
--- a/includes/Exceptions.h
+++ b/includes/Exceptions.h
@@ -97,15 +97,7 @@ static inline void * realloc(void * previous, size_t size) {
 
 };
 
-static inline void AssertHelperInner(const String & msg, const char * details = NULL) throw (GeneralException) {
-#if defined(_MSC_VER) && defined(_DEBUG)
-	if (IsDebuggerPresent())
-		__debugbreak();
-	else
-#endif
-		throw GeneralException(msg, details);
-}
-
+void AssertHelperInner(const String & msg, const char * details = NULL) throw (GeneralException);
 static inline void AssertHelper(const String & msg, const char * fmt, ...) printfwarning(2, 3);
 
 static inline void AssertHelper(const String & msg, const char * fmt, ...) {
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index a51653b..1f35c1e 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -24,6 +24,7 @@ namespace gnu = __gnu_cxx;
 namespace Balau {
 
 class TaskScheduler;
+class CurlTask;
 
 namespace Events {
 
@@ -85,6 +86,7 @@ class TaskMan {
     void * m_fiber;
 #endif
     friend class Task;
+    friend class CurlTask;
     friend class TaskScheduler;
     template<class T>
     friend T * createAsyncOp(T * op);
@@ -106,12 +108,15 @@ class TaskMan {
     ev::timer m_curlTimer;
     CURLM * m_curlMulti = false;
     int m_curlStillRunning = 0;
+    bool m_curlGotNewHandles = false;
     static int curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp);
     int curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp);
     void curlSocketEventCallback(ev::io & w, int revents);
     static int curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp);
     int curlMultiTimerCallback(CURLM * multi, long timeout_ms);
     void curlMultiTimerEventCallback(ev::timer & w, int revents);
+    void registerCurlHandle(CurlTask * curlTask);
+    void unregisterCurlHandle(CurlTask * curlTask);
 
       TaskMan(const TaskMan &) = delete;
     TaskMan & operator=(const TaskMan &) = delete;
diff --git a/src/CurlTask.cc b/src/CurlTask.cc
index 5ff8b97..c43210f 100644
--- a/src/CurlTask.cc
+++ b/src/CurlTask.cc
@@ -10,6 +10,11 @@ Balau::CurlTask::CurlTask() {
     curl_easy_setopt(m_curlHandle, CURLOPT_DEBUGDATA, this);
 }
 
+Balau::CurlTask::~CurlTask() {
+    unregisterCurlHandle();
+    curl_easy_cleanup(m_curlHandle);
+}
+
 size_t Balau::CurlTask::writeFunctionStatic(char * ptr, size_t size, size_t nmemb, void * userdata) {
     CurlTask * curlTask = (CurlTask *) userdata;
     return curlTask->writeFunction(ptr, size, nmemb);
@@ -26,6 +31,24 @@ int Balau::CurlTask::debugFunctionStatic(CURL * easy, curl_infotype info, char *
     return curlTask->debugFunction(info, str, str_len);
 }
 
-void Balau::CurlTask::curlDone(CURLcode result) {
+Balau::DownloadTask::DownloadTask(const Balau::String & url) {
+    curl_easy_setopt(m_curlHandle, CURLOPT_URL, url.to_charp());
+    m_name.set("DownloadTask(%s)", url.to_charp());
+}
+
+void Balau::DownloadTask::Do() {
+    if (m_state)
+        return;
+
+    m_state = 1;
+    registerCurlHandle();
+    waitFor(&m_evt);
+    yield();
+}
 
+void Balau::DownloadTask::curlDone(CURLcode result) {
+    m_curlResult = result;
+    curl_easy_getinfo(m_curlHandle, CURLINFO_RESPONSE_CODE, &m_responseCode);
+    m_evt.doSignal();
+    m_done = true;
 }
diff --git a/src/Exceptions.cc b/src/Exceptions.cc
index 38fa005..44743fe 100644
--- a/src/Exceptions.cc
+++ b/src/Exceptions.cc
@@ -102,3 +102,12 @@ void Balau::ExitHelper(const String & msg, const char * fmt, ...) {
         ExitHelperInner(msg, NULL);
     }
 }
+
+void Balau::AssertHelperInner(const Balau::String & msg, const char * details) throw (Balau::GeneralException) {
+#if defined(_MSC_VER) && defined(_DEBUG)
+    if (IsDebuggerPresent())
+        __debugbreak();
+    else
+#endif
+        throw Balau::GeneralException(msg, details);
+}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 00ce0f2..07fff44 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -1,5 +1,6 @@
-#ifdef _MSC_VER
-#include <Windows.h>
+#ifdef _WIN32
+#include <windows.h>
+#include <io.h>
 #endif
 
 #undef ERROR
@@ -42,12 +43,13 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
   public:
       CurlSharedManager() : AtStart(0), AtExit(0) { }
     struct SharedLocks {
-        Balau::RWLock cookie, dns, ssl_session;
+        Balau::RWLock share, cookie, dns, ssl_session;
     };
     static void lock_function(CURL *handle, curl_lock_data data, curl_lock_access access, void * userptr) {
         SharedLocks * locks = (SharedLocks *) userptr;
         Balau::RWLock * lock = NULL;
         switch (data) {
+            case CURL_LOCK_DATA_SHARE: lock = &locks->share; break;
             case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break;
             case CURL_LOCK_DATA_DNS: lock = &locks->dns; break;
             case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break;
@@ -63,6 +65,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
         SharedLocks * locks = (SharedLocks *) userptr;
         Balau::RWLock * lock = NULL;
         switch (data) {
+            case CURL_LOCK_DATA_SHARE: lock = &locks->share; break;
             case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break;
             case CURL_LOCK_DATA_DNS: lock = &locks->dns; break;
             case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break;
@@ -79,7 +82,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
         curl_share_setopt(s_curlShared, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION);
         curl_share_setopt(s_curlShared, CURLSHOPT_USERDATA, &locks);
         curl_share_setopt(s_curlShared, CURLSHOPT_LOCKFUNC, lock_function);
-        curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, lock_function);
+        curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, unlock_function);
     }
     void doExit() {
         curl_share_cleanup(s_curlShared);
@@ -246,21 +249,28 @@ Balau::TaskMan::TaskMan() {
     m_curlTimer.set<TaskMan, &TaskMan::curlMultiTimerEventCallback>(this);
 }
 
+#ifdef _WIN32
+inline static int fromSocket(SOCKET s) { return _open_osfhandle(s, 0); }
+inline static SOCKET toSocket(int fd) { return _get_osfhandle(fd); }
+#else
+inline static int fromSocket(int s) { return s; }
+inline static int toSocket(int fd) { return fd; }
+#endif
+
 int Balau::TaskMan::curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp) {
     TaskMan * taskMan = (TaskMan *)userp;
     return taskMan->curlSocketCallback(easy, s, what, socketp);
 }
 
 int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp) {
+    int fd = fromSocket(s);
     ev::io * evt = (ev::io *) socketp;
     if (!evt) {
         if (what == CURL_POLL_REMOVE)
             return 0;
         evt = new ev::io;
         evt->set<TaskMan, &TaskMan::curlSocketEventCallback>(this);
-        evt->set(s, ev::READ | ev::WRITE);
         evt->set(m_loop);
-        evt->start();
         curl_multi_assign(m_curlMulti, s, evt);
     }
 
@@ -270,17 +280,17 @@ int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, v
         break;
     case CURL_POLL_IN:
         evt->stop();
-        evt->set(s, ev::READ);
+        evt->set(fd, ev::READ);
         evt->start();
         break;
     case CURL_POLL_OUT:
         evt->stop();
-        evt->set(s, ev::WRITE);
+        evt->set(fd, ev::WRITE);
         evt->start();
         break;
     case CURL_POLL_INOUT:
         evt->stop();
-        evt->set(s, ev::READ | ev::WRITE);
+        evt->set(fd, ev::READ | ev::WRITE);
         evt->start();
         break;
     case CURL_POLL_REMOVE:
@@ -300,7 +310,7 @@ void Balau::TaskMan::curlSocketEventCallback(ev::io & w, int revents) {
         bitmask |= CURL_CSELECT_OUT;
     if (revents & ev::ERROR)
         bitmask |= CURL_CSELECT_ERR;
-    curl_multi_socket_action(m_curlMulti, w.fd, bitmask, &m_curlStillRunning);
+    curl_multi_socket_action(m_curlMulti, toSocket(w.fd), bitmask, &m_curlStillRunning);
 }
 
 int Balau::TaskMan::curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp) {
@@ -406,9 +416,9 @@ int Balau::TaskMan::mainLoop() {
                 yielded.insert(t);
         }
 
-        // if we begin that loop with any pending task, just don't loop, so we can add them immediately.
+        // if we begin that loop with any pending task, just don't block, so we can add them immediately.
         bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty();
-        bool curlNeedsSpin = !m_curlTimer.is_active() && m_curlStillRunning != 0;
+        bool curlNeedsSpin = (!m_curlTimer.is_active() && m_curlStillRunning != 0) || m_curlGotNewHandles;
 
         // libev's event "loop". We always runs it once though.
         m_allowedToSignal = true;
@@ -419,6 +429,25 @@ int Balau::TaskMan::mainLoop() {
         // calling async's idle
         s_async.idle();
 
+        // process curl events, and signal tasks
+        if (m_curlGotNewHandles || curlNeedsSpin)
+            curl_multi_socket_all(m_curlMulti, &m_curlStillRunning);
+        m_curlGotNewHandles = false;
+
+        CURLMsg * curlMsg = NULL;
+        int curlMsgInQueue;
+
+        while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) {
+            if (curlMsg->msg != CURLMSG_DONE)
+                continue;
+            Task * maybeCurlTask = NULL;
+            curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask);
+            IAssert(maybeCurlTask, "curl easy handle didn't have any private data...");
+            CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask);
+            IAssert(curlTask, "curl easy handle had corrupted private data...");
+            curlTask->curlDone(curlMsg->data.result);
+        }
+
         // let's check what task got stopped, and signal them
         for (Task * t : stopped) {
             IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t);
@@ -464,8 +493,6 @@ int Balau::TaskMan::mainLoop() {
         yielded = yielded2;
         yielded2.clear();
 
-        bool curlGotHandle = false;
-
         // Adding tasks that were added, maybe from other threads
         while (!m_pendingAdd.isEmpty()) {
             Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this);
@@ -476,30 +503,6 @@ int Balau::TaskMan::mainLoop() {
             t->setup(this, t->isStackless() ? NULL : getStack());
             m_tasks.insert(t);
             starting.insert(t);
-            CurlTask * curlTask = dynamic_cast<CurlTask *>(t);
-            if (curlTask) {
-                curlGotHandle = true;
-                curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared);
-                curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask);
-                curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle);
-            }
-        }
-
-        if (curlGotHandle || curlNeedsSpin)
-            curl_multi_socket_all(m_curlMulti, &m_curlStillRunning);
-
-        CURLMsg * curlMsg = NULL;
-        int curlMsgInQueue;
-
-        while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) {
-            if (curlMsg->msg != CURLMSG_DONE)
-                continue;
-            Task * maybeCurlTask = NULL;
-            curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask);
-            IAssert(maybeCurlTask, "curl easy handle didn't have any private data...");
-            CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask);
-            IAssert(curlTask, "curl easy handle had corrupted private data...");
-            curlTask->curlDone(curlMsg->data.result);
         }
 
         // Finally, let's destroy tasks that no longer are necessary.
@@ -533,6 +536,22 @@ int Balau::TaskMan::mainLoop() {
     return m_stopCode;
 }
 
+void Balau::TaskMan::registerCurlHandle(Balau::CurlTask * curlTask) {
+    m_curlGotNewHandles = true;
+    curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared);
+    curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask);
+    curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle);
+}
+
+void Balau::TaskMan::unregisterCurlHandle(Balau::CurlTask * curlTask) {
+    void * ptr = NULL;
+    curl_easy_getinfo(curlTask->m_curlHandle, CURLINFO_PRIVATE, &ptr);
+    if (!ptr)
+        return;
+    curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, static_cast<void *>(0));
+    curl_multi_remove_handle(m_curlMulti, curlTask->m_curlHandle);
+}
+
 void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) {
     if (stick) {
         IAssert(!event, "inconsistent");
-- 
cgit v1.2.3