summaryrefslogtreecommitdiff
path: root/src/TaskMan.cc
diff options
context:
space:
mode:
authorNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-06-19 15:38:02 -0700
committerNicolas "Pixel" Noble <pixel@nobis-crew.org>2014-06-19 15:38:02 -0700
commitc83a8a951cce9b7ed541c293993f708f720bf28d (patch)
treee2c92a5778de9829eef0b03bb49bca4acdba2396 /src/TaskMan.cc
parent948dd878a4060bac728f0af1cf7c0d0048ddace0 (diff)
Finishing up basic curl integration and adding a basic DownloadTask.
Diffstat (limited to 'src/TaskMan.cc')
-rw-r--r--src/TaskMan.cc95
1 files changed, 57 insertions, 38 deletions
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 00ce0f2..07fff44 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -1,5 +1,6 @@
-#ifdef _MSC_VER
-#include <Windows.h>
+#ifdef _WIN32
+#include <windows.h>
+#include <io.h>
#endif
#undef ERROR
@@ -42,12 +43,13 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
public:
CurlSharedManager() : AtStart(0), AtExit(0) { }
struct SharedLocks {
- Balau::RWLock cookie, dns, ssl_session;
+ Balau::RWLock share, cookie, dns, ssl_session;
};
static void lock_function(CURL *handle, curl_lock_data data, curl_lock_access access, void * userptr) {
SharedLocks * locks = (SharedLocks *) userptr;
Balau::RWLock * lock = NULL;
switch (data) {
+ case CURL_LOCK_DATA_SHARE: lock = &locks->share; break;
case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break;
case CURL_LOCK_DATA_DNS: lock = &locks->dns; break;
case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break;
@@ -63,6 +65,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
SharedLocks * locks = (SharedLocks *) userptr;
Balau::RWLock * lock = NULL;
switch (data) {
+ case CURL_LOCK_DATA_SHARE: lock = &locks->share; break;
case CURL_LOCK_DATA_COOKIE: lock = &locks->cookie; break;
case CURL_LOCK_DATA_DNS: lock = &locks->dns; break;
case CURL_LOCK_DATA_SSL_SESSION: lock = &locks->ssl_session; break;
@@ -79,7 +82,7 @@ class CurlSharedManager : public Balau::AtStart, Balau::AtExit {
curl_share_setopt(s_curlShared, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION);
curl_share_setopt(s_curlShared, CURLSHOPT_USERDATA, &locks);
curl_share_setopt(s_curlShared, CURLSHOPT_LOCKFUNC, lock_function);
- curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, lock_function);
+ curl_share_setopt(s_curlShared, CURLSHOPT_UNLOCKFUNC, unlock_function);
}
void doExit() {
curl_share_cleanup(s_curlShared);
@@ -246,21 +249,28 @@ Balau::TaskMan::TaskMan() {
m_curlTimer.set<TaskMan, &TaskMan::curlMultiTimerEventCallback>(this);
}
+#ifdef _WIN32
+inline static int fromSocket(SOCKET s) { return _open_osfhandle(s, 0); }
+inline static SOCKET toSocket(int fd) { return _get_osfhandle(fd); }
+#else
+inline static int fromSocket(int s) { return s; }
+inline static int toSocket(int fd) { return fd; }
+#endif
+
int Balau::TaskMan::curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp) {
TaskMan * taskMan = (TaskMan *)userp;
return taskMan->curlSocketCallback(easy, s, what, socketp);
}
int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp) {
+ int fd = fromSocket(s);
ev::io * evt = (ev::io *) socketp;
if (!evt) {
if (what == CURL_POLL_REMOVE)
return 0;
evt = new ev::io;
evt->set<TaskMan, &TaskMan::curlSocketEventCallback>(this);
- evt->set(s, ev::READ | ev::WRITE);
evt->set(m_loop);
- evt->start();
curl_multi_assign(m_curlMulti, s, evt);
}
@@ -270,17 +280,17 @@ int Balau::TaskMan::curlSocketCallback(CURL * easy, curl_socket_t s, int what, v
break;
case CURL_POLL_IN:
evt->stop();
- evt->set(s, ev::READ);
+ evt->set(fd, ev::READ);
evt->start();
break;
case CURL_POLL_OUT:
evt->stop();
- evt->set(s, ev::WRITE);
+ evt->set(fd, ev::WRITE);
evt->start();
break;
case CURL_POLL_INOUT:
evt->stop();
- evt->set(s, ev::READ | ev::WRITE);
+ evt->set(fd, ev::READ | ev::WRITE);
evt->start();
break;
case CURL_POLL_REMOVE:
@@ -300,7 +310,7 @@ void Balau::TaskMan::curlSocketEventCallback(ev::io & w, int revents) {
bitmask |= CURL_CSELECT_OUT;
if (revents & ev::ERROR)
bitmask |= CURL_CSELECT_ERR;
- curl_multi_socket_action(m_curlMulti, w.fd, bitmask, &m_curlStillRunning);
+ curl_multi_socket_action(m_curlMulti, toSocket(w.fd), bitmask, &m_curlStillRunning);
}
int Balau::TaskMan::curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp) {
@@ -406,9 +416,9 @@ int Balau::TaskMan::mainLoop() {
yielded.insert(t);
}
- // if we begin that loop with any pending task, just don't loop, so we can add them immediately.
+ // if we begin that loop with any pending task, just don't block, so we can add them immediately.
bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty();
- bool curlNeedsSpin = !m_curlTimer.is_active() && m_curlStillRunning != 0;
+ bool curlNeedsSpin = (!m_curlTimer.is_active() && m_curlStillRunning != 0) || m_curlGotNewHandles;
// libev's event "loop". We always runs it once though.
m_allowedToSignal = true;
@@ -419,6 +429,25 @@ int Balau::TaskMan::mainLoop() {
// calling async's idle
s_async.idle();
+ // process curl events, and signal tasks
+ if (m_curlGotNewHandles || curlNeedsSpin)
+ curl_multi_socket_all(m_curlMulti, &m_curlStillRunning);
+ m_curlGotNewHandles = false;
+
+ CURLMsg * curlMsg = NULL;
+ int curlMsgInQueue;
+
+ while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) {
+ if (curlMsg->msg != CURLMSG_DONE)
+ continue;
+ Task * maybeCurlTask = NULL;
+ curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask);
+ IAssert(maybeCurlTask, "curl easy handle didn't have any private data...");
+ CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask);
+ IAssert(curlTask, "curl easy handle had corrupted private data...");
+ curlTask->curlDone(curlMsg->data.result);
+ }
+
// let's check what task got stopped, and signal them
for (Task * t : stopped) {
IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t);
@@ -464,8 +493,6 @@ int Balau::TaskMan::mainLoop() {
yielded = yielded2;
yielded2.clear();
- bool curlGotHandle = false;
-
// Adding tasks that were added, maybe from other threads
while (!m_pendingAdd.isEmpty()) {
Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this);
@@ -476,30 +503,6 @@ int Balau::TaskMan::mainLoop() {
t->setup(this, t->isStackless() ? NULL : getStack());
m_tasks.insert(t);
starting.insert(t);
- CurlTask * curlTask = dynamic_cast<CurlTask *>(t);
- if (curlTask) {
- curlGotHandle = true;
- curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared);
- curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask);
- curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle);
- }
- }
-
- if (curlGotHandle || curlNeedsSpin)
- curl_multi_socket_all(m_curlMulti, &m_curlStillRunning);
-
- CURLMsg * curlMsg = NULL;
- int curlMsgInQueue;
-
- while ((curlMsg = curl_multi_info_read(m_curlMulti, &curlMsgInQueue))) {
- if (curlMsg->msg != CURLMSG_DONE)
- continue;
- Task * maybeCurlTask = NULL;
- curl_easy_getinfo(curlMsg->easy_handle, CURLINFO_PRIVATE, &maybeCurlTask);
- IAssert(maybeCurlTask, "curl easy handle didn't have any private data...");
- CurlTask * curlTask = dynamic_cast<CurlTask *>(maybeCurlTask);
- IAssert(curlTask, "curl easy handle had corrupted private data...");
- curlTask->curlDone(curlMsg->data.result);
}
// Finally, let's destroy tasks that no longer are necessary.
@@ -533,6 +536,22 @@ int Balau::TaskMan::mainLoop() {
return m_stopCode;
}
+void Balau::TaskMan::registerCurlHandle(Balau::CurlTask * curlTask) {
+ m_curlGotNewHandles = true;
+ curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_SHARE, s_curlShared);
+ curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, curlTask);
+ curl_multi_add_handle(m_curlMulti, curlTask->m_curlHandle);
+}
+
+void Balau::TaskMan::unregisterCurlHandle(Balau::CurlTask * curlTask) {
+ void * ptr = NULL;
+ curl_easy_getinfo(curlTask->m_curlHandle, CURLINFO_PRIVATE, &ptr);
+ if (!ptr)
+ return;
+ curl_easy_setopt(curlTask->m_curlHandle, CURLOPT_PRIVATE, static_cast<void *>(0));
+ curl_multi_remove_handle(m_curlMulti, curlTask->m_curlHandle);
+}
+
void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) {
if (stick) {
IAssert(!event, "inconsistent");