summary refs log tree commit diff
path: root/includes/TaskMan.h
blob: acfa2c9b3589272ced8365d09aa25e00a8285723 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#pragma once

#include <stdint.h>
#include <curl/curl.h>
#ifndef _WIN32
#include <coro.h>
#include <netdb.h>
#endif
#include <ev++.h>
#ifdef _MSC_VER
#include <hash_set>
#else
#include <ext/hash_set>
#endif
#include <functional>
#include <queue>
#include <Async.h>
#include <Threads.h>
#include <Exceptions.h>
#include <Task.h>

#ifndef _MSC_VER
// Short alias for the GNU extension namespace; TaskMan uses it to spell
// gnu::hash_set on compilers other than MSVC.
namespace gnu = __gnu_cxx;
#endif

// Forward declaration only: TaskMan merely stores a pointer to this type
// (presumably the c-ares channel structure; confirm against the ares headers).
struct ares_channeldata;

namespace Balau {

// Forward declarations of classes that TaskMan names as friends.
class TaskScheduler;
class CurlTask;

namespace Events {

// Forward declaration; TaskMan::registerTask() accepts a pointer to this type.
class TaskEvent;

};

class TaskMan {
  public:
    class TaskManThread : public Thread {
      public:
          virtual ~TaskManThread();
        virtual void * proc();
        void stopMe(int code = 0) { m_taskMan->stopMe(code); }
      private:
        TaskMan * m_taskMan = NULL;
    };

      TaskMan();
      ~TaskMan();
    int mainLoop();
    static TaskMan * getDefaultTaskMan();
    struct ev_loop * getLoop() { return m_loop; }
    void signalTask(Task * t);
    static void stop(int code);
    void stopMe(int code = 0);
    static TaskManThread * createThreadedTaskMan() {
        TaskManThread * r = new TaskManThread();
        r->threadStart();
        return r;
    }
    static void stopThreadedTaskMan(TaskManThread * tmt) {
        tmt->stopMe(0);
        tmt->join();
        delete tmt;
    }
    bool stopped() { return m_stopped; }
    template<class T>
    static T * registerTask(T * t, Task * stick = NULL) { TaskMan::iRegisterTask(t, stick, NULL); return t; }
    template<class T>
    static T * registerTask(T * t, Events::TaskEvent * event) { TaskMan::iRegisterTask(t, NULL, event); return t; }

    typedef std::function<void(int status, int timeouts, struct hostent * hostent)> AresHostCallback;
    void getHostByName(const Balau::String & name, int family, AresHostCallback callback);

  private:
    static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event);
    static void registerAsyncOp(AsyncOperation * op);
    void * getStack();
    void freeStack(void * stack);
    void addToPending(Task * t);
    static void asyncIdleReady(void * param) {
        TaskMan * taskMan = (TaskMan *) param;
        taskMan->asyncIdleReady();
    }
    void asyncIdleReady() {
        m_evt.send();
    }
#ifndef _WIN32
    coro_context m_returnContext;
#else
    void * m_fiber;
#endif
    friend class Task;
    friend class CurlTask;
    friend class TaskScheduler;
    template<class T>
    friend T * createAsyncOp(T * op);
#ifdef _MSC_VER
    typedef stdext::hash_set<Task *> taskHash_t;
#else
    struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
    typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
#endif
    taskHash_t m_tasks, m_signaledTasks;
    Queue<Task> m_pendingAdd;
    struct ev_loop * m_loop;
    ev::async m_evt;
    std::queue<void *> m_stacks;
    int m_nStacks;
    int m_stopCode = 0;
    bool m_stopped = false;
    bool m_allowedToSignal = false;

    ev::timer m_curlTimer;
    CURLM * m_curlMulti = NULL;
    int m_curlStillRunning = 0;
    bool m_curlGotNewHandles = false;
    static int curlSocketCallbackStatic(CURL * easy, curl_socket_t s, int what, void * userp, void * socketp);
    int curlSocketCallback(CURL * easy, curl_socket_t s, int what, void * socketp);
    void curlSocketEventCallback(ev::io & w, int revents);
    static int curlMultiTimerCallbackStatic(CURLM * multi, long timeout_ms, void * userp);
    int curlMultiTimerCallback(CURLM * multi, long timeout_ms);
    void curlMultiTimerEventCallback(ev::timer & w, int revents);
    void registerCurlHandle(CurlTask * curlTask);
    void unregisterCurlHandle(CurlTask * curlTask);

    struct ares_channeldata * m_aresChannel = NULL;
    static const int ARES_MAX_SOCKETS = 2;
    curl_socket_t m_aresSockets[ARES_MAX_SOCKETS];
    ev::io * m_aresSocketEvents[ARES_MAX_SOCKETS];
    ev::timer m_aresTimer;
    static void aresSocketCallbackStatic(void * data, curl_socket_t s, int read, int write);
    void aresSocketCallback(curl_socket_t s, int read, int write);
    void aresSocketEventCallback(ev::io & w, int revents);
    void aresTimerEventCallback(ev::timer & w, int revents);
    static void aresHostCallback(void * arg, int status, int timeouts, struct hostent * hostent);

      TaskMan(const TaskMan &) = delete;
    TaskMan & operator=(const TaskMan &) = delete;
};

// Hands op to TaskMan::registerAsyncOp() and returns the very same pointer,
// so the caller keeps the concrete operation type.
template<class T>
T * createAsyncOp(T * op) {
    TaskMan::registerAsyncOp(op);
    return op;
}

};