summaryrefslogtreecommitdiff
path: root/includes
diff options
context:
space:
mode:
authorNicolas "Pixel" Noble <pixel@nobis-crew.org>2012-09-01 10:04:05 -0700
committerNicolas "Pixel" Noble <pixel@nobis-crew.org>2012-09-01 10:27:52 -0700
commit4a893f72cadaa875a920db8949171b002f656e43 (patch)
tree1e08a3cb6f579cab9c3c9a138d36ababdf76121b /includes
parent56d91ddd2cd42b782cde0bb3fdf4eb9ebe7597be (diff)
parent06674e57649d536cf19715524ee40c5ad4a9026d (diff)
Merge commit '06674e57649d536cf19715524ee40c5ad4a9026d'
Conflicts: includes/LuaTask.h includes/TaskMan.h includes/Threads.h src/TaskMan.cc src/Threads.cc
Diffstat (limited to 'includes')
-rw-r--r--includes/Async.h102
-rw-r--r--includes/BLua.h1
-rw-r--r--includes/BStream.h6
-rw-r--r--includes/HttpServer.h2
-rw-r--r--includes/Input.h6
-rw-r--r--includes/Local.h12
-rw-r--r--includes/LuaTask.h3
-rw-r--r--includes/Output.h6
-rw-r--r--includes/Printer.h5
-rw-r--r--includes/SimpleMustache.h6
-rw-r--r--includes/Socket.h6
-rw-r--r--includes/Task.h25
-rw-r--r--includes/TaskMan.h21
-rw-r--r--includes/ZHandle.h4
14 files changed, 170 insertions, 35 deletions
diff --git a/includes/Async.h b/includes/Async.h
new file mode 100644
index 0000000..15e0ac7
--- /dev/null
+++ b/includes/Async.h
@@ -0,0 +1,102 @@
+#pragma once
+
+#include <Atomic.h>
+#include <Exceptions.h>
+#include <Local.h>
+#include <Threads.h>
+
+namespace Balau {
+
+class AsyncManager;
+class AsyncFinishWorker;
+
+typedef void (*IdleReadyCallback_t)(void *);
+
+class AsyncOperation {
+ protected:
+ virtual void run() { }
+ virtual void finish() { }
+ virtual void done() { }
+ virtual bool needsMainQueue() { return true; }
+ virtual bool needsFinishWorker() { return false; }
+ virtual bool needsSynchronousCallback() { return true; }
+ protected:
+ virtual ~AsyncOperation() { }
+ private:
+ CQueue<AsyncOperation> * m_idleQueue = NULL;
+ IdleReadyCallback_t m_idleReadyCallback = NULL;
+ void * m_idleReadyParam = NULL;
+ void finalize();
+
+ friend class AsyncManager;
+ friend class AsyncFinishWorker;
+};
+
+class AsyncFinishWorker : public Thread {
+ public:
+ AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue) { }
+ virtual void * proc();
+ AsyncManager * m_async;
+ Queue<AsyncOperation> * m_queue;
+ bool m_stopping = false;
+ volatile bool m_stopped = false;
+};
+
+class AsyncManager : public Thread {
+ public:
+ void setFinishers(int minIdle, int maxIdle) {
+ AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads.");
+ m_minIdle = minIdle;
+ m_maxIdle = maxIdle;
+ }
+ void setIdleReadyCallback(IdleReadyCallback_t idleReadyCallback, void * param);
+ void queueOp(AsyncOperation * op);
+ void idle();
+ bool isReady() { return m_ready; }
+
+ protected:
+ virtual void threadExit();
+
+ private:
+ void checkIdle();
+ void killOneFinisher();
+ void startOneFinisher();
+ void joinStoppedFinishers();
+ void stopAllWorkers();
+ virtual void * proc();
+ struct TLS {
+ CQueue<AsyncOperation> idleQueue;
+ IdleReadyCallback_t idleReadyCallback;
+ void * idleReadyParam;
+ };
+ TLS * getTLS() {
+ TLS * tls = (TLS *) m_tlsManager.getTLS();
+ if (!tls) {
+ tls = new TLS();
+ m_tlsManager.setTLS(tls);
+ m_TLSes.push(tls);
+ Atomic::Increment(&m_numTLSes);
+ }
+ return tls;
+ }
+ Queue<AsyncOperation> m_queue;
+ Queue<AsyncOperation> m_finished;
+ Queue<TLS> m_TLSes;
+ volatile int m_numTLSes = 0;
+ PThreadsTLSManager m_tlsManager;
+ std::list<AsyncFinishWorker *> m_workers;
+ int m_numFinishers = 0;
+ int m_numFinishersIdle = 0;
+ int m_minIdle = 1;
+ int m_maxIdle = 4;
+ bool m_stopping = false;
+ volatile bool m_ready = false;
+ volatile bool m_stopperPushed = false;
+
+ void incIdle() { Atomic::Increment(&m_numFinishersIdle); }
+ void decIdle() { Atomic::Decrement(&m_numFinishersIdle); }
+
+ friend class AsyncFinishWorker;
+};
+
+};
diff --git a/includes/BLua.h b/includes/BLua.h
index be78660..12e921f 100644
--- a/includes/BLua.h
+++ b/includes/BLua.h
@@ -20,6 +20,7 @@ class LuaObject {
class LuaObjectFactory {
public:
LuaObjectFactory() : m_wantsDestruct(false), m_pushed(false) { }
+ virtual ~LuaObjectFactory() { }
virtual void push(Lua & L);
void pushDestruct(Lua & L);
template<class T>
diff --git a/includes/BStream.h b/includes/BStream.h
index 86b0e0f..d412782 100644
--- a/includes/BStream.h
+++ b/includes/BStream.h
@@ -21,10 +21,10 @@ class BStream : public Handle {
private:
IO<Handle> m_h;
uint8_t * m_buffer;
- size_t m_availBytes;
- size_t m_cursor;
+ size_t m_availBytes = 0;
+ size_t m_cursor = 0;
String m_name;
- bool m_passThru;
+ bool m_passThru = false;
bool m_detached;
bool m_closed;
};
diff --git a/includes/HttpServer.h b/includes/HttpServer.h
index 7dbf912..6c28910 100644
--- a/includes/HttpServer.h
+++ b/includes/HttpServer.h
@@ -44,7 +44,7 @@ class HttpServer {
class Action {
public:
Action(const Regex & regex, const Regex & host = Regexes::any) : m_regex(regex), m_host(host), m_refCount(0) { }
- ~Action() { AAssert(m_refCount == 0, "Don't delete an Action directly"); }
+ virtual ~Action() { AAssert(m_refCount == 0, "Don't delete an Action directly"); }
struct ActionMatch {
Regex::Captures uri, host;
};
diff --git a/includes/Input.h b/includes/Input.h
index 9d45baa..e0ad329 100644
--- a/includes/Input.h
+++ b/includes/Input.h
@@ -16,11 +16,11 @@ class Input : public SeekableHandle {
virtual time_t getMTime();
const char * getFName() { return m_fname.to_charp(); }
private:
- int m_fd;
+ int m_fd = -1;
String m_name;
String m_fname;
- off_t m_size;
- time_t m_mtime;
+ off_t m_size = -1;
+ time_t m_mtime = -1;
};
};
diff --git a/includes/Local.h b/includes/Local.h
index 6a598ab..9221384 100644
--- a/includes/Local.h
+++ b/includes/Local.h
@@ -8,7 +8,15 @@ class TLSManager {
public:
virtual void * getTLS();
virtual void * setTLS(void * val);
- void * createTLS();
+};
+
+class PThreadsTLSManager : public TLSManager {
+ public:
+ virtual void * getTLS();
+ virtual void * setTLS(void * val);
+ void init();
+ private:
+ pthread_key_t m_key;
};
extern TLSManager * g_tlsManager;
@@ -16,6 +24,7 @@ extern TLSManager * g_tlsManager;
class Local : public AtStart {
public:
static int getSize() { return s_size; }
+ static void * createTLS() { void * r = calloc(s_size * sizeof(void *), 1); return r; }
protected:
Local() : AtStart(0) { }
void * getGlobal() { return m_globals[m_idx]; }
@@ -26,7 +35,6 @@ class Local : public AtStart {
void set(void * obj) { void * r = getTLS(); if (r) setLocal(obj); else setGlobal(obj); }
int getIndex() { return m_idx; }
private:
- static void * create() { void * r = calloc(s_size * sizeof(void *), 1); return r; }
static void * getTLS() { return g_tlsManager->getTLS(); }
static void * setTLS(void * val) { return g_tlsManager->setTLS(val); }
virtual void doStart();
diff --git a/includes/LuaTask.h b/includes/LuaTask.h
index 0d763de..37d64fa 100644
--- a/includes/LuaTask.h
+++ b/includes/LuaTask.h
@@ -12,13 +12,14 @@ class LuaMainTask;
class LuaExecCell {
public:
LuaExecCell();
+ virtual ~LuaExecCell() { }
void detach() { m_detached = true; }
void exec(LuaMainTask * mainTask);
protected:
virtual void run(Lua &) = 0;
private:
Events::Async m_event;
- bool m_detached;
+ bool m_detached = false;
friend class LuaTask;
};
diff --git a/includes/Output.h b/includes/Output.h
index 59d9d67..d771227 100644
--- a/includes/Output.h
+++ b/includes/Output.h
@@ -16,11 +16,11 @@ class Output : public SeekableHandle {
virtual time_t getMTime();
const char * getFName() { return m_fname.to_charp(); }
private:
- int m_fd;
+ int m_fd = -1;
String m_name;
String m_fname;
- off_t m_size;
- time_t m_mtime;
+ off_t m_size = -1;
+ time_t m_mtime = -1;
};
};
diff --git a/includes/Printer.h b/includes/Printer.h
index b4001f1..8fd3674 100644
--- a/includes/Printer.h
+++ b/includes/Printer.h
@@ -38,6 +38,7 @@ enum {
E_THREAD = 64,
E_OUTPUT = 128,
E_HTTPSERVER = 256,
+ E_ASYNC = 512,
};
class Printer {
@@ -74,8 +75,8 @@ class Printer {
static void setDetailled(bool enable) { getPrinter()->m_detailledLogs = enable; }
private:
- uint32_t m_verbosity;
- bool m_detailledLogs;
+ uint32_t m_verbosity = M_STATUS | M_WARNING | M_ERROR | M_ENGINE_DEBUG;
+ bool m_detailledLogs = false;
};
};
diff --git a/includes/SimpleMustache.h b/includes/SimpleMustache.h
index 52a2603..28cb477 100644
--- a/includes/SimpleMustache.h
+++ b/includes/SimpleMustache.h
@@ -22,7 +22,7 @@ class SimpleMustache {
friend class Context;
};
- Context() : m_type(CONTEXTLIST), m_root(true) { }
+ Context() { }
~Context() { empty(); }
Proxy operator[](ssize_t idx) { ensureList(); return Proxy(this, idx); }
Context & operator[](const char * str);
@@ -55,7 +55,7 @@ class SimpleMustache {
BOOLSEC,
CONTEXTLIST,
LAMBDA,
- } m_type;
+ } m_type = CONTEXTLIST;
Context(ContextType type) : m_type(type), m_root(false) { }
Context(Context & c) { Failure("You can't copy a Context; use references"); }
Context & operator=(Context & c) { Failure("You can't assign a Context; use references"); return *this; }
@@ -64,7 +64,7 @@ class SimpleMustache {
typedef std::map<String, Context *> SubContext;
typedef std::vector<SubContext> ContextList;
ContextList m_contextList;
- bool m_root;
+ bool m_root = true;
void empty(bool skipFirst = false);
void ensureList(bool single = false);
diff --git a/includes/Socket.h b/includes/Socket.h
index b113e43..e5a9f21 100644
--- a/includes/Socket.h
+++ b/includes/Socket.h
@@ -50,9 +50,9 @@ class Socket : public Handle {
int m_fd;
String m_name;
- bool m_connected;
- bool m_connecting;
- bool m_listening;
+ bool m_connected = false;
+ bool m_connecting = false;
+ bool m_listening = false;
sockaddr_in6 m_localAddr, m_remoteAddr;
SocketEvent * m_evtR, * m_evtW;
};
diff --git a/includes/Task.h b/includes/Task.h
index 9347dc4..86bbce1 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -32,6 +32,8 @@ class Task;
namespace Events {
class Callback {
+ public:
+ virtual ~Callback() { }
protected:
virtual void gotEvent(BaseEvent *) = 0;
friend class BaseEvent;
@@ -210,23 +212,23 @@ class QueueBase {
public:
bool isEmpty() { ScopeLock sl(m_lock); return !m_front; }
protected:
- QueueBase() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); }
- ~QueueBase() { while (!isEmpty()) iPop(NULL); pthread_cond_destroy(&m_cond); }
+ QueueBase() { pthread_cond_init(&m_cond, NULL); }
+ ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); }
void iPush(void * t, Events::Async * event);
- void * iPop(Events::Async * event);
+ void * iPop(Events::Async * event, bool wait);
private:
QueueBase(const QueueBase &) = delete;
QueueBase & operator=(const QueueBase &) = delete;
Lock m_lock;
struct Cell {
- Cell(void * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
+ Cell(void * elem) : m_elem(elem) { }
Cell(const Cell &) = delete;
Cell & operator=(const Cell &) = delete;
- Cell * m_next, * m_prev;
+ Cell * m_next = NULL, * m_prev = NULL;
void * m_elem;
};
- Cell * m_front, * m_back;
+ Cell * m_front = NULL, * m_back = NULL;
pthread_cond_t m_cond;
};
@@ -234,16 +236,23 @@ template<class T>
class Queue : public QueueBase {
public:
void push(T * t) { iPush(t, NULL); }
- T * pop() { return (T *) iPop(NULL); }
+ T * pop() { return (T *) iPop(NULL, true); }
};
template<class T>
class TQueue : public QueueBase {
public:
void push(T * t) { iPush(t, &m_event); }
- T * pop() { return (T *) iPop(&m_event); }
+ T * pop() { return (T *) iPop(&m_event, true); }
private:
Events::Async m_event;
};
+template<class T>
+class CQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, false); }
+};
+
};
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index 35e60ed..2b2742d 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -7,6 +7,7 @@
#include <ev++.h>
#include <ext/hash_set>
#include <queue>
+#include <Async.h>
#include <Threads.h>
#include <Exceptions.h>
#include <Task.h>
@@ -19,7 +20,6 @@ class TaskScheduler;
namespace Events {
-class Async;
class TaskEvent;
};
@@ -61,9 +61,17 @@ class TaskMan {
private:
static void iRegisterTask(Task * t, Task * stick, Events::TaskEvent * event);
+ static void registerAsyncOp(AsyncOperation * op);
void * getStack();
void freeStack(void * stack);
void addToPending(Task * t);
+ static void asyncIdleReady(void * param) {
+ TaskMan * taskMan = (TaskMan *) param;
+ taskMan->asyncIdleReady();
+ }
+ void asyncIdleReady() {
+ m_evt.send();
+ }
#ifndef _WIN32
coro_context m_returnContext;
#else
@@ -71,17 +79,22 @@ class TaskMan {
#endif
friend class Task;
friend class TaskScheduler;
+ template<class T>
+ friend T * createAsyncOp(T * op);
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
taskHash_t m_tasks, m_signaledTasks;
Queue<Task> m_pendingAdd;
- bool m_stopped;
struct ev_loop * m_loop;
- bool m_allowedToSignal;
ev::async m_evt;
std::queue<void *> m_stacks;
int m_nStacks;
- int m_stopCode;
+ int m_stopCode = 0;
+ bool m_stopped = false;
+ bool m_allowedToSignal = false;
};
+template<class T>
+T * createAsyncOp(T * op) { TaskMan::registerAsyncOp(op); return op; }
+
};
diff --git a/includes/ZHandle.h b/includes/ZHandle.h
index 048e0cb..e972466 100644
--- a/includes/ZHandle.h
+++ b/includes/ZHandle.h
@@ -28,9 +28,9 @@ class ZStream : public Handle {
void doFlush(bool finish);
IO<Handle> m_h;
z_stream m_zin, m_zout;
- bool m_detached, m_closed, m_eof;
+ bool m_detached = false, m_closed = false, m_eof = false;
String m_name;
- uint8_t * m_in;
+ uint8_t * m_in = NULL;
};
};