summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-06-19 15:38:02 -0700
committerNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-06-19 15:38:02 -0700
commitc83a8a951cce9b7ed541c293993f708f720bf28d (patch)
treee2c92a5778de9829eef0b03bb49bca4acdba2396
parent948dd878a4060bac728f0af1cf7c0d0048ddace0 (diff)
Finishing up basic curl integration and adding a basic DownloadTask.
-rw-r--r--includes/CurlTask.h39
-rw-r--r--includes/Exceptions.h10
-rw-r--r--includes/TaskMan.h5
-rw-r--r--src/CurlTask.cc25
-rw-r--r--src/Exceptions.cc9
-rw-r--r--src/TaskMan.cc95
6 files changed, 128 insertions, 55 deletions
diff --git a/includes/CurlTask.h b/includes/CurlTask.h
index 0aa62a0..41243ec 100644
--- a/includes/CurlTask.h
+++ b/includes/CurlTask.h
@@ -7,20 +7,45 @@
namespace Balau {
class CurlTask : public StacklessTask {
-public:
- CurlTask();
+ public:
+ CurlTask();
+ ~CurlTask();
friend class TaskMan;
protected:
CURL * m_curlHandle;
-private:
- void curlDone(CURLcode result);
+ void registerCurlHandle() { getTaskMan()->registerCurlHandle(this); }
+ void unregisterCurlHandle() { getTaskMan()->unregisterCurlHandle(this); }
+ private:
+ virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) { return size * nmemb; }
+ virtual size_t readFunction(void * ptr, size_t size, size_t nmemb) { return CURL_READFUNC_ABORT; }
+ virtual int debugFunction(curl_infotype info, char * str, size_t str_len) { return 0; }
+ virtual void curlDone(CURLcode result) { }
static size_t writeFunctionStatic(char * ptr, size_t size, size_t nmemb, void * userdata);
- virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) { return size * nmemb; }
static size_t readFunctionStatic(void * ptr, size_t size, size_t nmemb, void * userdata);
- virtual size_t readFunction(void * ptr, size_t size, size_t nmemb) { return CURL_READFUNC_ABORT; }
static int debugFunctionStatic(CURL * easy, curl_infotype info, char * str, size_t str_len, void * userdata);
- virtual int debugFunction(curl_infotype info, char * str, size_t str_len) { return 0; }
+};
+
+class DownloadTask : public CurlTask {
+ public:
+ DownloadTask(const String & url);
+ const String & getData() const { return m_data; }
+ bool isDone() { return m_done; }
+ long responseCode() { return m_responseCode; }
+
+ protected:
+ String m_data;
+ CURLcode m_curlResult;
+ long m_responseCode;
+
+ private:
+ virtual const char * getName() const override { return m_name.to_charp(); }
+ virtual void Do() override;
+ virtual void curlDone(CURLcode result) override;
+ virtual size_t writeFunction(char * ptr, size_t size, size_t nmemb) override { m_data += ptr; return size * nmemb; }
+ String m_name;
+ Events::Custom m_evt;
+ bool m_done = false;
};
};
diff --git a/includes/Exceptions.h b/includes/Exceptions.h
index 1c66968..11d0d7b 100644
--- a/includes/Exceptions.h
+++ b/includes/Exceptions.h
@@ -97,15 +97,7 @@ static inline void * realloc(void * previous, size_t size) {
};
-static inline void AssertHelperInner(const String & msg, const char * details = NULL) throw (GeneralException) {
-#if defined(_MSC_VER) && defined(_DEBUG)
- if (IsDebuggerPresent())
- __debugbreak();
- else
-#endif
- throw GeneralException(msg, details);
-}
-
+void AssertHelperInner(const String & msg, const char * details = NULL) throw (GeneralException);
static inline void AssertHelper(const String & msg, const char * fmt, ...) printfwarning(2, 3);
static inline void AssertHelper(const String & msg, const char * fmt, ...) {
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index a51653b..1f35c1e 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -24,6 +24,7 @@ namespace gnu = __gnu_cxx;
namespace Balau {
class TaskScheduler;
+class CurlTask;
namespace Events {
@@ -85,6 +86,7 @@ class TaskMan {
void * m_fiber;
#endif
friend class Task;
+ friend class CurlTask;
friend class TaskScheduler;
template<class T>
friend T * createAsyncOp(T * op);
@@ -106,12 +108,15 @@ class TaskMan {
ev::timer m_curlTimer;
CURLM * m_curlMulti = false;
int m_curlStillRunning = 0;
+ bool m_curlGotNewHandles = false;
static int curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp);
int curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp);
void curlSocketEventCallback(ev::io & w, int revents);
static int curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp);
int curlMultiTimerCallback(CURLM * multi, long timeout_ms);
void curlMultiTimerEventCallback(ev::timer & w, int revents);
+ void registerCurlHandle(CurlTask * curlTask);
+ void unregisterCurlHandle(CurlTask * curlTask);
TaskMan(const TaskMan &) = delete;
TaskMan & operator=(const TaskMan &) = delete;
diff --git a/src/CurlTask.cc b/src/CurlTask.cc
index 5ff8b97..c43210f 100644
--- a/src/CurlTask.cc
+++ b/src/CurlTask.cc
@@ -10,6 +10,11 @@ Balau::CurlTask::CurlTask() {
curl_easy_setopt(m_curlHandle, CURLOPT_DEBUGDATA, this);
}
+Balau::CurlTask::~CurlTask() {
+ unregisterCurlHandle();
+ curl_easy_cleanup(m_curlHandle);
+}
+
size_t Balau::CurlTask::writeFunctionStatic(char * ptr, size_t size, size_t nmemb, void * userdata) {
CurlTask * curlTask = (CurlTask *) userdata;
return curlTask->writeFunction(ptr, size, nmemb);
@@ -26,6 +31,24 @@ int Balau::CurlTask::debugFunctionStatic(CURL * easy, curl_infotype info, char *
return curlTask->debugFunction(info, str, str_len);
}
-void Balau::CurlTask::curlDone(CURLcode result) {
+Balau::DownloadTask::DownloadTask(const Balau::String & url) {
+ curl_easy_setopt(m_curlHandle, CURLOPT_URL, url.to_charp());
+ m_name.set("DownloadTask(%s)", url.to_charp());
+}
+
+void Balau::DownloadTask::Do() {
+ if (m_state)
+ return;
+
+ m_state = 1;
+ registerCurlHandle();
+ waitFor(&m_evt);
+ yield();
+}
+void Balau::DownloadTask::curlDone(CURLcode result) {
+ m_curlResult = result;
+ curl_easy_getinfo(m_curlHandle, CURLINFO_RESPONSE_CODE, &m_responseCode);
+ m_evt.doSignal();
+ m_done = true;
}
diff --git a/src/Exceptions.cc b/src/Exceptions.cc
index 38fa005..44743fe 100644
--- a/src/Exceptions.cc
+++ b/src/Exceptions.cc
@@ -102,3 +102,12 @@ void Balau::ExitHelper(const String & msg, const char * fmt, ...) {
ExitHelperInner(msg, NULL);
}
}
+
+void Balau::AssertHelperInner(const Balau::String & msg, const char * details) throw (Balau::GeneralException) {
+#if defined(_MSC_VER) && defined(_DEBUG)
+ if (IsDebuggerPresent())
+ __debugbreak();
+ else
+#endif
+ throw Balau::GeneralException(msg, details);
+}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 00ce0f2..07fff44 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -1,5 +1,6 @@
-#ifdef _MSC_VER
-#include <Windows.h>
+#ifdef _WIN32
+#include <windows.h>
+#include <io.h>
#endif
#undef ERROR
@@ -42,12 +43,13 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
public:
CurlSharedManager() : AtStart(0), AtExit(0) { }
struct SharedLocks {
- Balau::RWLock cookie, dns, ssl_session;
+ Balau::RWLock share, cookie, dns, ssl_session;
};
static void lock_function(CURL *handle, curl_lock_data data, curl_lock_access access, void * userptr) {
SharedLocks * locks = (SharedLocks *) userptr;
Balau::RWLock * lock = NULL;
switch (data) {
+ case CURL_LOCK_DATA_SHARE: lock = &locks->share; break;
case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break;
case CURL_LOCK_DATA_DNS: lock = &locks->dns; break;
case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break;
@@ -63,6 +65,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
SharedLocks * locks = (SharedLocks *) userptr;
Balau::RWLock * lock = NULL;
switch (data) {
+ case CURL_LOCK_DATA_SHARE: lock = &locks->share; break;
case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break;
case CURL_LOCK_DATA_DNS: lock = &locks->dns; break;
case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break;
@@ -79,7 +82,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
curl_share_setopt(s_curlShared, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION);
curl_share_setopt(s_curlShared, CURLSHOPT_USERDATA, &locks);
curl_share_setopt(s_curlShared, CURLSHOPT_LOCKFUNC, lock_function);
- curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, lock_function);
+ curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, unlock_function);
}
void doExit() {
curl_share_cleanup(s_curlShared);
@@ -246,21 +249,28 @@ Balau::TaskMan::TaskMan() {
m_curlTimer.set<TaskMan, &TaskMan::curlMultiTimerEventCallback>(this);
}
+#ifdef _WIN32
+inline static int fromSocket(SOCKET s) { return _open_osfhandle(s, 0); }
+inline static SOCKET toSocket(int fd) { return _get_osfhandle(fd); }
+#else
+inline static int fromSocket(int s) { return s; }
+inline static int toSocket(int fd) { return fd; }
+#endif
+
int Balau::TaskMan::curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp) {
TaskMan * taskMan = (TaskMan *)userp;
return taskMan->curlSocketCallback(easy, s, what, socketp);
}
int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp) {
+ int fd = fromSocket(s);
ev::io * evt = (ev::io *) socketp;
if (!evt) {
if (what == CURL_POLL_REMOVE)
return 0;
evt = new ev::io;
evt->set<TaskMan, &TaskMan::curlSocketEventCallback>(this);
- evt->set(s, ev::READ | ev::WRITE);
evt->set(m_loop);
- evt->start();
curl_multi_assign(m_curlMulti, s, evt);
}
@@ -270,17 +280,17 @@ int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, v
break;
case CURL_POLL_IN:
evt->stop();
- evt->set(s, ev::READ);
+ evt->set(fd, ev::READ);
evt->start();
break;
case CURL_POLL_OUT:
evt->stop();
- evt->set(s, ev::WRITE);
+ evt->set(fd, ev::WRITE);
evt->start();
break;
case CURL_POLL_INOUT:
evt->stop();
- evt->set(s, ev::READ | ev::WRITE);
+ evt->set(fd, ev::READ | ev::WRITE);
evt->start();
break;
case CURL_POLL_REMOVE:
@@ -300,7 +310,7 @@ void Balau::TaskMan::curlSocketEventCallback(ev::io & w, int revents) {
bitmask |= CURL_CSELECT_OUT;
if (revents & ev::ERROR)
bitmask |= CURL_CSELECT_ERR;
- curl_multi_socket_action(m_curlMulti, w.fd, bitmask, &m_curlStillRunning);
+ curl_multi_socket_action(m_curlMulti, toSocket(w.fd), bitmask, &m_curlStillRunning);
}
int Balau::TaskMan::curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp) {
@@ -406,9 +416,9 @@ int Balau::TaskMan::mainLoop() {
yielded.insert(t);
}
- // if we begin that loop with any pending task, just don't loop, so we can add them immediately.
+ // if we begin that loop with any pending task, just don't block, so we can add them immediately.
bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty();
- bool curlNeedsSpin = !m_curlTimer.is_active() && m_curlStillRunning != 0;
+ bool curlNeedsSpin = (!m_curlTimer.is_active() && m_curlStillRunning != 0) || m_curlGotNewHandles;
// libev's event "loop". We always runs it once though.
m_allowedToSignal = true;
@@ -419,6 +429,25 @@ int Balau::TaskMan::mainLoop() {
// calling async's idle
s_async.idle();
+ // process curl events, and signal tasks
+ if (m_curlGotNewHandles || curlNeedsSpin)
+ curl_multi_socket_all(m_curlMulti, &m_curlStillRunning);
+ m_curlGotNewHandles = false;
+
+ CURLMsg * curlMsg = NULL;
+ int curlMsgInQueue;
+
+ while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) {
+ if (curlMsg->msg != CURLMSG_DONE)
+ continue;
+ Task * maybeCurlTask = NULL;
+ curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask);
+ IAssert(maybeCurlTask, "curl easy handle didn't have any private data...");
+ CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask);
+ IAssert(curlTask, "curl easy handle had corrupted private data...");
+ curlTask->curlDone(curlMsg->data.result);
+ }
+
// let's check what task got stopped, and signal them
for (Task * t : stopped) {
IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t);
@@ -464,8 +493,6 @@ int Balau::TaskMan::mainLoop() {
yielded = yielded2;
yielded2.clear();
- bool curlGotHandle = false;
-
// Adding tasks that were added, maybe from other threads
while (!m_pendingAdd.isEmpty()) {
Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this);
@@ -476,30 +503,6 @@ int Balau::TaskMan::mainLoop() {
t->setup(this, t->isStackless() ? NULL : getStack());
m_tasks.insert(t);
starting.insert(t);
- CurlTask * curlTask = dynamic_cast<CurlTask *>(t);
- if (curlTask) {
- curlGotHandle = true;
- curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared);
- curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask);
- curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle);
- }
- }
-
- if (curlGotHandle || curlNeedsSpin)
- curl_multi_socket_all(m_curlMulti, &m_curlStillRunning);
-
- CURLMsg * curlMsg = NULL;
- int curlMsgInQueue;
-
- while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) {
- if (curlMsg->msg != CURLMSG_DONE)
- continue;
- Task * maybeCurlTask = NULL;
- curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask);
- IAssert(maybeCurlTask, "curl easy handle didn't have any private data...");
- CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask);
- IAssert(curlTask, "curl easy handle had corrupted private data...");
- curlTask->curlDone(curlMsg->data.result);
}
// Finally, let's destroy tasks that no longer are necessary.
@@ -533,6 +536,22 @@ int Balau::TaskMan::mainLoop() {
return m_stopCode;
}
+void Balau::TaskMan::registerCurlHandle(Balau::CurlTask * curlTask) {
+ m_curlGotNewHandles = true;
+ curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared);
+ curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask);
+ curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle);
+}
+
+void Balau::TaskMan::unregisterCurlHandle(Balau::CurlTask * curlTask) {
+ void * ptr = NULL;
+ curl_easy_getinfo(curlTask->m_curlHandle, CURLINFO_PRIVATE, &ptr);
+ if (!ptr)
+ return;
+ curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, static_cast<void *>(0));
+ curl_multi_remove_handle(m_curlMulti, curlTask->m_curlHandle);
+}
+
void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) {
if (stick) {
IAssert(!event, "inconsistent");