#pragma once

// NOTE(review): in this copy of the file every #include directive below has
// lost its header name, and every angle-bracket list (template parameter
// lists, template arguments, cast target types) is missing as well. Restore
// them from version control before building; the comments in this file only
// describe what is still visible.
#include
#include
#ifndef _WIN32
#include
#include
#endif
#include
#ifdef _MSC_VER
#include
#else
#include
#endif
#include
#include
#include
#include
#include

#ifndef _MSC_VER
// Short alias for the GNU extension namespace; used for gnu::hash_set below.
namespace gnu = __gnu_cxx;
#endif

// Forward declaration so that TaskMan can hold a pointer to it (m_aresChannel)
// without needing the full definition in this header.
struct ares_channeldata;

namespace Balau {

// Forward declarations; both classes are declared friends of TaskMan below.
class TaskScheduler;
class CurlTask;

namespace Events { class TaskEvent; };

// Task manager. From what this header shows, it keeps sets of Task pointers
// (m_tasks, m_signaledTasks), a queue of pending additions (m_pendingAdd), an
// ev_loop pointer with associated ev:: watchers, a CURLM handle and an ares
// channel pointer. All non-inline member functions are defined elsewhere.
// Copy construction and copy assignment are deleted (see the end of the class).
class TaskMan {
  public:
    // Thread subclass that carries a TaskMan pointer. proc() and the
    // destructor are defined elsewhere.
    class TaskManThread : public Thread {
      public:
        virtual ~TaskManThread();
        virtual void * proc();
        // Forwards the stop request, with the given code, to the TaskMan this
        // thread points at. There is no null check here.
        // NOTE(review): m_taskMan starts out NULL and is never assigned in
        // this header; presumably proc() sets it - confirm that stopMe()
        // cannot run before that happens.
        void stopMe(int code = 0) { m_taskMan->stopMe(code); }
      private:
        TaskMan * m_taskMan = NULL;
    };

    TaskMan();
    ~TaskMan();
    int mainLoop();
    static TaskMan * getDefaultTaskMan();
    // Returns the ev_loop pointer stored in m_loop.
    struct ev_loop * getLoop() { return m_loop; }
    void signalTask(Task * t);
    // Two stop entry points: a static one and a per-instance one. Their
    // bodies are not visible here.
    static void stop(int code);
    void stopMe(int code = 0);
    // Allocates a TaskManThread with new, calls threadStart() on it and
    // returns the raw pointer. The matching release is stopThreadedTaskMan(),
    // which deletes the object.
    static TaskManThread * createThreadedTaskMan() {
        TaskManThread * r = new TaskManThread();
        r->threadStart();
        return r;
    }
    // Counterpart of createThreadedTaskMan(): requests a stop with code 0,
    // joins the thread, then deletes it. The pointer must not be used
    // afterwards.
    static void stopThreadedTaskMan(TaskManThread * tmt) {
        tmt->stopMe(0);
        tmt->join();
        delete tmt;
    }
    // Returns the current value of the m_stopped flag.
    bool stopped() { return m_stopped; }
    // Registers t through iRegisterTask(), passing `stick` and no event, and
    // returns t unchanged so that the call can wrap an expression.
    // NOTE(review): the template parameter list is missing in this copy
    // (presumably it declares T) - confirm.
    template static T * registerTask(T * t, Task * stick = NULL) {
        TaskMan::iRegisterTask(t, stick, NULL);
        return t;
    }
    // Overload that registers t with an event and no stick task; also
    // returns t. Same missing template parameter list as above.
    template static T * registerTask(T * t, Events::TaskEvent * event) {
        TaskMan::iRegisterTask(t, NULL, event);
        return t;
    }
    // Callback type accepted by getHostByName().
    // NOTE(review): the std::function signature is missing in this copy.
    typedef std::function AresHostCallback;
    void getHostByName(const Balau::String & name, int family, AresHostCallback callback);

