summaryrefslogtreecommitdiff
path: root/includes/Task.h
diff options
context:
space:
mode:
Diffstat (limited to 'includes/Task.h')
-rw-r--r--includes/Task.h94
1 files changed, 89 insertions, 5 deletions
diff --git a/includes/Task.h b/includes/Task.h
index b3f395a..86bbce1 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -21,6 +21,11 @@ class EAgain : public GeneralException {
Events::BaseEvent * m_evt;
};
+class TaskSwitch : public GeneralException {
+ public:
+ TaskSwitch() : GeneralException("Task Switch") { }
+};
+
class TaskMan;
class Task;
@@ -64,6 +69,7 @@ class BaseEvent {
class Timeout : public BaseEvent {
public:
+ Timeout() { }
Timeout(ev_tstamp tstamp) { set(tstamp); }
virtual ~Timeout() { m_evt.stop(); }
void evt_cb(ev::timer & w, int revents) { doSignal(); }
@@ -75,11 +81,12 @@ class Timeout : public BaseEvent {
class TaskEvent : public BaseEvent {
public:
- TaskEvent(Task * taskWaited);
+ TaskEvent(Task * taskWaited = NULL);
virtual ~TaskEvent();
void ack();
void signal();
Task * taskWaited() { return m_taskWaited; }
+ void attachToTask(Task * taskWaited);
void evt_cb(ev::async & w, int revents) { doSignal(); }
protected:
virtual void gotOwner(Task * task);
@@ -121,11 +128,22 @@ class Task {
enum Status {
STARTING,
RUNNING,
- IDLE,
+ SLEEPING,
STOPPED,
FAULTED,
YIELDED,
};
+ static const char * StatusToString(enum Status status) {
+ static const char * strs[] = {
+ "STARTING",
+ "RUNNING",
+ "SLEEPING",
+ "STOPPED",
+ "FAULTED",
+ "YIELDED",
+ };
+ return strs[status];
+ };
Task();
virtual ~Task();
virtual const char * getName() const = 0;
@@ -135,18 +153,37 @@ class Task {
Task * t = getCurrentTask();
t->waitFor(evt);
}
- static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException);
+ enum OperationYieldType {
+ SIMPLE,
+ INTERRUPTIBLE,
+ STACKLESS,
+ };
+ static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException);
TaskMan * getTaskMan() const { return m_taskMan; }
struct ev_loop * getLoop();
+ bool isStackless() { return m_stackless; }
protected:
- void yield(bool changeStatus = false);
+ void yield(bool stillRunning = false) throw (GeneralException);
+ void yield(Events::BaseEvent * evt) {
+ waitFor(evt);
+ yield();
+ }
virtual void Do() = 0;
void waitFor(Events::BaseEvent * event);
bool setOkayToEAgain(bool enable) {
+ if (m_stackless) {
+ AAssert(enable, "You can't make a task go not-okay-to-eagain if it's stackless.");
+ }
bool oldValue = m_okayToEAgain;
m_okayToEAgain = enable;
return oldValue;
}
+ void setStackless() {
+ AAssert(!m_stackless, "Can't set a task to be stackless twice");
+ AAssert(m_status == STARTING, "Can't set a task to be stackless after it started. status = %s", StatusToString(m_status));
+ m_stackless = true;
+ m_okayToEAgain = true;
+ }
private:
static size_t stackSize() { return 64 * 1024; }
void setup(TaskMan * taskMan, void * stack);
@@ -168,7 +205,54 @@ class Task {
Lock m_eventLock;
typedef std::list<Events::TaskEvent *> waitedByList_t;
waitedByList_t m_waitedBy;
- bool m_okayToEAgain;
+ bool m_okayToEAgain, m_stackless;
+};
+
+class QueueBase {
+ public:
+ bool isEmpty() { ScopeLock sl(m_lock); return !m_front; }
+ protected:
+ QueueBase() { pthread_cond_init(&m_cond, NULL); }
+ ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); }
+ void iPush(void * t, Events::Async * event);
+ void * iPop(Events::Async * event, bool wait);
+
+ private:
+ QueueBase(const QueueBase &) = delete;
+ QueueBase & operator=(const QueueBase &) = delete;
+ Lock m_lock;
+ struct Cell {
+ Cell(void * elem) : m_elem(elem) { }
+ Cell(const Cell &) = delete;
+ Cell & operator=(const Cell &) = delete;
+ Cell * m_next = NULL, * m_prev = NULL;
+ void * m_elem;
+ };
+ Cell * m_front = NULL, * m_back = NULL;
+ pthread_cond_t m_cond;
+};
+
+template<class T>
+class Queue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, true); }
+};
+
+template<class T>
+class TQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, &m_event); }
+ T * pop() { return (T *) iPop(&m_event, true); }
+ private:
+ Events::Async m_event;
+};
+
+template<class T>
+class CQueue : public QueueBase {
+ public:
+ void push(T * t) { iPush(t, NULL); }
+ T * pop() { return (T *) iPop(NULL, false); }
};
};