summaryrefslogtreecommitdiff
path: root/src/TaskMan.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/TaskMan.cc')
-rw-r--r--src/TaskMan.cc102
1 files changed, 73 insertions, 29 deletions
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 49c326a..177cec1 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -101,7 +101,6 @@ void Balau::TaskScheduler::stopAll(int code) {
m_taskManagers.pop();
altQueue.push(tm);
tm->addToPending(new Stopper(code));
- tm->m_evt.send();
}
while (!altQueue.empty()) {
tm = altQueue.front();
@@ -130,7 +129,6 @@ void * Balau::TaskScheduler::proc() {
m_lock.leave();
Printer::elog(E_TASK, "TaskScheduler popped task %s at %p; adding to TaskMan %p", t->getName(), t, tm);
tm->addToPending(t);
- tm->m_evt.send();
}
Printer::elog(E_TASK, "TaskScheduler stopping.");
return NULL;
@@ -145,6 +143,16 @@ void asyncDummy(ev::async & w, int revents) {
Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up...");
}
+void Balau::TaskMan::stopMe(int code) {
+ Task * t = Task::getCurrentTask();
+ if (t->getTaskMan() == this) {
+ m_stopped = true;
+ m_stopCode = code;
+ } else {
+ addToPending(new Stopper(code));
+ }
+}
+
Balau::TaskMan::TaskMan() {
#ifndef _WIN32
coro_create(&m_returnContext, 0, 0, 0, 0);
@@ -168,6 +176,8 @@ Balau::TaskMan::TaskMan() {
}
#ifdef _WIN32
+namespace {
+
class WinSocketStartup : public Balau::AtStart {
public:
WinSocketStartup() : AtStart(5) { }
@@ -179,6 +189,8 @@ class WinSocketStartup : public Balau::AtStart {
};
static WinSocketStartup wsa;
+
+};
#endif
Balau::TaskMan * Balau::TaskMan::getDefaultTaskMan() { return localTaskMan.get(); }
@@ -191,14 +203,16 @@ Balau::TaskMan::~TaskMan() {
}
s_scheduler.unregisterTaskMan(this);
// probably way more work to do here in order to clean up tasks from that thread
+ m_evt.stop();
ev_loop_destroy(m_loop);
}
void * Balau::TaskMan::getStack() {
+ if (!Task::needsStacks())
+ return NULL;
void * r = NULL;
if (m_nStacks == 0) {
- if (Task::needsStacks())
- r = malloc(Task::stackSize());
+ r = malloc(Task::stackSize());
} else {
r = m_stacks.front();
m_stacks.pop();
@@ -230,30 +244,23 @@ int Balau::TaskMan::mainLoop() {
s_async.setIdleReadyCallback(asyncIdleReady, this);
do {
- bool noWait = false;
-
Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size());
- // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean.
+ // checking "STARTING" tasks, and running them once
while ((iH = starting.begin()) != starting.end()) {
Task * t = *iH;
IAssert(t->getStatus() == Task::STARTING, "Got task at %p in the starting list, but isn't starting.", t);
t->switchTo();
IAssert(t->getStatus() != Task::STARTING, "Task at %p got switchedTo, but still is 'STARTING'.", t);
starting.erase(iH);
- if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
- noWait = true;
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED))
stopped.insert(t);
- }
- if (t->getStatus() == Task::YIELDED) {
- noWait = true;
+ if (t->getStatus() == Task::YIELDED)
yielded.insert(t);
- }
}
// if we begin that loop with any pending task, just don't loop, so we can add them immediately.
- if (!m_pendingAdd.isEmpty() || !yielded.empty())
- noWait = true;
+ bool noWait = !m_pendingAdd.isEmpty() || !yielded.empty() || !stopped.empty();
// libev's event "loop". We always runs it once though.
m_allowedToSignal = true;
@@ -261,7 +268,7 @@ int Balau::TaskMan::mainLoop() {
ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE);
Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this);
- // calling async's idle loop here
+ // calling async's idle
s_async.idle();
// let's check what task got stopped, and signal them
@@ -276,7 +283,7 @@ int Balau::TaskMan::mainLoop() {
// let's check who got signaled, and call them
for (Task * t : m_signaledTasks) {
Printer::elog(E_TASK, "TaskMan at %p Switching to task %p (%s - %s) that got signaled somehow.", this, t, t->getName(), ClassName(t).c_str());
- IAssert(t->getStatus() == Task::IDLE || t->getStatus() == Task::YIELDED, "We're switching to a non-idle/yielded task at %p... ? status = %i", t, t->getStatus());
+ IAssert(t->getStatus() == Task::SLEEPING || t->getStatus() == Task::YIELDED, "We're switching to a non-sleeping/yielded task at %p... ? status = %i", t, t->getStatus());
bool wasYielded = t->getStatus() == Task::YIELDED;
t->switchTo();
if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) {
@@ -313,7 +320,7 @@ int Balau::TaskMan::mainLoop() {
Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t);
IAssert(m_tasks.find(t) == m_tasks.end(), "TaskMan got task %p twice... ?", t);
ev_now_update(m_loop);
- t->setup(this, getStack());
+ t->setup(this, t->isStackless() ? NULL : getStack());
m_tasks.insert(t);
starting.insert(t);
}
@@ -345,12 +352,14 @@ int Balau::TaskMan::mainLoop() {
return m_stopCode;
}
-void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) {
+void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events::TaskEvent * event) {
if (stick) {
+ IAssert(!event, "inconsistent");
TaskMan * tm = stick->getTaskMan();
tm->addToPending(t);
- tm->m_evt.send();
} else {
+ if (event)
+ event->attachToTask(t);
s_scheduler.registerTask(t);
}
}
@@ -361,6 +370,7 @@ void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) {
void Balau::TaskMan::addToPending(Balau::Task * t) {
m_pendingAdd.push(t);
+ m_evt.send();
}
void Balau::TaskMan::signalTask(Task * t) {
@@ -373,17 +383,51 @@ void Balau::TaskMan::stop(int code) {
s_scheduler.stopAll(code);
}
-class ThreadedTaskMan : public Balau::Thread {
- virtual void * proc() {
+void * Balau::TaskMan::TaskManThread::proc() {
+ bool success = false;
+ m_taskMan = NULL;
+ try {
m_taskMan = new Balau::TaskMan();
m_taskMan->mainLoop();
- return NULL;
+ success = true;
}
- Balau::TaskMan * m_taskMan;
-};
+ catch (Exit e) {
+ Printer::log(M_ERROR, "We shouldn't have gotten an Exit exception here... exitting anyway");
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_ERROR, "%s", str.to_charp());
+ }
+ catch (RessourceException e) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread got a ressource problem: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (GeneralException e) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an exception: %s", e.getMsg());
+ const char * details = e.getDetails();
+ if (details)
+ Printer::log(M_ERROR, " %s", details);
+ auto trace = e.getTrace();
+ for (String & str : trace)
+ Printer::log(M_DEBUG, "%s", str.to_charp());
+ }
+ catch (...) {
+ Printer::log(M_ERROR | M_ALERT, "The TaskMan thread caused an unknown exception");
+ }
+ if (!success) {
+ if (m_taskMan)
+ delete m_taskMan;
+ m_taskMan = NULL;
+ TaskMan::stop(-1);
+ }
+ return NULL;
+}
-Balau::Thread * Balau::TaskMan::createThreadedTaskMan() {
- Thread * r = new ThreadedTaskMan();
- r->threadStart();
- return r;
+Balau::TaskMan::TaskManThread::~TaskManThread() {
+ if (m_taskMan)
+ delete m_taskMan;
}