diff options
m--------- | LuaJIT | 0 | ||||
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | includes/Async.h | 102 | ||||
-rw-r--r-- | includes/BLua.h | 1 | ||||
-rw-r--r-- | includes/BStream.h | 6 | ||||
-rw-r--r-- | includes/HttpServer.h | 2 | ||||
-rw-r--r-- | includes/Input.h | 6 | ||||
-rw-r--r-- | includes/Local.h | 12 | ||||
-rw-r--r-- | includes/LuaTask.h | 3 | ||||
-rw-r--r-- | includes/Output.h | 6 | ||||
-rw-r--r-- | includes/Printer.h | 5 | ||||
-rw-r--r-- | includes/SimpleMustache.h | 6 | ||||
-rw-r--r-- | includes/Socket.h | 6 | ||||
-rw-r--r-- | includes/Task.h | 25 | ||||
-rw-r--r-- | includes/TaskMan.h | 21 | ||||
-rw-r--r-- | includes/ZHandle.h | 4 | ||||
-rw-r--r-- | src/Async.cc | 173 | ||||
-rw-r--r-- | src/BStream.cc | 2 | ||||
-rw-r--r-- | src/Input.cc | 2 | ||||
-rw-r--r-- | src/Local.cc | 28 | ||||
-rw-r--r-- | src/LuaTask.cc | 2 | ||||
-rw-r--r-- | src/Output.cc | 2 | ||||
-rw-r--r-- | src/Printer.cc | 2 | ||||
-rw-r--r-- | src/Socket.cc | 5 | ||||
-rw-r--r-- | src/Task.cc | 9 | ||||
-rw-r--r-- | src/TaskMan.cc | 32 | ||||
-rw-r--r-- | src/Threads.cc | 2 | ||||
-rw-r--r-- | src/ZHandle.cc | 2 | ||||
-rw-r--r-- | tests/test-Async.cc | 56 |
29 files changed, 456 insertions, 68 deletions
diff --git a/LuaJIT b/LuaJIT -Subproject 1d190c99a2547b44deb8f5e483452d9f51925fb +Subproject 751cd9d82180f1bd99a738acc29bc114995a42e @@ -118,6 +118,7 @@ Exceptions.cc \ \ Local.cc \ Threads.cc \ +Async.cc \ \ BString.cc \ Main.cc \ @@ -178,6 +179,7 @@ test-Sanity.cc \ test-String.cc \ test-Tasks.cc \ test-Threads.cc \ +test-Async.cc \ test-Handles.cc \ test-Sockets.cc \ test-Http.cc \ diff --git a/includes/Async.h b/includes/Async.h new file mode 100644 index 0000000..15e0ac7 --- /dev/null +++ b/includes/Async.h @@ -0,0 +1,102 @@ +#pragma once + +#include <Atomic.h> +#include <Exceptions.h> +#include <Local.h> +#include <Threads.h> + +namespace Balau { + +class AsyncManager; +class AsyncFinishWorker; + +typedef void (*IdleReadyCallback_t)(void *); + +class AsyncOperation { + protected: + virtual void run() { } + virtual void finish() { } + virtual void done() { } + virtual bool needsMainQueue() { return true; } + virtual bool needsFinishWorker() { return false; } + virtual bool needsSynchronousCallback() { return true; } + protected: + virtual ~AsyncOperation() { } + private: + CQueue<AsyncOperation> * m_idleQueue = NULL; + IdleReadyCallback_t m_idleReadyCallback = NULL; + void * m_idleReadyParam = NULL; + void finalize(); + + friend class AsyncManager; + friend class AsyncFinishWorker; +}; + +class AsyncFinishWorker : public Thread { + public: + AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue) { } + virtual void * proc(); + AsyncManager * m_async; + Queue<AsyncOperation> * m_queue; + bool m_stopping = false; + volatile bool m_stopped = false; +}; + +class AsyncManager : public Thread { + public: + void setFinishers(int minIdle, int maxIdle) { + AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads."); + m_minIdle = minIdle; + m_maxIdle = maxIdle; + } + void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param); + void queueOp(AsyncOperation * op); + void idle(); + bool isReady() { return m_ready; } + + protected: + virtual void threadExit(); + + private: + void checkIdle(); + void killOneFinisher(); + void startOneFinisher(); + void joinStoppedFinishers(); + void stopAllWorkers(); + virtual void * proc(); + struct TLS { + CQueue<AsyncOperation> idleQueue; + IdleReadyCallback_t idleReadyCallback; + void * idleReadyParam; + }; + TLS * getTLS() { + TLS * tls = (TLS *) m_tlsManager.getTLS(); + if (!tls) { + tls = new TLS(); + m_tlsManager.setTLS(tls); + m_TLSes.push(tls); + Atomic::Increment(&m_numTLSes); + } + return tls; + } + Queue<AsyncOperation> m_queue; + Queue<AsyncOperation> m_finished; + Queue<TLS> m_TLSes; + volatile int m_numTLSes = 0; + PThreadsTLSManager m_tlsManager; + std::list<AsyncFinishWorker *> m_workers; + int m_numFinishers = 0; + int m_numFinishersIdle = 0; + int m_minIdle = 1; + int m_maxIdle = 4; + bool m_stopping = false; + volatile bool m_ready = false; + volatile bool m_stopperPushed = false; + + void incIdle() { Atomic::Increment(&m_numFinishersIdle); } + void decIdle() { Atomic::Decrement(&m_numFinishersIdle); } + + friend class AsyncFinishWorker; +}; + +}; diff --git a/includes/BLua.h b/includes/BLua.h index be78660..12e921f 100644 --- a/includes/BLua.h +++ b/includes/BLua.h @@ -20,6 +20,7 @@ class LuaObject { class LuaObjectFactory { public: LuaObjectFactory() : m_wantsDestruct(false), m_pushed(false) { } + virtual ~LuaObjectFactory() { } virtual void push(Lua & L); void pushDestruct(Lua & L); template<class T> diff --git a/includes/BStream.h b/includes/BStream.h index 86b0e0f..d412782 100644 --- a/includes/BStream.h +++ b/includes/BStream.h @@ -21,10 +21,10 @@ class BStream : public Handle { private: IO<Handle> m_h; uint8_t * m_buffer; - size_t m_availBytes; - size_t m_cursor; + size_t m_availBytes = 0; + size_t m_cursor = 0; String m_name; - bool m_passThru; + bool m_passThru = false; bool m_detached; bool m_closed; }; diff --git a/includes/HttpServer.h b/includes/HttpServer.h index 7dbf912..6c28910 100644 --- a/includes/HttpServer.h +++ b/includes/HttpServer.h @@ -44,7 +44,7 @@ class HttpServer { class Action { public: Action(const Regex & regex, const Regex & host = Regexes::any) : m_regex(regex), m_host(host), m_refCount(0) { } - ~Action() { AAssert(m_refCount == 0, "Don't delete an Action directly"); } + virtual ~Action() { AAssert(m_refCount == 0, "Don't delete an Action directly"); } struct ActionMatch { Regex::Captures uri, host; }; diff --git a/includes/Input.h b/includes/Input.h index 9d45baa..e0ad329 100644 --- a/includes/Input.h +++ b/includes/Input.h @@ -16,11 +16,11 @@ class Input : public SeekableHandle { virtual time_t getMTime(); const char * getFName() { return m_fname.to_charp(); } private: - int m_fd; + int m_fd = -1; String m_name; String m_fname; - off_t m_size; - time_t m_mtime; + off_t m_size = -1; + time_t m_mtime = -1; }; }; diff --git a/includes/Local.h b/includes/Local.h index 6a598ab..9221384 100644 --- a/includes/Local.h +++ b/includes/Local.h @@ -8,7 +8,15 @@ class TLSManager { public: virtual void * getTLS(); virtual void * setTLS(void * val); - void * createTLS(); +}; + +class PThreadsTLSManager : public TLSManager { + public: + virtual void * getTLS(); + virtual void * setTLS(void * val); + void init(); + private: + pthread_key_t m_key; }; extern TLSManager * g_tlsManager; @@ -16,6 +24,7 @@ extern TLSManager * g_tlsManager; class Local : public AtStart { public: static int getSize() { return s_size; } + static void * createTLS() { void * r = calloc(s_size * sizeof(void *), 1); return r; } protected: Local() : AtStart(0) { } void * getGlobal() { return m_globals[m_idx]; } @@ -26,7 +35,6 @@ class Local : public AtStart { void set(void * obj) { void * r = getTLS(); if (r) setLocal(obj); else setGlobal(obj); } int getIndex() { return m_idx; } private: - static void * create() { void * r = calloc(s_size * sizeof(void *), 1); return r; } static void * getTLS() { return g_tlsManager->getTLS(); } static void * setTLS(void * val) { return g_tlsManager->setTLS(val); } virtual void doStart(); diff --git a/includes/LuaTask.h b/includes/LuaTask.h index 0d763de..37d64fa 100644 --- a/includes/LuaTask.h +++ b/includes/LuaTask.h @@ -12,13 +12,14 @@ class LuaMainTask; class LuaExecCell { public: LuaExecCell(); + virtual ~LuaExecCell() { } void detach() { m_detached = true; } void exec(LuaMainTask * mainTask); protected: virtual void run(Lua &) = 0; private: Events::Async m_event; - bool m_detached; + bool m_detached = false; friend class LuaTask; }; diff --git a/includes/Output.h b/includes/Output.h index 59d9d67..d771227 100644 --- a/includes/Output.h +++ b/includes/Output.h @@ -16,11 +16,11 @@ class Output : public SeekableHandle { virtual time_t getMTime(); const char * getFName() { return m_fname.to_charp(); } private: - int m_fd; + int m_fd = -1; String m_name; String m_fname; - off_t m_size; - time_t m_mtime; + off_t m_size = -1; + time_t m_mtime = -1; }; }; diff --git a/includes/Printer.h b/includes/Printer.h index b4001f1..8fd3674 100644 --- a/includes/Printer.h +++ b/includes/Printer.h @@ -38,6 +38,7 @@ enum { E_THREAD = 64, E_OUTPUT = 128, E_HTTPSERVER = 256, + E_ASYNC = 512, }; class Printer { @@ -74,8 +75,8 @@ class Printer { static void setDetailled(bool enable) { getPrinter()->m_detailledLogs = enable; } private: - uint32_t m_verbosity; - bool m_detailledLogs; + uint32_t m_verbosity = M_STATUS | M_WARNING | M_ERROR | M_ENGINE_DEBUG; + bool m_detailledLogs = false; }; }; diff --git a/includes/SimpleMustache.h b/includes/SimpleMustache.h index 52a2603..28cb477 100644 --- a/includes/SimpleMustache.h +++ b/includes/SimpleMustache.h @@ -22,7 +22,7 @@ class SimpleMustache { friend class Context; }; - Context() : m_type(CONTEXTLIST), m_root(true) { } + Context() { } ~Context() { empty(); } Proxy operator[](ssize_t idx) { ensureList(); return Proxy(this, idx); } Context & operator[](const char * str); @@ -55,7 +55,7 @@ class SimpleMustache { BOOLSEC, CONTEXTLIST, LAMBDA, - } m_type; + } m_type = CONTEXTLIST; Context(ContextType type) : m_type(type), m_root(false) { } Context(Context & c) { Failure("You can't copy a Context; use references"); } Context & operator=(Context & c) { Failure("You can't assign a Context; use references"); return *this; } @@ -64,7 +64,7 @@ class SimpleMustache { typedef std::map<String, Context *> SubContext; typedef std::vector<SubContext> ContextList; ContextList m_contextList; - bool m_root; + bool m_root = true; void empty(bool skipFirst = false); void ensureList(bool single = false); diff --git a/includes/Socket.h b/includes/Socket.h index b113e43..e5a9f21 100644 --- a/includes/Socket.h +++ b/includes/Socket.h @@ -50,9 +50,9 @@ class Socket : public Handle { int m_fd; String m_name; - bool m_connected; - bool m_connecting; - bool m_listening; + bool m_connected = false; + bool m_connecting = false; + bool m_listening = false; sockaddr_in6 m_localAddr, m_remoteAddr; SocketEvent * m_evtR, * m_evtW; }; diff --git a/includes/Task.h b/includes/Task.h index 9347dc4..86bbce1 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -32,6 +32,8 @@ class Task; namespace Events { class Callback { + public: + virtual ~Callback() { } protected: virtual void gotEvent(BaseEvent *) = 0; friend class BaseEvent; @@ -210,23 +212,23 @@ class QueueBase { public: bool isEmpty() { ScopeLock sl(m_lock); return !m_front; } protected: - QueueBase() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); } - ~QueueBase() { while (!isEmpty()) iPop(NULL); pthread_cond_destroy(&m_cond); } + QueueBase() { pthread_cond_init(&m_cond, NULL); } + ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); } void iPush(void * t, Events::Async * event); - void * iPop(Events::Async * event); + void * iPop(Events::Async * event, bool wait); private: QueueBase(const QueueBase &) = delete; QueueBase & operator=(const QueueBase &) = delete; Lock m_lock; struct Cell { - Cell(void * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } + Cell(void * elem) : m_elem(elem) { } Cell(const Cell &) = delete; Cell & operator=(const Cell &) = delete; - Cell * m_next, * m_prev; + Cell * m_next = NULL, * m_prev = NULL; void * m_elem; }; - Cell * m_front, * m_back; + Cell * m_front = NULL, * m_back = NULL; pthread_cond_t m_cond; }; @@ -234,16 +236,23 @@ template<class T> class Queue : public QueueBase { public: void push(T * t) { iPush(t, NULL); } - T * pop() { return (T *) iPop(NULL); } + T * pop() { return (T *) iPop(NULL, true); } }; template<class T> class TQueue : public QueueBase { public: void push(T * t) { iPush(t, &m_event); } - T * pop() { return (T *) iPop(&m_event); } + T * pop() { return (T *) iPop(&m_event, true); } private: Events::Async m_event; }; +template<class T> +class CQueue : public QueueBase { + public: + void push(T * t) { iPush(t, NULL); } + T * pop() { return (T *) iPop(NULL, false); } +}; + }; diff --git a/includes/TaskMan.h b/includes/TaskMan.h index 35e60ed..2b2742d 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -7,6 +7,7 @@ #include <ev++.h> #include <ext/hash_set> #include <queue> +#include <Async.h> #include <Threads.h> #include <Exceptions.h> #include <Task.h> @@ -19,7 +20,6 @@ class TaskScheduler; namespace Events { -class Async; class TaskEvent; }; @@ -61,9 +61,17 @@ class TaskMan { private: static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event); + static void registerAsyncOp(AsyncOperation * op); void * getStack(); void freeStack(void * stack); void addToPending(Task * t); + static void asyncIdleReady(void * param) { + TaskMan * taskMan = (TaskMan *) param; + taskMan->asyncIdleReady(); + } + void asyncIdleReady() { + m_evt.send(); + } #ifndef _WIN32 coro_context m_returnContext; #else @@ -71,17 +79,22 @@ class TaskMan { #endif friend class Task; friend class TaskScheduler; + template<class T> + friend T * createAsyncOp(T * op); struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } }; typedef gnu::hash_set<Task *, taskHasher> taskHash_t; taskHash_t m_tasks, m_signaledTasks; Queue<Task> m_pendingAdd; - bool m_stopped; struct ev_loop * m_loop; - bool m_allowedToSignal; ev::async m_evt; std::queue<void *> m_stacks; int m_nStacks; - int m_stopCode; + int m_stopCode = 0; + bool m_stopped = false; + bool m_allowedToSignal = false; }; +template<class T> +T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; } + }; diff --git a/includes/ZHandle.h b/includes/ZHandle.h index 048e0cb..e972466 100644 --- a/includes/ZHandle.h +++ b/includes/ZHandle.h @@ -28,9 +28,9 @@ class ZStream : public Handle { void doFlush(bool finish); IO<Handle> m_h; z_stream m_zin, m_zout; - bool m_detached, m_closed, m_eof; + bool m_detached = false, m_closed = false, m_eof = false; String m_name; - uint8_t * m_in; + uint8_t * m_in = NULL; }; }; diff --git a/src/Async.cc b/src/Async.cc new file mode 100644 index 0000000..18c8ef1 --- /dev/null +++ b/src/Async.cc @@ -0,0 +1,173 @@ +#include "Async.h" + +namespace { + +class AsyncStopper : public Balau::AsyncOperation { + public: + virtual bool needsSynchronousCallback() { return false; } + virtual void done() { delete this; } +}; + +}; + +void Balau::AsyncManager::setIdleReadyCallback(void (*callback)(void *), void * param) { + while (!m_ready); + TLS * tls = getTLS(); + tls->idleReadyCallback = callback; + tls->idleReadyParam = param; +} + +void Balau::AsyncManager::queueOp(AsyncOperation * op) { + if (m_stopperPushed) { + Printer::elog(E_ASYNC, "AsyncManager's queue has been stopped; running operation %p on this thread instead.", op); + op->run(); + op->finalize(); + return; + } + while (!m_ready); + TLS * tls = getTLS(); + Printer::elog(E_ASYNC, "Queuing operation at %p", op); + if (op->needsSynchronousCallback()) { + Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", op, &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam); + op->m_idleQueue = &tls->idleQueue; + op->m_idleReadyCallback = tls->idleReadyCallback; + op->m_idleReadyParam = tls->idleReadyParam; + } + if (op->needsMainQueue()) + m_queue.push(op); + else + m_finished.push(op); +} + +void Balau::AsyncManager::checkIdle() { + if (m_numFinishersIdle > m_maxIdle) + killOneFinisher(); + if (m_numFinishersIdle < m_minIdle) + startOneFinisher(); + joinStoppedFinishers(); +} + +void Balau::AsyncManager::killOneFinisher() { + Printer::elog(E_ASYNC, "Too many workers idle (%i / %i), killing one.", m_numFinishersIdle, m_maxIdle); + m_finished.push(new AsyncStopper()); +} + +void Balau::AsyncManager::startOneFinisher() { + AsyncFinishWorker * worker = new AsyncFinishWorker(this, &m_finished); + Printer::elog(E_ASYNC, "Not enough workers idle (%i / %i), starting one at %p.", m_numFinishersIdle, m_minIdle, worker); + m_workers.push_back(worker); + m_numFinishers++; + worker->threadStart(); +} + +void Balau::AsyncManager::joinStoppedFinishers() { + for (auto i = m_workers.begin(); i != m_workers.end(); i++) { + AsyncFinishWorker * worker = *i; + if (!worker->m_stopped) + continue; + Printer::elog(E_ASYNC, "Joining stopped worker at %p", worker); + m_numFinishers--; + m_workers.erase(i); + worker->join(); + delete worker; + break; + } +} + +void * Balau::AsyncManager::proc() { + Printer::elog(E_ASYNC, "AsyncManager thread starting up"); + m_tlsManager.init(); + m_ready = true; + while (!m_stopping) { + checkIdle(); + AsyncOperation * op = m_queue.pop(); + Printer::elog(E_ASYNC, "AsyncManager got an operation at %p", op); + if (dynamic_cast<AsyncStopper *>(op)) { + Printer::elog(E_ASYNC, "AsyncManager got a stopper operation"); + m_stopping = true; + } + Printer::elog(E_ASYNC, "AsyncManager running operation at %p", op); + op->run(); + if (op->needsFinishWorker()) { + Printer::elog(E_ASYNC, "AsyncManager pushing operation at %p in the finisher's queue", op); + m_finished.push(op); + } else { + Printer::elog(E_ASYNC, "AsyncManager finalizing operation at %p", op); + op->finalize(); + } + } + stopAllWorkers(); + + Printer::elog(E_ASYNC, "Async thread waits for all idle queues to empty"); + while (Atomic::Prefetch::Decrement(&m_numTLSes)) { + TLS * tls = m_TLSes.pop(); + while (!tls->idleQueue.isEmpty()); + } + + Printer::elog(E_ASYNC, "Async thread stopping"); + return NULL; +} + +void * Balau::AsyncFinishWorker::proc() { + Printer::elog(E_ASYNC, "AsyncFinishWorker thread starting up"); + AsyncOperation * op; + while (!m_stopping) { + m_async->incIdle(); + op = m_queue->pop(); + m_async->decIdle(); + Printer::elog(E_ASYNC, "AsyncFinishWorker got operation at %p", op); + if (dynamic_cast<AsyncStopper *>(op)) { + Printer::elog(E_ASYNC, "AsyncFinishWorker got a stopper operation"); + m_stopping = true; + } + op->finalize(); + } + + m_stopped = true; + Printer::elog(E_ASYNC, "AsyncFinishWorker thread stopping"); + + return NULL; +} + +void Balau::AsyncOperation::finalize() { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is finishing operation %p", this); + finish(); + if (needsSynchronousCallback()) { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is pushing operation %p to its idle queue", this); + bool wasEmpty = m_idleQueue->isEmpty(); + m_idleQueue->push(this); + Printer::elog(E_ASYNC, "AsyncOperation::finalize() has pushed operation %p to its idle queue; wasEmpty = %s; callback = %p", this, wasEmpty ? "true" : "false", m_idleReadyCallback); + if (wasEmpty && m_idleReadyCallback) { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is calling ready callback to wake up main loop"); + m_idleReadyCallback(m_idleReadyParam); + } + } else { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is wrapping up operation %p", this); + done(); + } +} + +void Balau::AsyncManager::idle() { + Printer::elog(E_ASYNC, "AsyncManager::idle() is running"); + while (!m_ready); + AsyncOperation * op; + TLS * tls = getTLS(); + while ((op = tls->idleQueue.pop())) { + Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op); + op->done(); + } +} + +void Balau::AsyncManager::threadExit() { + Printer::elog(E_ASYNC, "AsyncManager thread is being asked to stop; creating stopper"); + if (Atomic::CmpXChgBool(&m_stopperPushed, true, false)) + m_queue.push(new AsyncStopper()); +} + +void Balau::AsyncManager::stopAllWorkers() { + Printer::elog(E_ASYNC, "AsyncManager thread is being stopping and joining %i workers", m_numFinishers); + for (int i = 0; i < m_numFinishers; i++) + m_finished.push(new AsyncStopper()); + for (auto worker : m_workers) + worker->join(); +} diff --git a/src/BStream.cc b/src/BStream.cc index 9420459..6c51c4f 100644 --- a/src/BStream.cc +++ b/src/BStream.cc @@ -3,7 +3,7 @@ static const int s_blockSize = 16 * 1024; -Balau::BStream::BStream(const IO<Handle> & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)), m_availBytes(0), m_cursor(0), m_passThru(false) { +Balau::BStream::BStream(const IO<Handle> & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)) { AAssert(m_h->canRead(), "You can't create a buffered stream with a Handle that can't read"); m_name.set("Stream(%s)", m_h->getName()); if ((m_h.isA<Buffer>()) || (m_h.isA<BStream>())) diff --git a/src/Input.cc b/src/Input.cc index 46b9b94..3608cd0 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -47,7 +47,7 @@ static int eioStatsDone(eio_req * req) { return 0; } -Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1), m_size(-1), m_mtime(-1) { +Balau::Input::Input(const char * fname) throw (GeneralException) { m_name.set("Input(%s)", fname); m_fname = fname; diff --git a/src/Local.cc b/src/Local.cc index 9729c35..a9eccdd 100644 --- a/src/Local.cc +++ b/src/Local.cc @@ -13,10 +13,6 @@ void * Balau::TLSManager::setTLS(void * val) { return r; } -void * Balau::TLSManager::createTLS() { - return Local::create(); -} - static Balau::TLSManager dummyTLSManager; Balau::TLSManager * Balau::g_tlsManager = &dummyTLSManager; @@ -29,31 +25,31 @@ void Balau::Local::doStart() { m_globals[m_idx] = 0; } -class PThreadsTLSManager : public Balau::TLSManager, public Balau::AtStart { +class GlobalPThreadsTLSManager : public Balau::PThreadsTLSManager, public Balau::AtStart { public: - PThreadsTLSManager() : AtStart(0) { } - virtual void * getTLS(); - virtual void * setTLS(void * val); - virtual void doStart(); - private: - pthread_key_t m_key; + GlobalPThreadsTLSManager() : AtStart(0) { } + void doStart(); }; -PThreadsTLSManager pthreadsTLSManager; +GlobalPThreadsTLSManager pthreadsTLSManager; + +void GlobalPThreadsTLSManager::doStart() { + init(); + Balau::g_tlsManager = this; +} -void PThreadsTLSManager::doStart() { +void Balau::PThreadsTLSManager::init() { int r; r = pthread_key_create(&m_key, NULL); RAssert(r == 0, "Unable to create a pthtread_key: %i", r); - Balau::g_tlsManager = this; } -void * PThreadsTLSManager::getTLS() { +void * Balau::PThreadsTLSManager::getTLS() { return pthread_getspecific(m_key); } -void * PThreadsTLSManager::setTLS(void * val) { +void * Balau::PThreadsTLSManager::setTLS(void * val) { void * r = pthread_getspecific(m_key); pthread_setspecific(m_key, val); return r; diff --git a/src/LuaTask.cc b/src/LuaTask.cc index b023f45..7e0b256 100644 --- a/src/LuaTask.cc +++ b/src/LuaTask.cc @@ -11,7 +11,7 @@ class LuaTaskStopper : public Balau::LuaExecCell { }; -Balau::LuaExecCell::LuaExecCell() : m_detached(false) { +Balau::LuaExecCell::LuaExecCell() { Printer::elog(E_TASK, "LuaExecCell created at %p", this); } diff --git a/src/Output.cc b/src/Output.cc index c585ce5..dd0a588 100644 --- a/src/Output.cc +++ b/src/Output.cc @@ -47,7 +47,7 @@ static int eioStatsDone(eio_req * req) { return 0; } -Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException) : m_fd(-1), m_size(-1), m_mtime(-1) { +Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException) { m_name.set("Output(%s)", fname); m_fname = fname; diff --git a/src/Printer.cc b/src/Printer.cc index ee6029c..a7fee9f 100644 --- a/src/Printer.cc +++ b/src/Printer.cc @@ -16,7 +16,7 @@ static const char * prefixes[] = { "(**) ", }; -Balau::Printer::Printer() : m_verbosity(M_STATUS | M_WARNING | M_ERROR | M_ENGINE_DEBUG), m_detailledLogs(false) { +Balau::Printer::Printer() { #ifdef DEBUG m_detailledLogs = true; #endif diff --git a/src/Socket.cc b/src/Socket.cc index 100b94e..7511bde 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -250,7 +250,7 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st return req; } -Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)), m_connected(false), m_connecting(false), m_listening(false) { +Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) { m_name = "Socket(unconnected)"; RAssert(m_fd >= 0, "socket() returned %i", m_fd); m_evtR = new SocketEvent(m_fd, ev::READ); @@ -271,8 +271,9 @@ Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_ST Printer::elog(E_SOCKET, "Creating a socket at %p", this); } -Balau::Socket::Socket(int fd) : m_fd(fd), m_connected(true), m_connecting(false), m_listening(false) { +Balau::Socket::Socket(int fd) : m_fd(fd) { socklen_t len; + m_connected = true; len = sizeof(m_localAddr); getsockname(m_fd, (sockaddr *) &m_localAddr, &len); diff --git a/src/Task.cc b/src/Task.cc index df8eee2..4465203 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -37,7 +37,7 @@ void Balau::Task::setup(TaskMan * taskMan, void * stack) { m_taskMan = taskMan; - m_tls = g_tlsManager->createTLS(); + m_tls = Local::createTLS(); void * oldTLS = g_tlsManager->getTLS(); g_tlsManager->setTLS(m_tls); localTask.set(this); @@ -310,9 +310,9 @@ void Balau::QueueBase::iPush(void * t, Events::Async * event) { pthread_cond_signal(&m_cond); } -void * Balau::QueueBase::iPop(Events::Async * event) { - m_lock.enter(); - while (!m_front) { +void * Balau::QueueBase::iPop(Events::Async * event, bool wait) { + ScopeLock sl(m_lock); + while (!m_front && wait) { if (event) { Task::prepare(event); m_lock.leave(); @@ -332,6 +332,5 @@ void * Balau::QueueBase::iPop(Events::Async * event) { m_back = NULL; void * t = c->m_elem; delete c; - m_lock.leave(); return t; } diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 33d07d0..177cec1 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -1,10 +1,24 @@ +#include "Async.h" #include "TaskMan.h" #include "Task.h" #include "Main.h" #include "Local.h" +static Balau::AsyncManager s_async; + namespace { +class AsyncStarter : public Balau::AtStart, Balau::AtExit { + public: + AsyncStarter() : AtStart(1000), AtExit(0) { } + void doStart() { + s_async.threadStart(); + } + void doExit() { + s_async.join(); + } +}; + class Stopper : public Balau::Task { public: Stopper(int code) : m_code(code) { } @@ -14,6 +28,10 @@ class Stopper : public Balau::Task { int m_code; }; +}; + +static AsyncStarter s_asyncStarter; + void Stopper::Do() { getTaskMan()->stopMe(m_code); } @@ -22,8 +40,6 @@ const char * Stopper::getName() const { return "Stopper"; } -}; - static Balau::DefaultTmpl<Balau::TaskMan> defaultTaskMan(50); static Balau::LocalTmpl<Balau::TaskMan> localTaskMan; @@ -137,7 +153,7 @@ void Balau::TaskMan::stopMe(int code) { } } -Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false), m_stopCode(0) { +Balau::TaskMan::TaskMan() { #ifndef _WIN32 coro_create(&m_returnContext, 0, 0, 0, 0); #else @@ -225,6 +241,8 @@ int Balau::TaskMan::mainLoop() { if (t->getStatus() == Task::STARTING) starting.insert(t); + s_async.setIdleReadyCallback(asyncIdleReady, this); + do { Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size()); @@ -250,6 +268,9 @@ int Balau::TaskMan::mainLoop() { ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this); + // calling async's idle + s_async.idle(); + // let's check what task got stopped, and signal them for (Task * t : stopped) { IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t); @@ -327,6 +348,7 @@ int Balau::TaskMan::mainLoop() { } while (!m_stopped); Printer::elog(E_TASK, "TaskManager at %p stopping.", this); + s_async.setIdleReadyCallback(NULL, NULL); return m_stopCode; } @@ -342,6 +364,10 @@ void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events: } } +void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) { + s_async.queueOp(op); +} + void Balau::TaskMan::addToPending(Balau::Task * t) { m_pendingAdd.push(t); m_evt.send(); diff --git a/src/Threads.cc b/src/Threads.cc index 867013d..7b1b7bc 100644 --- a/src/Threads.cc +++ b/src/Threads.cc @@ -43,7 +43,7 @@ void * Balau::ThreadHelper::threadProc(void * arg) { void * r = NULL; bool success = false; try { - void * tls = g_tlsManager->createTLS(); + void * tls = Local::createTLS(); g_tlsManager->setTLS(tls); Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg); r = thread->proc(); diff --git a/src/ZHandle.cc b/src/ZHandle.cc index 3a14d7d..4f216c7 100644 --- a/src/ZHandle.cc +++ b/src/ZHandle.cc @@ -1,7 +1,7 @@ #include "ZHandle.h" #include "Task.h" -Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(h), m_detached(false), m_closed(false), m_eof(false), m_in(NULL) { +Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(h) { m_zin.zalloc = m_zout.zalloc = NULL; m_zin.zfree = m_zout.zfree = NULL; m_zin.opaque = m_zout.opaque = NULL; diff --git a/tests/test-Async.cc b/tests/test-Async.cc new file mode 100644 index 0000000..5f93217 --- /dev/null +++ b/tests/test-Async.cc @@ -0,0 +1,56 @@ +#include <Async.h> +#include <Task.h> +#include <TaskMan.h> +#include <Main.h> + +using namespace Balau; + +class AsyncOpTest : public AsyncOperation { + public: + AsyncOpTest(Events::Custom * evt) : m_evt(evt) { } + virtual void run() { + Printer::log(M_STATUS, "Async operation running"); + TAssert(!m_ran); + TAssert(!m_finished); + TAssert(!m_done); + m_ran = true; + } + virtual void finish() { + Printer::log(M_STATUS, "Async operation finishing"); + TAssert(m_ran); + TAssert(!m_finished); + TAssert(!m_done); + m_finished = true; + } + virtual void done() { + Printer::log(M_STATUS, "Async operation done"); + TAssert(m_ran); + TAssert(m_finished); + TAssert(!m_done); + m_done = true; + m_evt->doSignal(); + } + virtual bool needsFinishWorker() { return true; } + + bool m_ran = false; + bool m_finished = false; + bool m_done = false; + Events::Custom * m_evt; +}; + +void MainTask::Do() { + Printer::log(M_STATUS, "Test::Async running."); + + Events::Custom evt; + AsyncOpTest * op = createAsyncOp(new AsyncOpTest(&evt)); + waitFor(&evt); + TAssert(!evt.gotSignal()); + yield(); + TAssert(evt.gotSignal()); + TAssert(op->m_ran); + TAssert(op->m_finished); + TAssert(op->m_done); + delete op; + + Printer::log(M_STATUS, "Test::Async passed."); +} |