diff options
Diffstat (limited to 'includes/Task.h')
-rw-r--r-- | includes/Task.h | 94 |
1 files changed, 89 insertions, 5 deletions
diff --git a/includes/Task.h b/includes/Task.h index b3f395a..86bbce1 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -21,6 +21,11 @@ class EAgain : public GeneralException { Events::BaseEvent * m_evt; }; +class TaskSwitch : public GeneralException { + public: + TaskSwitch() : GeneralException("Task Switch") { } +}; + class TaskMan; class Task; @@ -64,6 +69,7 @@ class BaseEvent { class Timeout : public BaseEvent { public: + Timeout() { } Timeout(ev_tstamp tstamp) { set(tstamp); } virtual ~Timeout() { m_evt.stop(); } void evt_cb(ev::timer & w, int revents) { doSignal(); } @@ -75,11 +81,12 @@ class Timeout : public BaseEvent { class TaskEvent : public BaseEvent { public: - TaskEvent(Task * taskWaited); + TaskEvent(Task * taskWaited = NULL); virtual ~TaskEvent(); void ack(); void signal(); Task * taskWaited() { return m_taskWaited; } + void attachToTask(Task * taskWaited); void evt_cb(ev::async & w, int revents) { doSignal(); } protected: virtual void gotOwner(Task * task); @@ -121,11 +128,22 @@ class Task { enum Status { STARTING, RUNNING, - IDLE, + SLEEPING, STOPPED, FAULTED, YIELDED, }; + static const char * StatusToString(enum Status status) { + static const char * strs[] = { + "STARTING", + "RUNNING", + "SLEEPING", + "STOPPED", + "FAULTED", + "YIELDED", + }; + return strs[status]; + }; Task(); virtual ~Task(); virtual const char * getName() const = 0; @@ -135,18 +153,37 @@ class Task { Task * t = getCurrentTask(); t->waitFor(evt); } - static void yield(Events::BaseEvent * evt, bool interruptible = false) throw (GeneralException); + enum OperationYieldType { + SIMPLE, + INTERRUPTIBLE, + STACKLESS, + }; + static void operationYield(Events::BaseEvent * evt = NULL, enum OperationYieldType yieldType = SIMPLE) throw (GeneralException); TaskMan * getTaskMan() const { return m_taskMan; } struct ev_loop * getLoop(); + bool isStackless() { return m_stackless; } protected: - void yield(bool changeStatus = false); + void yield(bool stillRunning = false) throw (GeneralException); + void yield(Events::BaseEvent * evt) { + waitFor(evt); + yield(); + } virtual void Do() = 0; void waitFor(Events::BaseEvent * event); bool setOkayToEAgain(bool enable) { + if (m_stackless) { + AAssert(enable, "You can't make a task go not-okay-to-eagain if it's stackless."); + } bool oldValue = m_okayToEAgain; m_okayToEAgain = enable; return oldValue; } + void setStackless() { + AAssert(!m_stackless, "Can't set a task to be stackless twice"); + AAssert(m_status == STARTING, "Can't set a task to be stackless after it started. status = %s", StatusToString(m_status)); + m_stackless = true; + m_okayToEAgain = true; + } private: static size_t stackSize() { return 64 * 1024; } void setup(TaskMan * taskMan, void * stack); @@ -168,7 +205,54 @@ class Task { Lock m_eventLock; typedef std::list<Events::TaskEvent *> waitedByList_t; waitedByList_t m_waitedBy; - bool m_okayToEAgain; + bool m_okayToEAgain, m_stackless; +}; + +class QueueBase { + public: + bool isEmpty() { ScopeLock sl(m_lock); return !m_front; } + protected: + QueueBase() { pthread_cond_init(&m_cond, NULL); } + ~QueueBase() { while (!isEmpty()) iPop(NULL, false); pthread_cond_destroy(&m_cond); } + void iPush(void * t, Events::Async * event); + void * iPop(Events::Async * event, bool wait); + + private: + QueueBase(const QueueBase &) = delete; + QueueBase & operator=(const QueueBase &) = delete; + Lock m_lock; + struct Cell { + Cell(void * elem) : m_elem(elem) { } + Cell(const Cell &) = delete; + Cell & operator=(const Cell &) = delete; + Cell * m_next = NULL, * m_prev = NULL; + void * m_elem; + }; + Cell * m_front = NULL, * m_back = NULL; + pthread_cond_t m_cond; +}; + +template<class T> +class Queue : public QueueBase { + public: + void push(T * t) { iPush(t, NULL); } + T * pop() { return (T *) iPop(NULL, true); } +}; + +template<class T> +class TQueue : public QueueBase { + public: + void push(T * t) { iPush(t, &m_event); } + T * pop() { return (T *) iPop(&m_event, true); } + private: + Events::Async m_event; +}; + +template<class T> +class CQueue : public QueueBase { + public: + void push(T * t) { iPush(t, NULL); } + T * pop() { return (T *) iPop(NULL, false); } }; }; |