summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas "Pixel" Noble <pixel@nobis-crew.org>2012-09-01 10:04:05 -0700
committerNicolas "Pixel" Noble <pixel@nobis-crew.org>2012-09-01 10:27:52 -0700
commit4a893f72cadaa875a920db8949171b002f656e43 (patch)
tree1e08a3cb6f579cab9c3c9a138d36ababdf76121b
parent56d91ddd2cd42b782cde0bb3fdf4eb9ebe7597be (diff)
parent06674e57649d536cf19715524ee40c5ad4a9026d (diff)
Merge commit '06674e57649d536cf19715524ee40c5ad4a9026d'
Conflicts: includes/LuaTask.h includes/TaskMan.h includes/Threads.h src/TaskMan.cc src/Threads.cc
m---------LuaJIT0
-rw-r--r--Makefile2
-rw-r--r--includes/Async.h102
-rw-r--r--includes/BLua.h1
-rw-r--r--includes/BStream.h6
-rw-r--r--includes/HttpServer.h2
-rw-r--r--includes/Input.h6
-rw-r--r--includes/Local.h12
-rw-r--r--includes/LuaTask.h3
-rw-r--r--includes/Output.h6
-rw-r--r--includes/Printer.h5
-rw-r--r--includes/SimpleMustache.h6
-rw-r--r--includes/Socket.h6
-rw-r--r--includes/Task.h25
-rw-r--r--includes/TaskMan.h21
-rw-r--r--includes/ZHandle.h4
-rw-r--r--src/Async.cc173
-rw-r--r--src/BStream.cc2
-rw-r--r--src/Input.cc2
-rw-r--r--src/Local.cc28
-rw-r--r--src/LuaTask.cc2
-rw-r--r--src/Output.cc2
-rw-r--r--src/Printer.cc2
-rw-r--r--src/Socket.cc5
-rw-r--r--src/Task.cc9
-rw-r--r--src/TaskMan.cc32
-rw-r--r--src/Threads.cc2
-rw-r--r--src/ZHandle.cc2
-rw-r--r--tests/test-Async.cc56
29 files changed, 456 insertions, 68 deletions
diff --git a/LuaJIT b/LuaJIT
-Subproject 1d190c99a2547b44deb8f5e483452d9f51925fb
+Subproject 751cd9d82180f1bd99a738acc29bc114995a42e
diff --git a/Makefile b/Makefile
index 478ff0f..6516423 100644
--- a/Makefile
+++ b/Makefile
@@ -118,6 +118,7 @@ Exceptions.cc \
\
Local.cc \
Threads.cc \
+Async.cc \
\
BString.cc \
Main.cc \
@@ -178,6 +179,7 @@ test-Sanity.cc \
test-String.cc \
test-Tasks.cc \
test-Threads.cc \
+test-Async.cc \
test-Handles.cc \
test-Sockets.cc \
test-Http.cc \
diff --git a/includes/Async.h b/includes/Async.h
new file mode 100644
index 0000000..15e0ac7
--- /dev/null
+++ b/includes/Async.h
@@ -0,0 +1,102 @@
+#pragma once
+
+#include <Atomic.h>
+#include <Exceptions.h>
+#include <Local.h>
+#include <Threads.h>
+
+namespace Balau {
+
+class AsyncManager;
+class AsyncFinishWorker;
+
+typedef void (*IdleReadyCallback_t)(void *);
+
+class AsyncOperation {
+ protected:
+ virtual void run() { }
+ virtual void finish() { }
+ virtual void done() { }
+ virtual bool needsMainQueue() { return true; }
+ virtual bool needsFinishWorker() { return false; }
+ virtual bool needsSynchronousCallback() { return true; }
+ protected:
+ virtual ~AsyncOperation() { }
+ private:
+ CQueue<AsyncOperation> * m_idleQueue = NULL;
+ IdleReadyCallback_t m_idleReadyCallback = NULL;
+ void * m_idleReadyParam = NULL;
+ void finalize();
+
+ friend class AsyncManager;
+ friend class AsyncFinishWorker;
+};
+
+class AsyncFinishWorker : public Thread {
+ public:
+ AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue) { }
+ virtual void * proc();
+ AsyncManager * m_async;
+ Queue<AsyncOperation> * m_queue;
+ bool m_stopping = false;
+ volatile bool m_stopped = false;
+};
+
+class AsyncManager : public Thread {
+ public:
+ void setFinishers(int minIdle, int maxIdle) {
+ AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads.");
+ m_minIdle = minIdle;
+ m_maxIdle = maxIdle;
+ }
+ void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param);
+ void queueOp(AsyncOperation * op);
+ void idle();
+ bool isReady() { return m_ready; }
+
+ protected:
+ virtual void threadExit();
+
+ private:
+ void checkIdle();
+ void killOneFinisher();
+ void startOneFinisher();
+ void joinStoppedFinishers();
+ void stopAllWorkers();
+ virtual void * proc();
+ struct TLS {
+ CQueue<AsyncOperation> idleQueue;
+ IdleReadyCallback_t idleReadyCallback;
+ void * idleReadyParam;
+ };
+ TLS * getTLS() {
+ TLS * tls = (TLS *) m_tlsManager.getTLS();
+ if (!tls) {
+ tls = new TLS();
+ m_tlsManager.setTLS(tls);
+ m_TLSes.push(tls);
+ Atomic::Increment(&m_numTLSes);
+ }
+ return tls;
+ }
+ Queue<AsyncOperation> m_queue;
+ Queue<AsyncOperation> m_finished;
+ Queue<TLS> m_TLSes;
+ volatile int m_numTLSes = 0;
+ PThreadsTLSManager m_tlsManager;
+ std::list<AsyncFinishWorker *> m_workers;
+ int m_numFinishers = 0;
+ int m_numFinishersIdle = 0;
+ int m_minIdle = 1;
+ int m_maxIdle = 4;
+ bool m_stopping = false;
+ volatile bool m_ready = false;
+ volatile bool m_stopperPushed = false;
+
+ void incIdle() { Atomic::Increment(&m_numFinishersIdle); }
+ void decIdle() { Atomic::Decrement(&m_numFinishersIdle); }
+
+ friend class AsyncFinishWorker;
+};
+
+};
diff --git a/includes/BLua.h b/includes/BLua.h
index be78660..12e921f 100644
--- a/includes/BLua.h
+++ b/includes/BLua.h
@@ -20,6 +20,7 @@ class LuaObject {
class LuaObjectFactory {
public:
LuaObjectFactory() : m_wantsDestruct(false), m_pushed(false) { }
+ virtual ~LuaObjectFactory() { }
virtual void push(Lua & L);
void pushDestruct(Lua & L);
template<class T>
diff --git a/includes/BStream.h b/includes/BStream.h
index 86b0e0f..d412782 100644
--- a/includes/BStream.h
+++ b/includes/BStream.h
@@ -21,10 +21,10 @@ class BStream : public Handle {
private:
IO<Handle> m_h;
uint8_t * m_buffer;
- size_t m_availBytes;
- size_t m_cursor;
+ size_t m_availBytes = 0;
+ size_t m_cursor = 0;
String m_name;
- bool m_passThru;
+ bool m_passThru = false;
bool m_detached;
bool m_closed;
};
diff --git a/includes/HttpServer.h b/includes/HttpServer.h
index 7dbf912..6c28910 100644
--- a/includes/HttpServer.h
+++ b/includes/HttpServer.h
@@ -44,7 +44,7 @@ class HttpServer {
class Action {
public:
Action(const Regex & regex, const Regex & host = Regexes::any) : m_regex(regex), m_host(host), m_refCount(0) { }
- ~Action() { AAssert(m_refCount == 0, "Don't delete an Action directly"); }
+ virtual ~Action() { AAssert(m_refCount == 0, "Don't delete an Action directly"); }
struct ActionMatch {
Regex::Captures uri, host;
};
diff --git a/includes/Input.h b/includes/Input.h
index 9d45baa..e0ad329 100644
--- a/includes/Input.h
+++ b/includes/Input.h
@@ -16,11 +16,11 @@ class Input : public SeekableHandle {
virtual time_t getMTime();
const char * getFName() { return m_fname.to_charp(); }
private:
- int m_fd;
+ int m_fd = -1;
String m_name;
String m_fname;
- off_t m_size;
- time_t m_mtime;
+ off_t m_size = -1;
+ time_t m_mtime = -1;
};
};
diff --git a/includes/Local.h b/includes/Local.h
index 6a598ab..9221384 100644
--- a/includes/Local.h
+++ b/includes/Local.h
@@ -8,7 +8,15 @@ class TLSManager {
public:
virtual void * getTLS();
virtual void * setTLS(void * val);
- void * createTLS();
+};
+
+class PThreadsTLSManager : public TLSManager {
+ public:
+ virtual void * getTLS();
+ virtual void * setTLS(void * val);
+ void init();
+ private:
+ pthread_key_t m_key;
};
extern TLSManager * g_tlsManager;
@@ -16,6 +24,7 @@ extern TLSManager * g_tlsManager;
class Local : public AtStart {
public:
static int getSize() { return s_size; }
+ static void * createTLS() { void * r = calloc(s_size * sizeof(void *), 1); return r; }
protected:
Local() : AtStart(0) { }
void * getGlobal() { return m_globals[m_idx]; }
@@ -26,7 +35,6 @@ class Local : public AtStart {
void set(void * obj) { void * r = getTLS(); if (r) setLocal(obj); else setGlobal(obj); }
int getIndex() { return m_idx; }
private:
- static void * create() { void * r = calloc(s_size * sizeof(void *), 1); return r; }
static void * getTLS() { return g_tlsManager->getTLS(); }
static void * setTLS(void * val) { return g_tlsManager->setTLS(val); }
virtual void doStart();
diff --git a/includes/LuaTask.h b/includes/LuaTask.h
index 0d763de..37d64fa 100644
--- a/includes/LuaTask.h
+++ b/includes/LuaTask.h
@@ -12,13 +12,14 @@ class LuaMainTask;
class LuaExecCell {
public:
LuaExecCell();
+ virtual ~LuaExecCell() { }
void detach() { m_detached = true; }
void exec(LuaMainTask * mainTask);
protected:
virtual void run(Lua &) = 0;
private:
Events::Async m_event;
- bool m_detached;
+ bool m_detached = false;
friend class LuaTask;
};
diff --git a/includes/Output.h b/includes/Output.h
index 59d9d67..d771227 100644
--- a/includes/Output.h
+++ b/includes/Output.h
@@ -16,11 +16,11 @@ class Output : public SeekableHandle {
virtual time_t getMTime();
const char * getFName() { return m_fname.to_charp(); }
private:
- int m_fd;
+ int m_fd = -1;
String m_name;
String m_fname;
- off_t m_size;
- time_t m_mtime;
+ off_t m_size = -1;
+ time_t m_mtime = -1;
};
};
diff --git a/includes/Printer.h b/includes/Printer.h
index b4001f1..8fd3674 100644
--- a/includes/Printer.h
+++ b/includes/Printer.h
@@ -38,6 +38,7 @@ enum {
E_THREAD = 64,
E_OUTPUT = 128,
E_HTTPSERVER = 256,
+ E_ASYNC = 512,
};
class Printer {
@@ -74,8 +75,8 @@ class Printer {
static void setDetailled(bool enable) { getPrinter()->m_detailledLogs = enable; }
private:
- uint32_t m_verbosity;
- bool m_detailledLogs;
+ uint32_t m_verbosity = M_STATUS | M_WARNING | M_ERROR | M_ENGINE_DEBUG;
+ bool m_detailledLogs = false;
};
};
diff --git a/includes/SimpleMustache.h b/includes/SimpleMustache.h
index 52a2603..28cb477 100644
--- a/includes/SimpleMustache.h
+++ b/includes/SimpleMustache.h
@@ -22,7 +22,7 @@ class SimpleMustache {
friend class Context;
};
- Context() : m_type(CONTEXTLIST), m_root(true) { }
+ Context() { }
~Context() { empty(); }
Proxy operator[](ssize_t idx) { ensureList(); return Proxy(this, idx); }
Context & operator[](const char * str);
@@ -55,7 +55,7 @@ class SimpleMustache {
BOOLSEC,
CONTEXTLIST,
LAMBDA,
- } m_type;
+ } m_type = CONTEXTLIST;
Context(ContextType type) : m_type(type), m_root(false) { }
Context(Context & c) { Failure("You can't copy a Context; use references"); }
Context & operator=(Context & c) { Failure("You can't assign a Context; use references"); return *this; }
@@ -64,7 +64,7 @@ class SimpleMustache {
typedef std::map<String, Context *> SubContext;
typedef std::vector<SubContext> ContextList;
ContextList m_contextList;
- bool m_root;
+ bool m_root = true;
void empty(bool skipFirst = false);
void ensureList(bool single = false);
diff --git a/includes/Socket.h b/includes/Socket.h
index b113e43..e5a9f21 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -50,9 +50,9 @@ class Socket : public Handle {
int m_fd;
String m_name;
- bool m_connected;
- bool m_connecting;
- bool m_listening;
+ bool m_connected = false;
+ bool m_connecting = false;
+ bool m_listening = false;
sockaddr_in6 m_localAddr, m_remoteAddr;
SocketEvent * m_evtR, * m_evtW;
};
diff --git a/includes/Task.h b/includes/Task.h
index 9347dc4..86bbce1 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -32,6 +32,8 @@ class Task;
namespace Events {
class Callback {
+ public:
+ virtual ~Callback() { }
protected:
virtual void gotEvent(BaseEvent *) = 0;
friend class BaseEvent;
@@ -210,23 +212,23 @@ class QueueBase {
public:
bool isEmpty() { ScopeLock sl(m_lock); return !m_front; }
protected:
- QueueBase() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); }
- ~QueueBase() { while (!isEmpty()) iPop(NULL); pthread_cond_destroy(&m_cond); }
+ QueueBase() { pthread_cond_init(&m_cond, NULL); }
+ ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); }
void iPush(void * t, Events::Async * event);
- void * iPop(Events::Async * event);
+ void * iPop(Events::Async * event, bool wait);
private:
QueueBase(const QueueBase &) = delete;
QueueBase & operator=(const QueueBase &) = delete;
Lock m_lock;
struct Cell {
- Cell(void * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
+ Cell(void * elem) : m_elem(elem) { }
Cell(const Cell &) = delete;
Cell & operator=(const Cell &) = delete;
- Cell * m_next, * m_prev;
+ Cell * m_next = NULL, * m_prev = NULL;
void * m_elem;
};
- Cell * m_front, * m_back;
+ Cell * m_front = NULL, * m_back = NULL;
pthread_cond_t m_cond;
};
@@ -234,16 +236,23 @@ template<class T>
class Queue : public QueueBase {
public:
void push(T * t) { iPush(t, NULL); }
- T * pop() { return (T *) iPop(NULL); }
+ T * pop() { return (T *) iPop(NULL, true); }
};
template<class T>
class TQueue : public QueueBase {
public:
void push(T * t) { iPush(t, &m_event); }
- T * pop() { return (T *) iPop(&m_event); }
+ T * pop() { return (T *) iPop(&m_event, true); }
private:
Events::Async m_event;
};
+template<class T>
+class CQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, false); }
+};
+
};
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index 35e60ed..2b2742d 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -7,6 +7,7 @@
#include <ev++.h>
#include <ext/hash_set>
#include <queue>
+#include <Async.h>
#include <Threads.h>
#include <Exceptions.h>
#include <Task.h>
@@ -19,7 +20,6 @@ class TaskScheduler;
namespace Events {
-class Async;
class TaskEvent;
};
@@ -61,9 +61,17 @@ class TaskMan {
private:
static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event);
+ static void registerAsyncOp(AsyncOperation * op);
void * getStack();
void freeStack(void * stack);
void addToPending(Task * t);
+ static void asyncIdleReady(void * param) {
+ TaskMan * taskMan = (TaskMan *) param;
+ taskMan->asyncIdleReady();
+ }
+ void asyncIdleReady() {
+ m_evt.send();
+ }
#ifndef _WIN32
coro_context m_returnContext;
#else
@@ -71,17 +79,22 @@ class TaskMan {
#endif
friend class Task;
friend class TaskScheduler;
+ template<class T>
+ friend T * createAsyncOp(T * op);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
taskHash_t m_tasks, m_signaledTasks;
Queue<Task> m_pendingAdd;
- bool m_stopped;
struct ev_loop * m_loop;
- bool m_allowedToSignal;
ev::async m_evt;
std::queue<void *> m_stacks;
int m_nStacks;
- int m_stopCode;
+ int m_stopCode = 0;
+ bool m_stopped = false;
+ bool m_allowedToSignal = false;
};
+template<class T>
+T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; }
+
};
diff --git a/includes/ZHandle.h b/includes/ZHandle.h
index 048e0cb..e972466 100644
--- a/includes/ZHandle.h
+++ b/includes/ZHandle.h
@@ -28,9 +28,9 @@ class ZStream : public Handle {
void doFlush(bool finish);
IO<Handle> m_h;
z_stream m_zin, m_zout;
- bool m_detached, m_closed, m_eof;
+ bool m_detached = false, m_closed = false, m_eof = false;
String m_name;
- uint8_t * m_in;
+ uint8_t * m_in = NULL;
};
};
diff --git a/src/Async.cc b/src/Async.cc
new file mode 100644
index 0000000..18c8ef1
--- /dev/null
+++ b/src/Async.cc
@@ -0,0 +1,173 @@
+#include "Async.h"
+
+namespace {
+
+class AsyncStopper : public Balau::AsyncOperation {
+ public:
+ virtual bool needsSynchronousCallback() { return false; }
+ virtual void done() { delete this; }
+};
+
+};
+
+void Balau::AsyncManager::setIdleReadyCallback(void (*callback)(void *), void * param) {
+ while (!m_ready);
+ TLS * tls = getTLS();
+ tls->idleReadyCallback = callback;
+ tls->idleReadyParam = param;
+}
+
+void Balau::AsyncManager::queueOp(AsyncOperation * op) {
+ if (m_stopperPushed) {
+ Printer::elog(E_ASYNC, "AsyncManager's queue has been stopped; running operation %p on this thread instead.", op);
+ op->run();
+ op->finalize();
+ return;
+ }
+ while (!m_ready);
+ TLS * tls = getTLS();
+ Printer::elog(E_ASYNC, "Queuing operation at %p", op);
+ if (op->needsSynchronousCallback()) {
+ Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", op, &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam);
+ op->m_idleQueue = &tls->idleQueue;
+ op->m_idleReadyCallback = tls->idleReadyCallback;
+ op->m_idleReadyParam = tls->idleReadyParam;
+ }
+ if (op->needsMainQueue())
+ m_queue.push(op);
+ else
+ m_finished.push(op);
+}
+
+void Balau::AsyncManager::checkIdle() {
+ if (m_numFinishersIdle > m_maxIdle)
+ killOneFinisher();
+ if (m_numFinishersIdle < m_minIdle)
+ startOneFinisher();
+ joinStoppedFinishers();
+}
+
+void Balau::AsyncManager::killOneFinisher() {
+ Printer::elog(E_ASYNC, "Too many workers idle (%i / %i), killing one.", m_numFinishersIdle, m_maxIdle);
+ m_finished.push(new AsyncStopper());
+}
+
+void Balau::AsyncManager::startOneFinisher() {
+ AsyncFinishWorker * worker = new AsyncFinishWorker(this, &m_finished);
+ Printer::elog(E_ASYNC, "Not enough workers idle (%i / %i), starting one at %p.", m_numFinishersIdle, m_minIdle, worker);
+ m_workers.push_back(worker);
+ m_numFinishers++;
+ worker->threadStart();
+}
+
+void Balau::AsyncManager::joinStoppedFinishers() {
+ for (auto i = m_workers.begin(); i != m_workers.end(); i++) {
+ AsyncFinishWorker * worker = *i;
+ if (!worker->m_stopped)
+ continue;
+ Printer::elog(E_ASYNC, "Joining stopped worker at %p", worker);
+ m_numFinishers--;
+ m_workers.erase(i);
+ worker->join();
+ delete worker;
+ break;
+ }
+}
+
+void * Balau::AsyncManager::proc() {
+ Printer::elog(E_ASYNC, "AsyncManager thread starting up");
+ m_tlsManager.init();
+ m_ready = true;
+ while (!m_stopping) {
+ checkIdle();
+ AsyncOperation * op = m_queue.pop();
+ Printer::elog(E_ASYNC, "AsyncManager got an operation at %p", op);
+ if (dynamic_cast<AsyncStopper *>(op)) {
+ Printer::elog(E_ASYNC, "AsyncManager got a stopper operation");
+ m_stopping = true;
+ }
+ Printer::elog(E_ASYNC, "AsyncManager running operation at %p", op);
+ op->run();
+ if (op->needsFinishWorker()) {
+ Printer::elog(E_ASYNC, "AsyncManager pushing operation at %p in the finisher's queue", op);
+ m_finished.push(op);
+ } else {
+ Printer::elog(E_ASYNC, "AsyncManager finalizing operation at %p", op);
+ op->finalize();
+ }
+ }
+ stopAllWorkers();
+
+ Printer::elog(E_ASYNC, "Async thread waits for all idle queues to empty");
+ while (Atomic::Prefetch::Decrement(&m_numTLSes)) {
+ TLS * tls = m_TLSes.pop();
+ while (!tls->idleQueue.isEmpty());
+ }
+
+ Printer::elog(E_ASYNC, "Async thread stopping");
+ return NULL;
+}
+
+void * Balau::AsyncFinishWorker::proc() {
+ Printer::elog(E_ASYNC, "AsyncFinishWorker thread starting up");
+ AsyncOperation * op;
+ while (!m_stopping) {
+ m_async->incIdle();
+ op = m_queue->pop();
+ m_async->decIdle();
+ Printer::elog(E_ASYNC, "AsyncFinishWorker got operation at %p", op);
+ if (dynamic_cast<AsyncStopper *>(op)) {
+ Printer::elog(E_ASYNC, "AsyncFinishWorker got a stopper operation");
+ m_stopping = true;
+ }
+ op->finalize();
+ }
+
+ m_stopped = true;
+ Printer::elog(E_ASYNC, "AsyncFinishWorker thread stopping");
+
+ return NULL;
+}
+
+void Balau::AsyncOperation::finalize() {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is finishing operation %p", this);
+ finish();
+ if (needsSynchronousCallback()) {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is pushing operation %p to its idle queue", this);
+ bool wasEmpty = m_idleQueue->isEmpty();
+ m_idleQueue->push(this);
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() has pushed operation %p to its idle queue; wasEmpty = %s; callback = %p", this, wasEmpty ? "true" : "false", m_idleReadyCallback);
+ if (wasEmpty && m_idleReadyCallback) {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is calling ready callback to wake up main loop");
+ m_idleReadyCallback(m_idleReadyParam);
+ }
+ } else {
+ Printer::elog(E_ASYNC, "AsyncOperation::finalize() is wrapping up operation %p", this);
+ done();
+ }
+}
+
+void Balau::AsyncManager::idle() {
+ Printer::elog(E_ASYNC, "AsyncManager::idle() is running");
+ while (!m_ready);
+ AsyncOperation * op;
+ TLS * tls = getTLS();
+ while ((op = tls->idleQueue.pop())) {
+ Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op);
+ op->done();
+ }
+}
+
+void Balau::AsyncManager::threadExit() {
+ Printer::elog(E_ASYNC, "AsyncManager thread is being asked to stop; creating stopper");
+ if (Atomic::CmpXChgBool(&m_stopperPushed, true, false))
+ m_queue.push(new AsyncStopper());
+}
+
+void Balau::AsyncManager::stopAllWorkers() {
+ Printer::elog(E_ASYNC, "AsyncManager thread is being stopping and joining %i workers", m_numFinishers);
+ for (int i = 0; i < m_numFinishers; i++)
+ m_finished.push(new AsyncStopper());
+ for (auto worker : m_workers)
+ worker->join();
+}
diff --git a/src/BStream.cc b/src/BStream.cc
index 9420459..6c51c4f 100644
--- a/src/BStream.cc
+++ b/src/BStream.cc
@@ -3,7 +3,7 @@
static const int s_blockSize = 16 * 1024;
-Balau::BStream::BStream(const IO<Handle> & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)), m_availBytes(0), m_cursor(0), m_passThru(false) {
+Balau::BStream::BStream(const IO<Handle> & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)) {
AAssert(m_h->canRead(), "You can't create a buffered stream with a Handle that can't read");
m_name.set("Stream(%s)", m_h->getName());
if ((m_h.isA<Buffer>()) || (m_h.isA<BStream>()))
diff --git a/src/Input.cc b/src/Input.cc
index 46b9b94..3608cd0 100644
--- a/src/Input.cc
+++ b/src/Input.cc
@@ -47,7 +47,7 @@ static int eioStatsDone(eio_req * req) {
return 0;
}
-Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1), m_size(-1), m_mtime(-1) {
+Balau::Input::Input(const char * fname) throw (GeneralException) {
m_name.set("Input(%s)", fname);
m_fname = fname;
diff --git a/src/Local.cc b/src/Local.cc
index 9729c35..a9eccdd 100644
--- a/src/Local.cc
+++ b/src/Local.cc
@@ -13,10 +13,6 @@ void * Balau::TLSManager::setTLS(void * val) {
return r;
}
-void * Balau::TLSManager::createTLS() {
- return Local::create();
-}
-
static Balau::TLSManager dummyTLSManager;
Balau::TLSManager * Balau::g_tlsManager = &dummyTLSManager;
@@ -29,31 +25,31 @@ void Balau::Local::doStart() {
m_globals[m_idx] = 0;
}
-class PThreadsTLSManager : public Balau::TLSManager, public Balau::AtStart {
+class GlobalPThreadsTLSManager : public Balau::PThreadsTLSManager, public Balau::AtStart {
public:
- PThreadsTLSManager() : AtStart(0) { }
- virtual void * getTLS();
- virtual void * setTLS(void * val);
- virtual void doStart();
- private:
- pthread_key_t m_key;
+ GlobalPThreadsTLSManager() : AtStart(0) { }
+ void doStart();
};
-PThreadsTLSManager pthreadsTLSManager;
+GlobalPThreadsTLSManager pthreadsTLSManager;
+
+void GlobalPThreadsTLSManager::doStart() {
+ init();
+ Balau::g_tlsManager = this;
+}
-void PThreadsTLSManager::doStart() {
+void Balau::PThreadsTLSManager::init() {
int r;
r = pthread_key_create(&m_key, NULL);
RAssert(r == 0, "Unable to create a pthtread_key: %i", r);
- Balau::g_tlsManager = this;
}
-void * PThreadsTLSManager::getTLS() {
+void * Balau::PThreadsTLSManager::getTLS() {
return pthread_getspecific(m_key);
}
-void * PThreadsTLSManager::setTLS(void * val) {
+void * Balau::PThreadsTLSManager::setTLS(void * val) {
void * r = pthread_getspecific(m_key);
pthread_setspecific(m_key, val);
return r;
diff --git a/src/LuaTask.cc b/src/LuaTask.cc
index b023f45..7e0b256 100644
--- a/src/LuaTask.cc
+++ b/src/LuaTask.cc
@@ -11,7 +11,7 @@ class LuaTaskStopper : public Balau::LuaExecCell {
};
-Balau::LuaExecCell::LuaExecCell() : m_detached(false) {
+Balau::LuaExecCell::LuaExecCell() {
Printer::elog(E_TASK, "LuaExecCell created at %p", this);
}
diff --git a/src/Output.cc b/src/Output.cc
index c585ce5..dd0a588 100644
--- a/src/Output.cc
+++ b/src/Output.cc
@@ -47,7 +47,7 @@ static int eioStatsDone(eio_req * req) {
return 0;
}
-Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException) : m_fd(-1), m_size(-1), m_mtime(-1) {
+Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException) {
m_name.set("Output(%s)", fname);
m_fname = fname;
diff --git a/src/Printer.cc b/src/Printer.cc
index ee6029c..a7fee9f 100644
--- a/src/Printer.cc
+++ b/src/Printer.cc
@@ -16,7 +16,7 @@ static const char * prefixes[] = {
"(**) ",
};
-Balau::Printer::Printer() : m_verbosity(M_STATUS | M_WARNING | M_ERROR | M_ENGINE_DEBUG), m_detailledLogs(false) {
+Balau::Printer::Printer() {
#ifdef DEBUG
m_detailledLogs = true;
#endif
diff --git a/src/Socket.cc b/src/Socket.cc
index 100b94e..7511bde 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -250,7 +250,7 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st
return req;
}
-Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)), m_connected(false), m_connecting(false), m_listening(false) {
+Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) {
m_name = "Socket(unconnected)";
RAssert(m_fd >= 0, "socket() returned %i", m_fd);
m_evtR = new SocketEvent(m_fd, ev::READ);
@@ -271,8 +271,9 @@ Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_ST
Printer::elog(E_SOCKET, "Creating a socket at %p", this);
}
-Balau::Socket::Socket(int fd) : m_fd(fd), m_connected(true), m_connecting(false), m_listening(false) {
+Balau::Socket::Socket(int fd) : m_fd(fd) {
socklen_t len;
+ m_connected = true;
len = sizeof(m_localAddr);
getsockname(m_fd, (sockaddr *) &m_localAddr, &len);
diff --git a/src/Task.cc b/src/Task.cc
index df8eee2..4465203 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -37,7 +37,7 @@ void Balau::Task::setup(TaskMan * taskMan, void * stack) {
m_taskMan = taskMan;
- m_tls = g_tlsManager->createTLS();
+ m_tls = Local::createTLS();
void * oldTLS = g_tlsManager->getTLS();
g_tlsManager->setTLS(m_tls);
localTask.set(this);
@@ -310,9 +310,9 @@ void Balau::QueueBase::iPush(void * t, Events::Async * event) {
pthread_cond_signal(&m_cond);
}
-void * Balau::QueueBase::iPop(Events::Async * event) {
- m_lock.enter();
- while (!m_front) {
+void * Balau::QueueBase::iPop(Events::Async * event, bool wait) {
+ ScopeLock sl(m_lock);
+ while (!m_front && wait) {
if (event) {
Task::prepare(event);
m_lock.leave();
@@ -332,6 +332,5 @@ void * Balau::QueueBase::iPop(Events::Async * event) {
m_back = NULL;
void * t = c->m_elem;
delete c;
- m_lock.leave();
return t;
}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 33d07d0..177cec1 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -1,10 +1,24 @@
+#include "Async.h"
#include "TaskMan.h"
#include "Task.h"
#include "Main.h"
#include "Local.h"
+static Balau::AsyncManager s_async;
+
namespace {
+class AsyncStarter : public Balau::AtStart, Balau::AtExit {
+ public:
+ AsyncStarter() : AtStart(1000), AtExit(0) { }
+ void doStart() {
+ s_async.threadStart();
+ }
+ void doExit() {
+ s_async.join();
+ }
+};
+
class Stopper : public Balau::Task {
public:
Stopper(int code) : m_code(code) { }
@@ -14,6 +28,10 @@ class Stopper : public Balau::Task {
int m_code;
};
+};
+
+static AsyncStarter s_asyncStarter;
+
void Stopper::Do() {
getTaskMan()->stopMe(m_code);
}
@@ -22,8 +40,6 @@ const char * Stopper::getName() const {
return "Stopper";
}
-};
-
static Balau::DefaultTmpl<Balau::TaskMan> defaultTaskMan(50);
static Balau::LocalTmpl<Balau::TaskMan> localTaskMan;
@@ -137,7 +153,7 @@ void Balau::TaskMan::stopMe(int code) {
}
}
-Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false), m_stopCode(0) {
+Balau::TaskMan::TaskMan() {
#ifndef _WIN32
coro_create(&m_returnContext, 0, 0, 0, 0);
#else
@@ -225,6 +241,8 @@ int Balau::TaskMan::mainLoop() {
if (t->getStatus() == Task::STARTING)
starting.insert(t);
+ s_async.setIdleReadyCallback(asyncIdleReady, this);
+
do {
Printer::elog(E_TASK, "TaskMan::mainLoop() at %p with m_tasks.size = %li", this, m_tasks.size());
@@ -250,6 +268,9 @@ int Balau::TaskMan::mainLoop() {
ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE);
Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this);
+ // calling async's idle
+ s_async.idle();
+
// let's check what task got stopped, and signal them
for (Task * t : stopped) {
IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t);
@@ -327,6 +348,7 @@ int Balau::TaskMan::mainLoop() {
} while (!m_stopped);
Printer::elog(E_TASK, "TaskManager at %p stopping.", this);
+ s_async.setIdleReadyCallback(NULL, NULL);
return m_stopCode;
}
@@ -342,6 +364,10 @@ void Balau::TaskMan::iRegisterTask(Balau::Task * t, Balau::Task * stick, Events:
}
}
+void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) {
+ s_async.queueOp(op);
+}
+
void Balau::TaskMan::addToPending(Balau::Task * t) {
m_pendingAdd.push(t);
m_evt.send();
diff --git a/src/Threads.cc b/src/Threads.cc
index 867013d..7b1b7bc 100644
--- a/src/Threads.cc
+++ b/src/Threads.cc
@@ -43,7 +43,7 @@ void * Balau::ThreadHelper::threadProc(void * arg) {
void * r = NULL;
bool success = false;
try {
- void * tls = g_tlsManager->createTLS();
+ void * tls = Local::createTLS();
g_tlsManager->setTLS(tls);
Balau::Thread * thread = reinterpret_cast<Balau::Thread *>(arg);
r = thread->proc();
diff --git a/src/ZHandle.cc b/src/ZHandle.cc
index 3a14d7d..4f216c7 100644
--- a/src/ZHandle.cc
+++ b/src/ZHandle.cc
@@ -1,7 +1,7 @@
#include "ZHandle.h"
#include "Task.h"
-Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(h), m_detached(false), m_closed(false), m_eof(false), m_in(NULL) {
+Balau::ZStream::ZStream(const IO<Handle> & h, int level, header_t header) : m_h(h) {
m_zin.zalloc = m_zout.zalloc = NULL;
m_zin.zfree = m_zout.zfree = NULL;
m_zin.opaque = m_zout.opaque = NULL;
diff --git a/tests/test-Async.cc b/tests/test-Async.cc
new file mode 100644
index 0000000..5f93217
--- /dev/null
+++ b/tests/test-Async.cc
@@ -0,0 +1,56 @@
+#include <Async.h>
+#include <Task.h>
+#include <TaskMan.h>
+#include <Main.h>
+
+using namespace Balau;
+
+class AsyncOpTest : public AsyncOperation {
+ public:
+ AsyncOpTest(Events::Custom * evt) : m_evt(evt) { }
+ virtual void run() {
+ Printer::log(M_STATUS, "Async operation running");
+ TAssert(!m_ran);
+ TAssert(!m_finished);
+ TAssert(!m_done);
+ m_ran = true;
+ }
+ virtual void finish() {
+ Printer::log(M_STATUS, "Async operation finishing");
+ TAssert(m_ran);
+ TAssert(!m_finished);
+ TAssert(!m_done);
+ m_finished = true;
+ }
+ virtual void done() {
+ Printer::log(M_STATUS, "Async operation done");
+ TAssert(m_ran);
+ TAssert(m_finished);
+ TAssert(!m_done);
+ m_done = true;
+ m_evt->doSignal();
+ }
+ virtual bool needsFinishWorker() { return true; }
+
+ bool m_ran = false;
+ bool m_finished = false;
+ bool m_done = false;
+ Events::Custom * m_evt;
+};
+
+void MainTask::Do() {
+ Printer::log(M_STATUS, "Test::Async running.");
+
+ Events::Custom evt;
+ AsyncOpTest * op = createAsyncOp(new AsyncOpTest(&evt));
+ waitFor(&evt);
+ TAssert(!evt.gotSignal());
+ yield();
+ TAssert(evt.gotSignal());
+ TAssert(op->m_ran);
+ TAssert(op->m_finished);
+ TAssert(op->m_done);
+ delete op;
+
+ Printer::log(M_STATUS, "Test::Async passed.");
+}