summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--includes/Async.h102
-rw-r--r--includes/Printer.h1
-rw-r--r--includes/TaskMan.h24
-rw-r--r--src/Async.cc171
-rw-r--r--src/TaskMan.cc30
-rw-r--r--tests/test-Async.cc56
7 files changed, 378 insertions, 8 deletions
diff --git a/Makefile b/Makefile
index 478ff0f..6516423 100644
--- a/Makefile
+++ b/Makefile
@@ -118,6 +118,7 @@ Exceptions.cc \
\
Local.cc \
Threads.cc \
+Async.cc \
\
BString.cc \
Main.cc \
@@ -178,6 +179,7 @@ test-Sanity.cc \
test-String.cc \
test-Tasks.cc \
test-Threads.cc \
+test-Async.cc \
test-Handles.cc \
test-Sockets.cc \
test-Http.cc \
diff --git a/includes/Async.h b/includes/Async.h
new file mode 100644
index 0000000..bd6ce7e
--- /dev/null
+++ b/includes/Async.h
@@ -0,0 +1,102 @@
+#pragma once
+
+#include <Atomic.h>
+#include <Exceptions.h>
+#include <Local.h>
+#include <Threads.h>
+
+namespace Balau {
+
+class AsyncManager;
+class AsyncFinishWorker;
+
+typedef void (*IdleReadyCallback_t)(void *);
+
+class AsyncOperation {
+ protected:
+ virtual void run() { }
+ virtual void finish() { }
+ virtual void done() { }
+ virtual bool needsMainQueue() { return true; }
+ virtual bool needsFinishWorker() { return false; }
+ virtual bool needsSynchronousCallback() { return true; }
+ protected:
+ virtual ~AsyncOperation() { }
+ private:
+ Queue<AsyncOperation> * m_idleQueue = NULL;
+ IdleReadyCallback_t m_idleReadyCallback = NULL;
+ void * m_idleReadyParam = NULL;
+ void finalize();
+
+ friend class AsyncManager;
+ friend class AsyncFinishWorker;
+};
+
+class AsyncFinishWorker : public Thread {
+ public:
+ AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue) { }
+ virtual void * proc();
+ AsyncManager * m_async;
+ Queue<AsyncOperation> * m_queue;
+ bool m_stopping = false;
+ volatile bool m_stopped = false;
+};
+
+class AsyncManager : public Thread {
+ public:
+ void setFinishers(int minIdle, int maxIdle) {
+ AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads.");
+ m_minIdle = minIdle;
+ m_maxIdle = maxIdle;
+ }
+ void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param);
+ void queueOp(AsyncOperation * op);
+ void idle();
+ bool isReady() { return m_ready; }
+
+ protected:
+ virtual void threadExit();
+
+ private:
+ void checkIdle();
+ void killOneFinisher();
+ void startOneFinisher();
+ void joinStoppedFinishers();
+ void stopAllWorkers();
+ virtual void * proc();
+ struct TLS {
+ Queue<AsyncOperation> idleQueue;
+ IdleReadyCallback_t idleReadyCallback;
+ void * idleReadyParam;
+ };
+ TLS * getTLS() {
+ TLS * tls = (TLS *) m_tlsManager.getTLS();
+ if (!tls) {
+ tls = new TLS();
+ m_tlsManager.setTLS(tls);
+ m_TLSes.push(tls);
+ Atomic::Increment(&m_numTLSes);
+ }
+ return tls;
+ }
+ Queue<AsyncOperation> m_queue;
+ Queue<AsyncOperation> m_finished;
+ Queue<TLS> m_TLSes;
+ volatile int m_numTLSes = 0;
+ PThreadsTLSManager m_tlsManager;
+ std::list<AsyncFinishWorker *> m_workers;
+ int m_numFinishers = 0;
+ int m_numFinishersIdle = 0;
+ int m_minIdle = 1;
+ int m_maxIdle = 4;
+ bool m_stopping = false;
+ volatile bool m_ready = false;
+ volatile bool m_stopperPushed = false;
+
+ void incIdle() { Atomic::Increment(&m_numFinishersIdle); }
+ void decIdle() { Atomic::Decrement(&m_numFinishersIdle); }
+
+ friend class AsyncFinishWorker;
+};
+
+};
diff --git a/includes/Printer.h b/includes/Printer.h
index e215fc1..8fd3674 100644
--- a/includes/Printer.h
+++ b/includes/Printer.h
@@ -38,6 +38,7 @@ enum {
E_THREAD = 64,
E_OUTPUT = 128,
E_HTTPSERVER = 256,
+ E_ASYNC = 512,
};
class Printer {
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index d675d75..d39598f 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -7,6 +7,7 @@
#include <ev++.h>
#include <ext/hash_set>
#include <queue>
+#include <Async.h>
#include <Threads.h>
#include <Exceptions.h>
@@ -17,12 +18,6 @@ namespace Balau {
class Task;
class TaskScheduler;
-namespace Events {
-
-class Async;
-
-};
-
class TaskMan {
public:
TaskMan();
@@ -37,9 +32,17 @@ class TaskMan {
bool stopped() { return m_stopped; }
private:
static void registerTask(Task * t, Task * stick);
+ static void registerAsyncOp(AsyncOperation * op);
void * getStack();
void freeStack(void * stack);
void addToPending(Task * t);
+ static void asyncIdleReady(void * param) {
+ TaskMan * taskMan = (TaskMan *) param;
+ taskMan->asyncIdleReady();
+ }
+ void asyncIdleReady() {
+ m_evt.send();
+ }
#ifndef _WIN32
coro_context m_returnContext;
#else
@@ -49,20 +52,25 @@ class TaskMan {
friend class TaskScheduler;
template<class T>
friend T * createTask(T * t, Task * stick = NULL);
+ template<class T>
+ friend T * createAsyncOp(T * op);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
taskHash_t m_tasks, m_signaledTasks;
Queue<Task> m_pendingAdd;
- bool m_stopped = false;
struct ev_loop * m_loop;
- bool m_allowedToSignal = false;
ev::async m_evt;
std::queue<void *> m_stacks;
int m_nStacks;
int m_stopCode = 0;
+ bool m_stopped = false;
+ bool m_allowedToSignal = false;
};
template<class T>
T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; }
+template<class T>
+T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; }
+
};
diff --git a/src/Async.cc b/src/Async.cc
new file mode 100644
index 0000000..829c677
--- /dev/null
+++ b/src/Async.cc
@@ -0,0 +1,171 @@
+#include "Async.h"
+
+namespace {
+
+class AsyncStopper : public Balau::AsyncOperation {
+ public:
+ virtual bool needsSynchronousCallback() { return false; }
+ virtual void done() { delete this; }
+};
+
+};
+
+void Balau::AsyncManager::setIdleReadyCallback(void (*callback)(void *), void * param) {
+ while (!m_ready);
+ TLS * tls = getTLS();
+ tls->idleReadyCallback = callback;
+ tls->idleReadyParam = param;
+}
+
+void Balau::AsyncManager::queueOp(AsyncOperation * op) {
+ if (m_stopperPushed) {
+ Printer::elog(E_ASYNC, "AsyncManager's queue has been stopped; running operation %p on this thread instead.", op);
+ op->run();
+ op->finalize();
+ return;
+ }
+ while (!m_ready);
+ TLS * tls = getTLS();
+ Printer::elog(E_ASYNC, "Queuing operation at %p", op);
+ if (op->needsSynchronousCallback()) {
+ Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam);
+ op->m_idleQueue = &tls->idleQueue;
+ op->m_idleReadyCallback = tls->idleReadyCallback;
+ op->m_idleReadyParam = tls->idleReadyParam;
+ }
+ if (op->needsMainQueue())
+ m_queue.push(op);
+ else
+ m_finished.push(op);
+}
+
+void Balau::AsyncManager::checkIdle() {
+ if (m_numFinishersIdle > m_maxIdle)
+ killOneFinisher();
+ if (m_numFinishersIdle < m_minIdle)
+ startOneFinisher();
+ joinStoppedFinishers();
+}
+
+void Balau::AsyncManager::killOneFinisher() {
+ Printer::elog(E_ASYNC, "Too many workers idle (%i / %i), killing one.", m_numFinishersIdle, m_maxIdle);
+ m_finished.push(new AsyncStopper());
+}
+
+void Balau::AsyncManager::startOneFinisher() {
+ AsyncFinishWorker * worker = new AsyncFinishWorker(this, &m_finished);
+ Printer::elog(E_ASYNC, "Not enough workers idle (%i / %i), starting one at %p.", m_numFinishersIdle, m_minIdle, worker);
+ m_workers.push_back(worker);
+ m_numFinishers++;
+ worker->threadStart();
+}
+
+void Balau::AsyncManager::joinStoppedFinishers() {
+ for (auto i = m_workers.begin(); i != m_workers.end(); i++) {
+ AsyncFinishWorker * worker = *i;
+ if (!worker->m_stopped)
+ continue;
+ Printer::elog(E_ASYNC, "Joining stopped worker at %p", worker);
+ m_numFinishers--;
+ m_workers.erase(i);
+ worker->join();
+ delete worker;
+ break;
+ }
+}
+
+void * Balau::AsyncManager::proc() {
+ Printer::elog(E_ASYNC, "AsyncManager thread starting up");
+ m_tlsManager.init();
+ m_ready = true;
+ while (!m_stopping) {
+ checkIdle();
+ AsyncOperation * op = m_queue.pop();
+ Printer::elog(E_ASYNC, "AsyncManager got an operation at %p", op);
+ if (dynamic_cast<AsyncStopper *>(op)) {
+ Printer::elog(E_ASYNC, "AsyncManager got a stopper operation");
+ m_stopping = true;
+ }
+ Printer::elog(E_ASYNC, "AsyncManager running operation at %p", op);
+ op->run();
+ if (op->needsFinishWorker()) {
+ Printer::elog(E_ASYNC, "AsyncManager pushing operation at %p in the finisher's queue", op);
+ m_finished.push(op);
+ } else {
+ Printer::elog(E_ASYNC, "AsyncManager finalizing operation at %p", op);
+ op->finalize();
+ }
+ }
+ stopAllWorkers();
+
+ while (Atomic::Prefetch::Decrement(&m_numTLSes)) {
+ TLS * tls = m_TLSes.pop();
+ while (!tls->idleQueue.isEmpty());
+ }
+
+ return NULL;
+}
+
+void * Balau::AsyncFinishWorker::proc() {
+ Printer::elog(E_ASYNC, "AsyncFinishWorker thread starting up");
+ AsyncOperation * op;
+ while (!m_stopping) {
+ m_async->incIdle();
+ op = m_queue->pop();
+ m_async->decIdle();
+ Printer::elog(E_ASYNC, "AsyncFinishWorker got operation at %p", op);
+ if (dynamic_cast<AsyncStopper *>(op)) {
+ Printer::elog(E_ASYNC, "AsyncFinishWorker got a stopper operation");
+ m_stopping = true;
+ }
+ op->finalize();
+ }
+
+ m_stopped = true;
+ Printer::elog(E_ASYNC, "AsyncFinishWorker thread stopping");
+
+ return NULL;
+}
+
+void Balau::AsyncOperation::finalize() {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is finishing operation %p", this);
+ finish();
+ if (needsSynchronousCallback()) {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is pushing operation %p to its idle queue", this);
+ bool wasEmpty = m_idleQueue->isEmpty();
+ m_idleQueue->push(this);
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() has pushed operation %p to its idle queue; wasEmpty = %s; callback = %p", this, wasEmpty ? "true" : "false", m_idleReadyCallback);
+ if (wasEmpty && m_idleReadyCallback) {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is calling ready callback to wake up main loop");
+ m_idleReadyCallback(m_idleReadyParam);
+ }
+ } else {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is wrapping up operation %p", this);
+ done();
+ }
+}
+
+void Balau::AsyncManager::idle() {
+ Printer::elog(E_ASYNC, "AsyncManager::idle() is running");
+ while (!m_ready);
+ AsyncOperation * op;
+ TLS * tls = getTLS();
+ while ((op = tls->idleQueue.pop(false))) {
+ Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op);
+ op->done();
+ }
+}
+
+void Balau::AsyncManager::threadExit() {
+ Printer::elog(E_ASYNC, "AsyncManager thread is being asked to stop; creating stopper");
+ if (Atomic::CmpXChgBool(&m_stopperPushed, true, false))
+ m_queue.push(new AsyncStopper());
+}
+
+void Balau::AsyncManager::stopAllWorkers() {
+ Printer::elog(E_ASYNC, "AsyncManager thread is being stopping and joining %i workers", m_numFinishers);
+ for (int i = 0; i < m_numFinishers; i++)
+ m_finished.push(new AsyncStopper());
+ for (auto worker : m_workers)
+ worker->join();
+}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 17f0a40..49c326a 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -1,8 +1,24 @@
+#include "Async.h"
#include "TaskMan.h"
#include "Task.h"
#include "Main.h"
#include "Local.h"
+static Balau::AsyncManager s_async;
+
+namespace {
+
+class AsyncStarter : public Balau::AtStart, Balau::AtExit {
+ public:
+ AsyncStarter() : AtStart(1000), AtExit(0) { }
+ void doStart() {
+ s_async.threadStart();
+ }
+ void doExit() {
+ s_async.join();
+ }
+};
+
class Stopper : public Balau::Task {
public:
Stopper(int code) : m_code(code) { }
@@ -12,6 +28,10 @@ class Stopper : public Balau::Task {
int m_code;
};
+};
+
+static AsyncStarter s_asyncStarter;
+
void Stopper::Do() {
getTaskMan()->stopMe(m_code);
}
@@ -207,6 +227,8 @@ int Balau::TaskMan::mainLoop() {
if (t->getStatus() == Task::STARTING)
starting.insert(t);
+ s_async.setIdleReadyCallback(asyncIdleReady, this);
+
do {
bool noWait = false;
@@ -239,6 +261,9 @@ int Balau::TaskMan::mainLoop() {
ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE);
Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this);
+ // calling async's idle loop here
+ s_async.idle();
+
// let's check what task got stopped, and signal them
for (Task * t : stopped) {
IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t);
@@ -316,6 +341,7 @@ int Balau::TaskMan::mainLoop() {
} while (!m_stopped);
Printer::elog(E_TASK, "TaskManager at %p stopping.", this);
+ s_async.setIdleReadyCallback(NULL, NULL);
return m_stopCode;
}
@@ -329,6 +355,10 @@ void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) {
}
}
+void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) {
+ s_async.queueOp(op);
+}
+
void Balau::TaskMan::addToPending(Balau::Task * t) {
m_pendingAdd.push(t);
}
diff --git a/tests/test-Async.cc b/tests/test-Async.cc
new file mode 100644
index 0000000..5f93217
--- /dev/null
+++ b/tests/test-Async.cc
@@ -0,0 +1,56 @@
+#include <Async.h>
+#include <Task.h>
+#include <TaskMan.h>
+#include <Main.h>
+
+using namespace Balau;
+
+class AsyncOpTest : public AsyncOperation {
+ public:
+ AsyncOpTest(Events::Custom * evt) : m_evt(evt) { }
+ virtual void run() {
+ Printer::log(M_STATUS, "Async operation running");
+ TAssert(!m_ran);
+ TAssert(!m_finished);
+ TAssert(!m_done);
+ m_ran = true;
+ }
+ virtual void finish() {
+ Printer::log(M_STATUS, "Async operation finishing");
+ TAssert(m_ran);
+ TAssert(!m_finished);
+ TAssert(!m_done);
+ m_finished = true;
+ }
+ virtual void done() {
+ Printer::log(M_STATUS, "Async operation done");
+ TAssert(m_ran);
+ TAssert(m_finished);
+ TAssert(!m_done);
+ m_done = true;
+ m_evt->doSignal();
+ }
+ virtual bool needsFinishWorker() { return true; }
+
+ bool m_ran = false;
+ bool m_finished = false;
+ bool m_done = false;
+ Events::Custom * m_evt;
+};
+
+void MainTask::Do() {
+ Printer::log(M_STATUS, "Test::Async running.");
+
+ Events::Custom evt;
+ AsyncOpTest * op = createAsyncOp(new AsyncOpTest(&evt));
+ waitFor(&evt);
+ TAssert(!evt.gotSignal());
+ yield();
+ TAssert(evt.gotSignal());
+ TAssert(op->m_ran);
+ TAssert(op->m_finished);
+ TAssert(op->m_done);
+ delete op;
+
+ Printer::log(M_STATUS, "Test::Async passed.");
+}