summaryrefslogtreecommitdiff
path: root/includes
diff options
context:
space:
mode:
Diffstat (limited to 'includes')
-rw-r--r--includes/Async.h102
-rw-r--r--includes/Printer.h1
-rw-r--r--includes/TaskMan.h24
3 files changed, 119 insertions, 8 deletions
diff --git a/includes/Async.h b/includes/Async.h
new file mode 100644
index 0000000..bd6ce7e
--- /dev/null
+++ b/includes/Async.h
@@ -0,0 +1,102 @@
+#pragma once
+
+#include <Atomic.h>
+#include <Exceptions.h>
+#include <Local.h>
+#include <Threads.h>
+
+namespace Balau {
+
+class AsyncManager;
+class AsyncFinishWorker;
+
+typedef void (*IdleReadyCallback_t)(void *);
+
+class AsyncOperation {
+ protected:
+ virtual void run() { }
+ virtual void finish() { }
+ virtual void done() { }
+ virtual bool needsMainQueue() { return true; }
+ virtual bool needsFinishWorker() { return false; }
+ virtual bool needsSynchronousCallback() { return true; }
+ protected:
+ virtual ~AsyncOperation() { }
+ private:
+ Queue<AsyncOperation> * m_idleQueue = NULL;
+ IdleReadyCallback_t m_idleReadyCallback = NULL;
+ void * m_idleReadyParam = NULL;
+ void finalize();
+
+ friend class AsyncManager;
+ friend class AsyncFinishWorker;
+};
+
+class AsyncFinishWorker : public Thread {
+ public:
+ AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue) { }
+ virtual void * proc();
+ AsyncManager * m_async;
+ Queue<AsyncOperation> * m_queue;
+ bool m_stopping = false;
+ volatile bool m_stopped = false;
+};
+
+class AsyncManager : public Thread {
+ public:
+ void setFinishers(int minIdle, int maxIdle) {
+ AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads.");
+ m_minIdle = minIdle;
+ m_maxIdle = maxIdle;
+ }
+ void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param);
+ void queueOp(AsyncOperation * op);
+ void idle();
+ bool isReady() { return m_ready; }
+
+ protected:
+ virtual void threadExit();
+
+ private:
+ void checkIdle();
+ void killOneFinisher();
+ void startOneFinisher();
+ void joinStoppedFinishers();
+ void stopAllWorkers();
+ virtual void * proc();
+ struct TLS {
+ Queue<AsyncOperation> idleQueue;
+ IdleReadyCallback_t idleReadyCallback;
+ void * idleReadyParam;
+ };
+ TLS * getTLS() {
+ TLS * tls = (TLS *) m_tlsManager.getTLS();
+ if (!tls) {
+ tls = new TLS();
+ m_tlsManager.setTLS(tls);
+ m_TLSes.push(tls);
+ Atomic::Increment(&m_numTLSes);
+ }
+ return tls;
+ }
+ Queue<AsyncOperation> m_queue;
+ Queue<AsyncOperation> m_finished;
+ Queue<TLS> m_TLSes;
+ volatile int m_numTLSes = 0;
+ PThreadsTLSManager m_tlsManager;
+ std::list<AsyncFinishWorker *> m_workers;
+ int m_numFinishers = 0;
+ int m_numFinishersIdle = 0;
+ int m_minIdle = 1;
+ int m_maxIdle = 4;
+ bool m_stopping = false;
+ volatile bool m_ready = false;
+ volatile bool m_stopperPushed = false;
+
+ void incIdle() { Atomic::Increment(&m_numFinishersIdle); }
+ void decIdle() { Atomic::Decrement(&m_numFinishersIdle); }
+
+ friend class AsyncFinishWorker;
+};
+
+};
diff --git a/includes/Printer.h b/includes/Printer.h
index e215fc1..8fd3674 100644
--- a/includes/Printer.h
+++ b/includes/Printer.h
@@ -38,6 +38,7 @@ enum {
E_THREAD = 64,
E_OUTPUT = 128,
E_HTTPSERVER = 256,
+ E_ASYNC = 512,
};
class Printer {
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index d675d75..d39598f 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -7,6 +7,7 @@
#include <ev++.h>
#include <ext/hash_set>
#include <queue>
+#include <Async.h>
#include <Threads.h>
#include <Exceptions.h>
@@ -17,12 +18,6 @@ namespace Balau {
class Task;
class TaskScheduler;
-namespace Events {
-
-class Async;
-
-};
-
class TaskMan {
public:
TaskMan();
@@ -37,9 +32,17 @@ class TaskMan {
bool stopped() { return m_stopped; }
private:
static void registerTask(Task * t, Task * stick);
+ static void registerAsyncOp(AsyncOperation * op);
void * getStack();
void freeStack(void * stack);
void addToPending(Task * t);
+ static void asyncIdleReady(void * param) {
+ TaskMan * taskMan = (TaskMan *) param;
+ taskMan->asyncIdleReady();
+ }
+ void asyncIdleReady() {
+ m_evt.send();
+ }
#ifndef _WIN32
coro_context m_returnContext;
#else
@@ -49,20 +52,25 @@ class TaskMan {
friend class TaskScheduler;
template<class T>
friend T * createTask(T * t, Task * stick = NULL);
+ template<class T>
+ friend T * createAsyncOp(T * op);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
taskHash_t m_tasks, m_signaledTasks;
Queue<Task> m_pendingAdd;
- bool m_stopped = false;
struct ev_loop * m_loop;
- bool m_allowedToSignal = false;
ev::async m_evt;
std::queue<void *> m_stacks;
int m_nStacks;
int m_stopCode = 0;
+ bool m_stopped = false;
+ bool m_allowedToSignal = false;
};
template<class T>
T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; }
+template<class T>
+T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; }
+
};