From 06674e57649d536cf19715524ee40c5ad4a9026d Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Sat, 1 Sep 2012 00:12:35 -0700 Subject: Adding async operations; first step towards tossing libeio out. --- includes/Async.h | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++++ includes/Printer.h | 1 + includes/TaskMan.h | 24 ++++++++----- 3 files changed, 119 insertions(+), 8 deletions(-) create mode 100644 includes/Async.h (limited to 'includes') diff --git a/includes/Async.h b/includes/Async.h new file mode 100644 index 0000000..bd6ce7e --- /dev/null +++ b/includes/Async.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include + +namespace Balau { + +class AsyncManager; +class AsyncFinishWorker; + +typedef void (*IdleReadyCallback_t)(void *); + +class AsyncOperation { + protected: + virtual void run() { } + virtual void finish() { } + virtual void done() { } + virtual bool needsMainQueue() { return true; } + virtual bool needsFinishWorker() { return false; } + virtual bool needsSynchronousCallback() { return true; } + protected: + virtual ~AsyncOperation() { } + private: + Queue * m_idleQueue = NULL; + IdleReadyCallback_t m_idleReadyCallback = NULL; + void * m_idleReadyParam = NULL; + void finalize(); + + friend class AsyncManager; + friend class AsyncFinishWorker; +}; + +class AsyncFinishWorker : public Thread { + public: + AsyncFinishWorker(AsyncManager * async, Queue * queue) : m_async(async), m_queue(queue) { } + virtual void * proc(); + AsyncManager * m_async; + Queue * m_queue; + bool m_stopping = false; + volatile bool m_stopped = false; +}; + +class AsyncManager : public Thread { + public: + void setFinishers(int minIdle, int maxIdle) { + AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads."); + m_minIdle = minIdle; + m_maxIdle = maxIdle; + } + void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param); + void queueOp(AsyncOperation * op); + void idle(); + bool isReady() { return m_ready; } + + protected: + virtual void threadExit(); + + private: + void checkIdle(); + void killOneFinisher(); + void startOneFinisher(); + void joinStoppedFinishers(); + void stopAllWorkers(); + virtual void * proc(); + struct TLS { + Queue idleQueue; + IdleReadyCallback_t idleReadyCallback; + void * idleReadyParam; + }; + TLS * getTLS() { + TLS * tls = (TLS *) m_tlsManager.getTLS(); + if (!tls) { + tls = new TLS(); + m_tlsManager.setTLS(tls); + m_TLSes.push(tls); + Atomic::Increment(&m_numTLSes); + } + return tls; + } + Queue m_queue; + Queue m_finished; + Queue m_TLSes; + volatile int m_numTLSes = 0; + PThreadsTLSManager m_tlsManager; + std::list m_workers; + int m_numFinishers = 0; + int m_numFinishersIdle = 0; + int m_minIdle = 1; + int m_maxIdle = 4; + bool m_stopping = false; + volatile bool m_ready = false; + volatile bool m_stopperPushed = false; + + void incIdle() { Atomic::Increment(&m_numFinishersIdle); } + void decIdle() { Atomic::Decrement(&m_numFinishersIdle); } + + friend class AsyncFinishWorker; +}; + +}; diff --git a/includes/Printer.h b/includes/Printer.h index e215fc1..8fd3674 100644 --- a/includes/Printer.h +++ b/includes/Printer.h @@ -38,6 +38,7 @@ enum { E_THREAD = 64, E_OUTPUT = 128, E_HTTPSERVER = 256, + E_ASYNC = 512, }; class Printer { diff --git a/includes/TaskMan.h b/includes/TaskMan.h index d675d75..d39598f 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -17,12 +18,6 @@ namespace Balau { class Task; class TaskScheduler; -namespace Events { - -class Async; - -}; - class TaskMan { public: TaskMan(); @@ -37,9 +32,17 @@ class TaskMan { bool stopped() { return m_stopped; } private: static void registerTask(Task * t, Task * stick); + static void registerAsyncOp(AsyncOperation * op); void * getStack(); void freeStack(void * stack); void addToPending(Task * t); + static void asyncIdleReady(void * param) { + TaskMan * taskMan = (TaskMan *) param; + taskMan->asyncIdleReady(); + } + void asyncIdleReady() { + m_evt.send(); + } #ifndef _WIN32 coro_context m_returnContext; #else @@ -49,20 +52,25 @@ class TaskMan { friend class TaskScheduler; template friend T * createTask(T * t, Task * stick = NULL); + template + friend T * createAsyncOp(T * op); struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast(t); } }; typedef gnu::hash_set taskHash_t; taskHash_t m_tasks, m_signaledTasks; Queue m_pendingAdd; - bool m_stopped = false; struct ev_loop * m_loop; - bool m_allowedToSignal = false; ev::async m_evt; std::queue m_stacks; int m_nStacks; int m_stopCode = 0; + bool m_stopped = false; + bool m_allowedToSignal = false; }; template T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; } +template +T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; } + }; -- cgit v1.2.3