summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--includes/TaskMan.h2
-rw-r--r--includes/Threads.h44
-rw-r--r--src/Socket.cc2
-rw-r--r--src/TaskMan.cc18
4 files changed, 43 insertions, 23 deletions
diff --git a/includes/TaskMan.h b/includes/TaskMan.h
index 026ea88..1e7a2f1 100644
--- a/includes/TaskMan.h
+++ b/includes/TaskMan.h
@@ -51,7 +51,7 @@ class TaskMan {
struct taskHasher { size_t operator()(const Task * t) const { return reinterpret_cast<uintptr_t>(t); } };
typedef gnu::hash_set<Task *, taskHasher> taskHash_t;
taskHash_t m_tasks, m_signaledTasks;
- Queue<Task *> m_pendingAdd;
+ Queue<Task> m_pendingAdd;
bool m_stopped;
struct ev_loop * m_loop;
bool m_allowedToSignal;
diff --git a/includes/Threads.h b/includes/Threads.h
index 256e8e3..ca60627 100644
--- a/includes/Threads.h
+++ b/includes/Threads.h
@@ -1,6 +1,5 @@
#pragma once
-#include <queue>
#include <pthread.h>
namespace Balau {
@@ -40,33 +39,52 @@ class Thread {
template<class T>
class Queue {
public:
- Queue() { pthread_cond_init(&m_cond, NULL); }
- ~Queue() { while (size()) pop(); pthread_cond_destroy(&m_cond); }
- void push(T & t) {
+ Queue() : m_front(NULL), m_back(NULL) { pthread_cond_init(&m_cond, NULL); }
+ ~Queue() { while (!isEmpty()) pop(); pthread_cond_destroy(&m_cond); }
+ void push(T * t) {
m_lock.enter();
- m_queue.push(t);
+ Cell * c = new Cell(t);
+ c->m_prev = m_back;
+ if (m_back)
+ m_back->m_next = c;
+ else
+ m_front = c;
+ m_back = c;
pthread_cond_signal(&m_cond);
m_lock.leave();
}
- T pop() {
+ T * pop() {
m_lock.enter();
- if (m_queue.size() == 0)
+ while (!m_front)
pthread_cond_wait(&m_cond, &m_lock.m_lock);
- T t = m_queue.front();
- m_queue.pop();
+ Cell * c = m_front;
+ m_front = c->m_next;
+ if (m_front)
+ m_front->m_prev = NULL;
+ else
+ m_back = NULL;
+ T * t = c->m_elem;
+ delete c;
m_lock.leave();
return t;
}
- int size() {
- int r;
+ bool isEmpty() {
+ bool r;
m_lock.enter();
- r = m_queue.size();
+ r = !m_front;
m_lock.leave();
return r;
}
private:
- std::queue<T> m_queue;
Lock m_lock;
+ class Cell {
+ public:
+ Cell(T * elem) : m_next(NULL), m_prev(NULL), m_elem(elem) { }
+ Cell * m_next, * m_prev;
+ T * m_elem;
+ };
+ Cell * volatile m_front;
+ Cell * volatile m_back;
pthread_cond_t m_cond;
};
diff --git a/src/Socket.cc b/src/Socket.cc
index 3ca5494..af397a6 100644
--- a/src/Socket.cc
+++ b/src/Socket.cc
@@ -183,7 +183,7 @@ class ResolverThread : public Balau::Thread, public Balau::AtStart, public Balau
virtual void * proc();
virtual void doStart();
virtual void doExit();
- Balau::Queue<DNSRequest *> m_queue;
+ Balau::Queue<DNSRequest> m_queue;
};
void ResolverThread::doStart() {
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index 393eb57..28fb0e3 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -34,7 +34,7 @@ class TaskScheduler : public Thread, public AtStart, public AtExit {
void unregisterTaskMan(TaskMan * t);
void stopAll();
private:
- Queue<Task *> m_queue;
+ Queue<Task> m_queue;
std::queue<TaskMan *> m_taskManagers;
Lock m_lock;
volatile bool m_stopping;
@@ -80,6 +80,7 @@ void Balau::TaskScheduler::stopAll() {
m_taskManagers.pop();
altQueue.push(tm);
tm->addToPending(new Stopper());
+ tm->m_evt.send();
}
while (!altQueue.empty()) {
tm = altQueue.front();
@@ -125,7 +126,9 @@ void Balau::TaskScheduler::doExit() {
join();
}
-void asyncDummy(ev::async & w, int revents) { }
+void asyncDummy(ev::async & w, int revents) {
+ Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up...");
+}
Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false) {
#ifndef _WIN32
@@ -217,11 +220,8 @@ void Balau::TaskMan::mainLoop() {
noWait = true;
}
- // probably means we have pending tasks; or none at all, for some reason. Don't wait on it forever.
- if (m_tasks.size() == 0)
- noWait = true;
-
- if (m_pendingAdd.size() != 0)
+ // if we begin that loop with any pending task, just don't loop, so we can add them immediately.
+ if (!m_pendingAdd.isEmpty())
noWait = true;
// libev's event "loop". We always runs it once though.
@@ -254,8 +254,10 @@ void Balau::TaskMan::mainLoop() {
m_signaledTasks.clear();
// Adding tasks that were added, maybe from other threads
- while (((m_pendingAdd.size() != 0) || (m_tasks.size() == 0)) && !m_stopped) {
+ while (!m_pendingAdd.isEmpty()) {
+ Printer::elog(E_TASK, "TaskMan at %p trying to pop a task...", this);
t = m_pendingAdd.pop();
+ Printer::elog(E_TASK, "TaskMan at %p popped task %p...", this, t);
Assert(m_tasks.find(t) == m_tasks.end());
ev_now_update(m_loop);
t->setup(this, getStack());