  private:
    // Common back end of both registerTask() overloads; each of them passes
    // NULL for whichever of `stick` / `event` it does not use.
    static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event);
    // Called by the free function createAsyncOp(), which is a friend.
    static void registerAsyncOp(AsyncOperation * op);
    void * getStack();
    void freeStack(void * stack);
    void addToPending(Task * t);
    // Static trampoline: `param` is cast to a TaskMan pointer and the call is
    // forwarded to the member overload below.
    static void asyncIdleReady(void * param) {
        TaskMan * taskMan = (TaskMan *) param;
        taskMan->asyncIdleReady();
    }
    // Calls send() on the m_evt async watcher.
    void asyncIdleReady() { m_evt.send(); }
    // Platform-specific member: a coro_context everywhere except _WIN32,
    // where an opaque pointer named m_fiber is stored instead.
#ifndef _WIN32
    coro_context m_returnContext;
#else
    void * m_fiber;
#endif
    // These classes, and createAsyncOp(), get access to the private members.
    friend class Task;
    friend class CurlTask;
    friend class TaskScheduler;
    template friend T * createAsyncOp(T * op);
    // taskHash_t is the container type of m_tasks and m_signaledTasks:
    // stdext::hash_set under MSVC, gnu::hash_set otherwise.
    // NOTE(review): the template arguments of both typedefs are missing in
    // this copy.
#ifdef _MSC_VER
    typedef stdext::hash_set taskHash_t;
#else
    // Hash functor for Task pointers: the hash is derived from the pointer
    // itself through reinterpret_cast, not from the Task's contents.
    // NOTE(review): the cast's target type is missing in this copy.
    struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast(t); } };
    typedef gnu::hash_set taskHash_t;
#endif
    taskHash_t m_tasks, m_signaledTasks;
    // NOTE(review): the element types of Queue and std::queue are missing in
    // this copy.
    Queue m_pendingAdd;
    // No in-class initializer; presumably set by the constructor - confirm.
    struct ev_loop * m_loop;
    ev::async m_evt;
    std::queue m_stacks;
    // No in-class initializer; presumably set by the constructor - confirm.
    int m_nStacks;
    int m_stopCode = 0;
    bool m_stopped = false;
    bool m_allowedToSignal = false;

    // --- curl-related state and callbacks (all defined elsewhere) ---
    ev::timer m_curlTimer;
    CURLM * m_curlMulti = NULL;
    int m_curlStillRunning = 0;
    bool m_curlGotNewHandles = false;
    // Each *Static function takes an extra void * userp compared with the
    // member function of the same base name.
    // NOTE(review): presumably userp carries the TaskMan instance and the
    // static version forwards to the member one - confirm in the
    // implementation file.
    static int curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp);
    int curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp);
    void curlSocketEventCallback(ev::io & w, int revents);
    static int curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp);
    int curlMultiTimerCallback(CURLM * multi, long timeout_ms);
    void curlMultiTimerEventCallback(ev::timer & w, int revents);
    void registerCurlHandle(CurlTask * curlTask);
    void unregisterCurlHandle(CurlTask * curlTask);

    // --- ares-related state and callbacks (all defined elsewhere) ---
    struct ares_channeldata * m_aresChannel = NULL;
    // Fixed size of the two parallel arrays below.
    static const int ARES_MAX_SOCKETS = 2;
    // Neither array has an in-class initializer; presumably both are filled
    // in by the constructor - confirm.
    curl_socket_t m_aresSockets[ARES_MAX_SOCKETS];
    ev::io * m_aresSocketEvents[ARES_MAX_SOCKETS];
    ev::timer m_aresTimer;
    static void aresSocketCallbackStatic(void * data, curl_socket_t s, int read, int write);
    void aresSocketCallback(curl_socket_t s, int read, int write);
    void aresSocketEventCallback(ev::io & w, int revents);
    void aresTimerEventCallback(ev::timer & w, int revents);
    static void aresHostCallback(void * arg, int status, int timeouts, struct hostent * hostent);

    // TaskMan objects cannot be copied.
    TaskMan(const TaskMan &) = delete;
    TaskMan & operator=(const TaskMan &) = delete;
};

// Hands `op` to TaskMan::registerAsyncOp() and returns the same pointer. This
// function is declared a friend inside TaskMan, which is what lets it call
// the private static.
// NOTE(review): the template parameter list is missing in this copy
// (presumably it declares T) - confirm.
template T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; }

};