diff options
-rw-r--r-- | includes/TaskMan.h | 2 | ||||
-rw-r--r-- | includes/Threads.h | 44 | ||||
-rw-r--r-- | src/Socket.cc | 2 | ||||
-rw-r--r-- | src/TaskMan.cc | 18 |
4 files changed, 43 insertions, 23 deletions
diff --git a/includes/TaskMan.h b/includes/TaskMan.h index 026ea88..1e7a2f1 100644 --- a/includes/TaskMan.h +++ b/includes/TaskMan.h @@ -51,7 +51,7 @@ class TaskMan { struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } }; typedef gnu::hash_set<Task *, taskHasher> taskHash_t; taskHash_t m_tasks, m_signaledTasks; - Queue<Task *> m_pendingAdd; + Queue<Task> m_pendingAdd; bool m_stopped; struct ev_loop * m_loop; bool m_allowedToSignal; diff --git a/includes/Threads.h b/includes/Threads.h index 256e8e3..ca60627 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -1,6 +1,5 @@ #pragma once -#include <queue> #include <pthread.h> namespace Balau { @@ -40,33 +39,52 @@ class Thread { template<class T> class Queue { public: - Queue() { pthread_cond_init(&m_cond, NULL); } - ~Queue() { while (size()) pop(); pthread_cond_destroy(&m_cond); } - void push(T & t) { + Queue() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); } + ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); } + void push(T * t) { m_lock.enter(); - m_queue.push(t); + Cell * c = new Cell(t); + c->m_prev = m_back; + if (m_back) + m_back->m_next = c; + else + m_front = c; + m_back = c; pthread_cond_signal(&m_cond); m_lock.leave(); } - T pop() { + T * pop() { m_lock.enter(); - if (m_queue.size() == 0) + while (!m_front) pthread_cond_wait(&m_cond, &m_lock.m_lock); - T t = m_queue.front(); - m_queue.pop(); + Cell * c = m_front; + m_front = c->m_next; + if (m_front) + m_front->m_prev = NULL; + else + m_back = NULL; + T * t = c->m_elem; + delete c; m_lock.leave(); return t; } - int size() { - int r; + bool isEmpty() { + bool r; m_lock.enter(); - r = m_queue.size(); + r = !m_front; m_lock.leave(); return r; } private: - std::queue<T> m_queue; Lock m_lock; + class Cell { + public: + Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { } + Cell * m_next, * m_prev; + T * m_elem; + }; + Cell * volatile m_front; + Cell * volatile m_back; pthread_cond_t m_cond; }; diff --git a/src/Socket.cc b/src/Socket.cc index 3ca5494..af397a6 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -183,7 +183,7 @@ class ResolverThread : public Balau::Thread, public Balau::AtStart, public Balau virtual void * proc(); virtual void doStart(); virtual void doExit(); - Balau::Queue<DNSRequest *> m_queue; + Balau::Queue<DNSRequest> m_queue; }; void ResolverThread::doStart() { diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 393eb57..28fb0e3 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -34,7 +34,7 @@ class TaskScheduler : public Thread, public AtStart, public AtExit { void unregisterTaskMan(TaskMan * t); void stopAll(); private: - Queue<Task *> m_queue; + Queue<Task> m_queue; std::queue<TaskMan *> m_taskManagers; Lock m_lock; volatile bool m_stopping; @@ -80,6 +80,7 @@ void Balau::TaskScheduler::stopAll() { m_taskManagers.pop(); altQueue.push(tm); tm->addToPending(new Stopper()); + tm->m_evt.send(); } while (!altQueue.empty()) { tm = altQueue.front(); @@ -125,7 +126,9 @@ void Balau::TaskScheduler::doExit() { join(); } -void asyncDummy(ev::async & w, int revents) { } +void asyncDummy(ev::async & w, int revents) { + Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up..."); +} Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) { #ifndef _WIN32 @@ -217,11 +220,8 @@ void Balau::TaskMan::mainLoop() { noWait = true; } - // probably means we have pending tasks; or none at all, for some reason. Don't wait on it forever. - if (m_tasks.size() == 0) - noWait = true; - - if (m_pendingAdd.size() != 0) + // if we begin that loop with any pending task, just don't loop, so we can add them immediately. + if (!m_pendingAdd.isEmpty()) noWait = true; // libev's event "loop". We always runs it once though. @@ -254,8 +254,10 @@ void Balau::TaskMan::mainLoop() { m_signaledTasks.clear(); // Adding tasks that were added, maybe from other threads - while (((m_pendingAdd.size() != 0) || (m_tasks.size() == 0)) && !m_stopped) { + while (!m_pendingAdd.isEmpty()) { + Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this); t = m_pendingAdd.pop(); + Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t); Assert(m_tasks.find(t) == m_tasks.end()); ev_now_update(m_loop); t->setup(this, getStack()); |