summaryrefslogtreecommitdiff
path: root/src/TaskMan.cc
blob: 0fc4668c33a90d79704671bdbf2e6828caa69909 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#include "TaskMan.h"
#include "Task.h"
#include "Main.h"
#include "Local.h"

static Balau::DefaultTmpl<Balau::TaskMan> defaultTaskMan(50);
static Balau::LocalTmpl<Balau::TaskMan> localTaskMan;

Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) {
#ifndef _WIN32
    coro_create(&m_returnContext, 0, 0, 0, 0);
#else
    m_fiber = ConvertThreadToFiber(NULL);
    Assert(m_fiber);
#endif
    if (!localTaskMan.getGlobal()) {
        localTaskMan.setGlobal(this);
        m_loop = ev_default_loop(EVFLAG_AUTO);
    } else {
        m_loop = ev_loop_new(EVFLAG_AUTO);
    }
}

#ifdef _WIN32
class WinSocketStartup : public Balau::AtStart {
  public:
      WinSocketStartup() : AtStart(5) { }
    virtual void doStart() {
        WSADATA wsaData;
        int r = WSAStartup(MAKEWORD(2, 0), &wsaData);
        Assert(r == 0);
    }
};

static WinSocketStartup wsa;
#endif

Balau::TaskMan * Balau::TaskMan::getTaskMan() { return localTaskMan.get(); }

Balau::TaskMan::~TaskMan() {
    Assert(localTaskMan.getGlobal() != this);
    ev_loop_destroy(m_loop);
}

void Balau::TaskMan::mainLoop() {
    // We need at least one round before bailing :)
    do {
        taskList_t::iterator iL;
        taskHash_t::iterator iH;
        Task * t;
        bool noWait = false;

        Printer::elog(E_TASK, "TaskMan::mainLoop() with m_tasks.size = %i", m_tasks.size());

        // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean.
        for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
            t = *iH;
            if (t->getStatus() == Task::STARTING)
                t->switchTo();
            if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED))
                noWait = true;
        }

        // probably means we have pending tasks; or none at all, for some reason. Don't wait on it forever.
        if (m_tasks.size() == 0)
            noWait = true;

        m_pendingLock.enter();
        if (m_pendingAdd.size() != 0)
            noWait = true;
        m_pendingLock.leave();

        // libev's event "loop". We always runs it once though.
        m_allowedToSignal = true;
        Printer::elog(E_TASK, "Going to libev main loop");
        ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE);
        Printer::elog(E_TASK, "Getting out of libev main loop");

        // let's check what task got stopped, and signal them
        for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
            t = *iH;
            if (((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) &&
                 (t->m_waitedBy.size() != 0)) {
                Task::waitedByList_t::iterator i;
                for (i = t->m_waitedBy.begin(); i != t->m_waitedBy.end(); i++) {
                    Events::TaskEvent * e = *i;
                    e->doSignal();
                }
            }
        }
        m_allowedToSignal = false;

        // let's check who got signaled, and call them
        for (iH = m_signaledTasks.begin(); iH != m_signaledTasks.end(); iH++) {
            t = *iH;
            Printer::elog(E_TASK, "Switching to task %p (%s - %s) that got signaled somehow.", t, t->getName(), ClassName(t).c_str());
            Assert(t->getStatus() == Task::IDLE);
            t->switchTo();
        }
        m_signaledTasks.clear();

        m_pendingLock.enter();
        // Adding tasks that were added, maybe from other threads
        for (iL = m_pendingAdd.begin(); iL != m_pendingAdd.end(); iL++) {
            t = *iL;
            Assert(m_tasks.find(t) == m_tasks.end());
            m_tasks.insert(t);
        }
        m_pendingAdd.clear();
        m_pendingLock.leave();

        // Finally, let's destroy tasks that no longer are necessary.
        bool didDelete;
        do {
            didDelete = false;
            for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
                t = *iH;
                if (((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) &&
                     (t->m_waitedBy.size() == 0)) {
                    delete t;
                    m_tasks.erase(iH);
                    didDelete = true;
                    break;
                }
            }
        } while (didDelete);

    } while (!m_stopped && m_tasks.size() != 0);
}

void Balau::TaskMan::registerTask(Balau::Task * t) {
    m_pendingLock.enter();
    m_pendingAdd.push_back(t);
    m_pendingLock.leave();
}

void Balau::TaskMan::signalTask(Task * t) {
    Assert(m_tasks.find(t) != m_tasks.end());
    Assert(m_allowedToSignal);
    m_signaledTasks.insert(t);
}