From 06674e57649d536cf19715524ee40c5ad4a9026d Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Sat, 1 Sep 2012 00:12:35 -0700 Subject: Adding async operations; first step towards tossing libeio out. --- Makefile | 2 + includes/Async.h | 102 +++++++++++++++++++++++++++++++ includes/Printer.h | 1 + includes/TaskMan.h | 24 +++++--- src/Async.cc | 171 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/TaskMan.cc | 30 +++++++++ tests/test-Async.cc | 56 +++++++++++++++++ 7 files changed, 378 insertions(+), 8 deletions(-) create mode 100644 includes/Async.h create mode 100644 src/Async.cc create mode 100644 tests/test-Async.cc diff --git a/Makefile b/Makefile index 478ff0f..6516423 100644 --- a/Makefile +++ b/Makefile @@ -118,6 +118,7 @@ Exceptions.cc \ \ Local.cc \ Threads.cc \ +Async.cc \ \ BString.cc \ Main.cc \ @@ -178,6 +179,7 @@ test-Sanity.cc \ test-String.cc \ test-Tasks.cc \ test-Threads.cc \ +test-Async.cc \ test-Handles.cc \ test-Sockets.cc \ test-Http.cc \ diff --git a/includes/Async.h b/includes/Async.h new file mode 100644 index 0000000..bd6ce7e --- /dev/null +++ b/includes/Async.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include + +namespace Balau { + +class AsyncManager; +class AsyncFinishWorker; + +typedef void (*IdleReadyCallback_t)(void *); + +class AsyncOperation { + protected: + virtual void run() { } + virtual void finish() { } + virtual void done() { } + virtual bool needsMainQueue() { return true; } + virtual bool needsFinishWorker() { return false; } + virtual bool needsSynchronousCallback() { return true; } + protected: + virtual ~AsyncOperation() { } + private: + Queue * m_idleQueue = NULL; + IdleReadyCallback_t m_idleReadyCallback = NULL; + void * m_idleReadyParam = NULL; + void finalize(); + + friend class AsyncManager; + friend class AsyncFinishWorker; +}; + +class AsyncFinishWorker : public Thread { + public: + AsyncFinishWorker(AsyncManager * async, Queue * queue) : m_async(async), m_queue(queue) { } + virtual void * proc(); + AsyncManager * m_async; + Queue * m_queue; + bool m_stopping = false; + volatile bool m_stopped = false; +}; + +class AsyncManager : public Thread { + public: + void setFinishers(int minIdle, int maxIdle) { + AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads."); + m_minIdle = minIdle; + m_maxIdle = maxIdle; + } + void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param); + void queueOp(AsyncOperation * op); + void idle(); + bool isReady() { return m_ready; } + + protected: + virtual void threadExit(); + + private: + void checkIdle(); + void killOneFinisher(); + void startOneFinisher(); + void joinStoppedFinishers(); + void stopAllWorkers(); + virtual void * proc(); + struct TLS { + Queue idleQueue; + IdleReadyCallback_t idleReadyCallback; + void * idleReadyParam; + }; + TLS * getTLS() { + TLS * tls = (TLS *) m_tlsManager.getTLS(); + if (!tls) { + tls = new TLS(); + m_tlsManager.setTLS(tls); + m_TLSes.push(tls); + Atomic::Increment(&m_numTLSes); + } + return tls; + } + Queue m_queue; + Queue m_finished; + Queue m_TLSes; + volatile int m_numTLSes = 0; + PThreadsTLSManager m_tlsManager; + std::list m_workers; + int m_numFinishers = 0; + int m_numFinishersIdle = 0; + int m_minIdle = 1; + int m_maxIdle = 4; + bool m_stopping = false; + volatile bool m_ready = false; + volatile bool m_stopperPushed = false; + + void incIdle() { Atomic::Increment(&m_numFinishersIdle); } + void decIdle() { Atomic::Decrement(&m_numFinishersIdle); } + + friend class AsyncFinishWorker; +}; + +}; diff --git a/includes/Printer.h b/includes/Printer.h index e215fc1..8fd3674 100644 --- a/includes/Printer.h +++ b/includes/Printer.h @@ -38,6 +38,7 @@ enum { E_THREAD = 64, E_OUTPUT = 128, E_HTTPSERVER = 256, + E_ASYNC = 512, }; class Printer { diff --git a/includes/TaskMan.h b/includes/TaskMan.h index d675d75..d39598f 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -17,12 +18,6 @@ namespace Balau { class Task; class TaskScheduler; -namespace Events { - -class Async; - -}; - class TaskMan { public: TaskMan(); @@ -37,9 +32,17 @@ class TaskMan { bool stopped() { return m_stopped; } private: static void registerTask(Task * t, Task * stick); + static void registerAsyncOp(AsyncOperation * op); void * getStack(); void freeStack(void * stack); void addToPending(Task * t); + static void asyncIdleReady(void * param) { + TaskMan * taskMan = (TaskMan *) param; + taskMan->asyncIdleReady(); + } + void asyncIdleReady() { + m_evt.send(); + } #ifndef _WIN32 coro_context m_returnContext; #else @@ -49,20 +52,25 @@ class TaskMan { friend class TaskScheduler; template friend T * createTask(T * t, Task * stick = NULL); + template + friend T * createAsyncOp(T * op); struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast(t); } }; typedef gnu::hash_set taskHash_t; taskHash_t m_tasks, m_signaledTasks; Queue m_pendingAdd; - bool m_stopped = false; struct ev_loop * m_loop; - bool m_allowedToSignal = false; ev::async m_evt; std::queue m_stacks; int m_nStacks; int m_stopCode = 0; + bool m_stopped = false; + bool m_allowedToSignal = false; }; template T * createTask(T * t, Task * stick) { TaskMan::registerTask(t, stick); return t; } +template +T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; } + }; diff --git a/src/Async.cc b/src/Async.cc new file mode 100644 index 0000000..829c677 --- /dev/null +++ b/src/Async.cc @@ -0,0 +1,171 @@ +#include "Async.h" + +namespace { + +class AsyncStopper : public Balau::AsyncOperation { + public: + virtual bool needsSynchronousCallback() { return false; } + virtual void done() { delete this; } +}; + +}; + +void Balau::AsyncManager::setIdleReadyCallback(void (*callback)(void *), void * param) { + while (!m_ready); + TLS * tls = getTLS(); + tls->idleReadyCallback = callback; + tls->idleReadyParam = param; +} + +void Balau::AsyncManager::queueOp(AsyncOperation * op) { + if (m_stopperPushed) { + Printer::elog(E_ASYNC, "AsyncManager's queue has been stopped; running operation %p on this thread instead.", op); + op->run(); + op->finalize(); + return; + } + while (!m_ready); + TLS * tls = getTLS(); + Printer::elog(E_ASYNC, "Queuing operation at %p", op); + if (op->needsSynchronousCallback()) { + Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam); + op->m_idleQueue = &tls->idleQueue; + op->m_idleReadyCallback = tls->idleReadyCallback; + op->m_idleReadyParam = tls->idleReadyParam; + } + if (op->needsMainQueue()) + m_queue.push(op); + else + m_finished.push(op); +} + +void Balau::AsyncManager::checkIdle() { + if (m_numFinishersIdle > m_maxIdle) + killOneFinisher(); + if (m_numFinishersIdle < m_minIdle) + startOneFinisher(); + joinStoppedFinishers(); +} + +void Balau::AsyncManager::killOneFinisher() { + Printer::elog(E_ASYNC, "Too many workers idle (%i / %i), killing one.", m_numFinishersIdle, m_maxIdle); + m_finished.push(new AsyncStopper()); +} + +void Balau::AsyncManager::startOneFinisher() { + AsyncFinishWorker * worker = new AsyncFinishWorker(this, &m_finished); + Printer::elog(E_ASYNC, "Not enough workers idle (%i / %i), starting one at %p.", m_numFinishersIdle, m_minIdle, worker); + m_workers.push_back(worker); + m_numFinishers++; + worker->threadStart(); +} + +void Balau::AsyncManager::joinStoppedFinishers() { + for (auto i = m_workers.begin(); i != m_workers.end(); i++) { + AsyncFinishWorker * worker = *i; + if (!worker->m_stopped) + continue; + Printer::elog(E_ASYNC, "Joining stopped worker at %p", worker); + m_numFinishers--; + m_workers.erase(i); + worker->join(); + delete worker; + break; + } +} + +void * Balau::AsyncManager::proc() { + Printer::elog(E_ASYNC, "AsyncManager thread starting up"); + m_tlsManager.init(); + m_ready = true; + while (!m_stopping) { + checkIdle(); + AsyncOperation * op = m_queue.pop(); + Printer::elog(E_ASYNC, "AsyncManager got an operation at %p", op); + if (dynamic_cast(op)) { + Printer::elog(E_ASYNC, "AsyncManager got a stopper operation"); + m_stopping = true; + } + Printer::elog(E_ASYNC, "AsyncManager running operation at %p", op); + op->run(); + if (op->needsFinishWorker()) { + Printer::elog(E_ASYNC, "AsyncManager pushing operation at %p in the finisher's queue", op); + m_finished.push(op); + } else { + Printer::elog(E_ASYNC, "AsyncManager finalizing operation at %p", op); + op->finalize(); + } + } + stopAllWorkers(); + + while (Atomic::Prefetch::Decrement(&m_numTLSes)) { + TLS * tls = m_TLSes.pop(); + while (!tls->idleQueue.isEmpty()); + } + + return NULL; +} + +void * Balau::AsyncFinishWorker::proc() { + Printer::elog(E_ASYNC, "AsyncFinishWorker thread starting up"); + AsyncOperation * op; + while (!m_stopping) { + m_async->incIdle(); + op = m_queue->pop(); + m_async->decIdle(); + Printer::elog(E_ASYNC, "AsyncFinishWorker got operation at %p", op); + if (dynamic_cast(op)) { + Printer::elog(E_ASYNC, "AsyncFinishWorker got a stopper operation"); + m_stopping = true; + } + op->finalize(); + } + + m_stopped = true; + Printer::elog(E_ASYNC, "AsyncFinishWorker thread stopping"); + + return NULL; +} + +void Balau::AsyncOperation::finalize() { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is finishing operation %p", this); + finish(); + if (needsSynchronousCallback()) { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is pushing operation %p to its idle queue", this); + bool wasEmpty = m_idleQueue->isEmpty(); + m_idleQueue->push(this); + Printer::elog(E_ASYNC, "AsyncOperation::finalize() has pushed operation %p to its idle queue; wasEmpty = %s; callback = %p", this, wasEmpty ? "true" : "false", m_idleReadyCallback); + if (wasEmpty && m_idleReadyCallback) { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is calling ready callback to wake up main loop"); + m_idleReadyCallback(m_idleReadyParam); + } + } else { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is wrapping up operation %p", this); + done(); + } +} + +void Balau::AsyncManager::idle() { + Printer::elog(E_ASYNC, "AsyncManager::idle() is running"); + while (!m_ready); + AsyncOperation * op; + TLS * tls = getTLS(); + while ((op = tls->idleQueue.pop(false))) { + Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op); + op->done(); + } +} + +void Balau::AsyncManager::threadExit() { + Printer::elog(E_ASYNC, "AsyncManager thread is being asked to stop; creating stopper"); + if (Atomic::CmpXChgBool(&m_stopperPushed, true, false)) + m_queue.push(new AsyncStopper()); +} + +void Balau::AsyncManager::stopAllWorkers() { + Printer::elog(E_ASYNC, "AsyncManager thread is being stopping and joining %i workers", m_numFinishers); + for (int i = 0; i < m_numFinishers; i++) + m_finished.push(new AsyncStopper()); + for (auto worker : m_workers) + worker->join(); +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 17f0a40..49c326a 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -1,8 +1,24 @@ +#include "Async.h" #include "TaskMan.h" #include "Task.h" #include "Main.h" #include "Local.h" +static Balau::AsyncManager s_async; + +namespace { + +class AsyncStarter : public Balau::AtStart, Balau::AtExit { + public: + AsyncStarter() : AtStart(1000), AtExit(0) { } + void doStart() { + s_async.threadStart(); + } + void doExit() { + s_async.join(); + } +}; + class Stopper : public Balau::Task { public: Stopper(int code) : m_code(code) { } @@ -12,6 +28,10 @@ class Stopper : public Balau::Task { int m_code; }; +}; + +static AsyncStarter s_asyncStarter; + void Stopper::Do() { getTaskMan()->stopMe(m_code); } @@ -207,6 +227,8 @@ int Balau::TaskMan::mainLoop() { if (t->getStatus() == Task::STARTING) starting.insert(t); + s_async.setIdleReadyCallback(asyncIdleReady, this); + do { bool noWait = false; @@ -239,6 +261,9 @@ int Balau::TaskMan::mainLoop() { ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this); + // calling async's idle loop here + s_async.idle(); + // let's check what task got stopped, and signal them for (Task * t : stopped) { IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t); @@ -316,6 +341,7 @@ int Balau::TaskMan::mainLoop() { } while (!m_stopped); Printer::elog(E_TASK, "TaskManager at %p stopping.", this); + s_async.setIdleReadyCallback(NULL, NULL); return m_stopCode; } @@ -329,6 +355,10 @@ void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) { } } +void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) { + s_async.queueOp(op); +} + void Balau::TaskMan::addToPending(Balau::Task * t) { m_pendingAdd.push(t); } diff --git a/tests/test-Async.cc b/tests/test-Async.cc new file mode 100644 index 0000000..5f93217 --- /dev/null +++ b/tests/test-Async.cc @@ -0,0 +1,56 @@ +#include +#include +#include +#include + +using namespace Balau; + +class AsyncOpTest : public AsyncOperation { + public: + AsyncOpTest(Events::Custom * evt) : m_evt(evt) { } + virtual void run() { + Printer::log(M_STATUS, "Async operation running"); + TAssert(!m_ran); + TAssert(!m_finished); + TAssert(!m_done); + m_ran = true; + } + virtual void finish() { + Printer::log(M_STATUS, "Async operation finishing"); + TAssert(m_ran); + TAssert(!m_finished); + TAssert(!m_done); + m_finished = true; + } + virtual void done() { + Printer::log(M_STATUS, "Async operation done"); + TAssert(m_ran); + TAssert(m_finished); + TAssert(!m_done); + m_done = true; + m_evt->doSignal(); + } + virtual bool needsFinishWorker() { return true; } + + bool m_ran = false; + bool m_finished = false; + bool m_done = false; + Events::Custom * m_evt; +}; + +void MainTask::Do() { + Printer::log(M_STATUS, "Test::Async running."); + + Events::Custom evt; + AsyncOpTest * op = createAsyncOp(new AsyncOpTest(&evt)); + waitFor(&evt); + TAssert(!evt.gotSignal()); + yield(); + TAssert(evt.gotSignal()); + TAssert(op->m_ran); + TAssert(op->m_finished); + TAssert(op->m_done); + delete op; + + Printer::log(M_STATUS, "Test::Async passed."); +} -- cgit v1.2.3