diff options
| author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2012-09-01 00:12:35 -0700 | 
|---|---|---|
| committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2012-09-01 00:12:35 -0700 | 
| commit | 06674e57649d536cf19715524ee40c5ad4a9026d (patch) | |
| tree | d519fc72a6e3946150cc5ee21ed01cb73e82747b /includes | |
| parent | d2db92f6b5d275b3150deb7a52a8da142a7cc953 (diff) | |
Adding async operations; first step towards tossing libeio out.
Diffstat (limited to 'includes')
| -rw-r--r-- | includes/Async.h | 102 | ||||
| -rw-r--r-- | includes/Printer.h | 1 | ||||
| -rw-r--r-- | includes/TaskMan.h | 24 | 
3 files changed, 119 insertions, 8 deletions
| diff --git a/includes/Async.h b/includes/Async.h new file mode 100644 index 0000000..bd6ce7e --- /dev/null +++ b/includes/Async.h @@ -0,0 +1,102 @@ +#pragma once + +#include <Atomic.h> +#include <Exceptions.h> +#include <Local.h> +#include <Threads.h> + +namespace Balau { + +class AsyncManager; +class AsyncFinishWorker; + +typedef void (*IdleReadyCallback_t)(void *); + +class AsyncOperation { +  protected: +    virtual void run() { } +    virtual void finish() { } +    virtual void done() { } +    virtual bool needsMainQueue() { return true; } +    virtual bool needsFinishWorker() { return false; } +    virtual bool needsSynchronousCallback() { return true; } +  protected: +      virtual ~AsyncOperation() { } +  private: +    Queue<AsyncOperation> * m_idleQueue = NULL; +    IdleReadyCallback_t m_idleReadyCallback = NULL; +    void * m_idleReadyParam = NULL; +    void finalize(); + +    friend class AsyncManager; +    friend class AsyncFinishWorker; +}; + +class AsyncFinishWorker : public Thread { +  public: +      AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue) { } +    virtual void * proc(); +    AsyncManager * m_async; +    Queue<AsyncOperation> * m_queue; +    bool m_stopping = false; +    volatile bool m_stopped = false; +}; + +class AsyncManager : public Thread { +  public: +    void setFinishers(int minIdle, int maxIdle) { +        AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads."); +        m_minIdle = minIdle; +        m_maxIdle = maxIdle; +    } +    void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param); +    void queueOp(AsyncOperation * op); +    void idle(); +    bool isReady() { return m_ready; } + +  protected: +    virtual void threadExit(); + +  private: +    void checkIdle(); +    void killOneFinisher(); +    void startOneFinisher(); +    void joinStoppedFinishers(); +    void stopAllWorkers(); +    virtual void * proc(); +    struct TLS { +        Queue<AsyncOperation> idleQueue; +        IdleReadyCallback_t idleReadyCallback; +        void * idleReadyParam; +    }; +    TLS * getTLS() { +        TLS * tls = (TLS *) m_tlsManager.getTLS(); +        if (!tls) { +            tls = new TLS(); +            m_tlsManager.setTLS(tls); +            m_TLSes.push(tls); +            Atomic::Increment(&m_numTLSes); +        } +        return tls; +    } +    Queue<AsyncOperation> m_queue; +    Queue<AsyncOperation> m_finished; +    Queue<TLS> m_TLSes; +    volatile int m_numTLSes = 0; +    PThreadsTLSManager m_tlsManager; +    std::list<AsyncFinishWorker *> m_workers; +    int m_numFinishers = 0; +    int m_numFinishersIdle = 0; +    int m_minIdle = 1; +    int m_maxIdle = 4; +    bool m_stopping = false; +    volatile bool m_ready = false; +    volatile bool m_stopperPushed = false; + +    void incIdle() { Atomic::Increment(&m_numFinishersIdle); } +    void decIdle() { Atomic::Decrement(&m_numFinishersIdle); } + +    friend class AsyncFinishWorker; +}; + +}; diff --git a/includes/Printer.h b/includes/Printer.h index e215fc1..8fd3674 100644 --- a/includes/Printer.h +++ b/includes/Printer.h @@ -38,6 +38,7 @@ enum {      E_THREAD = 64,      E_OUTPUT = 128,      E_HTTPSERVER = 256, +    E_ASYNC = 512,  };  class Printer { diff --git a/includes/TaskMan.h b/includes/TaskMan.h index d675d75..d39598f 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -7,6 +7,7 @@  #include <ev++.h>  #include <ext/hash_set>  #include <queue> +#include <Async.h>  #include <Threads.h>  #include <Exceptions.h> @@ -17,12 +18,6 @@ namespace Balau {  class Task;  class TaskScheduler; -namespace Events { - -class Async; - -}; -  class TaskMan {    public:        TaskMan(); @@ -37,9 +32,17 @@ class TaskMan {      bool stopped() { return m_stopped; }    private:      static void registerTask(Task * t, Task * stick); +    static void registerAsyncOp(AsyncOperation * op);      void * getStack();      void freeStack(void * stack);      void addToPending(Task * t); +    static void asyncIdleReady(void * param) { +        TaskMan * taskMan = (TaskMan *) param; +        taskMan->asyncIdleReady(); +    } +    void asyncIdleReady() { +        m_evt.send(); +    }  #ifndef _WIN32      coro_context m_returnContext;  #else @@ -49,20 +52,25 @@ class TaskMan {      friend class TaskScheduler;      template<class T>      friend T * createTask(T * t, Task * stick = NULL); +    template<class T> +    friend T * createAsyncOp(T * op);      struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };      typedef gnu::hash_set<Task *, taskHasher> taskHash_t;      taskHash_t m_tasks, m_signaledTasks;      Queue<Task> m_pendingAdd; -    bool m_stopped = false;      struct ev_loop * m_loop; -    bool m_allowedToSignal = false;      ev::async m_evt;      std::queue<void *> m_stacks;      int m_nStacks;      int m_stopCode = 0; +    bool m_stopped = false; +    bool m_allowedToSignal = false;  };  template<class T>  T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; } +template<class T> +T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; } +  }; | 
