diff options
Diffstat (limited to 'include')
-rw-r--r-- | include/Atomic.h | 227 |
1 files changed, 163 insertions, 64 deletions
diff --git a/include/Atomic.h b/include/Atomic.h index 3b325f2..1137d27 100644 --- a/include/Atomic.h +++ b/include/Atomic.h @@ -1,6 +1,9 @@ #ifndef __ATOMIC_H__ #define __ATOMIC_H__ +#include <deque> +#include <Exceptions.h> + namespace Atomic { #if (__GNUC__ >= 5) || ((__GNUC__ == 4) && ((__GNUC_MINOR__ >= 1))) @@ -21,10 +24,10 @@ template <class T> T Increment(volatile T * ptr, T delta = 1) { __sync_fetch_and template <class T> T Decrement(volatile T * ptr, T delta = 1) { __sync_fetch_and_sub(ptr, delta); } }; -template <class T> T CmpXChgVal(volatile T * ptr, T xch, T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); } -template <class T> bool CmpXChgBool(volatile T * ptr, T xch, T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); } +template <class T> T CmpXChgVal(volatile T * ptr, const T xch, const T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); } +template <class T> bool CmpXChgBool(volatile T * ptr, const T xch, const T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); } -template <class T> T Exchange32(volatile T * ptr, T exchange) { +template <class T> T Exchange32(volatile T * ptr, const T exchange) { #if defined(i386) || defined (__x86_64) __asm__ __volatile__("lock xchgl %0, (%1)" : "+r"(exchange) : "r"(ptr)); return exchange; @@ -35,7 +38,7 @@ template <class T> T Exchange32(volatile T * ptr, T exchange) { #endif } -template <class T> T Exchange64(volatile T * ptr, T exchange) { +template <class T> T Exchange64(volatile T * ptr, const T exchange) { #if defined(i386) || defined (__x86_64) __asm__ __volatile__("lock xchgq %0, (%1)" : "+r"(exchange) : "r"(ptr)); return exchange; @@ -50,14 +53,14 @@ template <class T> T Exchange64(volatile T * ptr, T exchange) { #ifdef _MSVC // Visual Studio version of the atomic operations -#error Not yet implemented... and probably never will -.- +#error MSVC not yet implemented... and probably never will -.- #else #error No known platform for atomic operations. #endif #endif -template <class T> T * ExchangePtr(T * volatile * ptr, T * exchange) { +template <class T> T * ExchangePtr(T * volatile * ptr, const T * exchange) { #if defined (__x86_64) return Exchange64(ptr, exchange); #else @@ -65,72 +68,168 @@ template <class T> T * ExchangePtr(T * volatile * ptr, T * exchange) { #endif } -// Based on "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" -template <class T> +// This is a multiple producers / single consumer queue system. Stolen from the Fugue VM. + +template<class T> class Queue { + private: + struct ptr_t; + struct node_t { + node_t() : next(0), val(0) { } + explicit node_t(T * val) : next(0), val(val) { } + node_t * next; + T * val; + ptr_t * ptr; + }; + + struct ptr_t { + ptr_t() : next(0), node(0) { } + explicit ptr_t(node_t * node) : next(0), node(node) { } + ptr_t(ptr_t * next, node_t * node) : next(next), node(node) { } + ptr_t * next; + node_t * node; + }; + public: - Queue() : Head(new node_t()), Tail(Head) { } - void enqueue(const T & val) { - node_t * node = new node_t(val); - ptr_t tail, next; - while(true) { - tail = Tail; - next = tail.ptr->next; - if (tail == Tail) { - if (next.ptr == NULL) { - ptr_t new_ptr(node, next.count + 1); - if (CmpXChgBool(&tail.ptr->next, new_ptr, next)) - break; - } else { - ptr_t new_ptr(next.ptr, tail.count + 1); - CmpXChgBool(&Tail, new_ptr, tail); - } - } + static const int NUMSLOTS = 16384; + Queue() { + unsigned int i; + + WHead = OWHead = new node_t; + RHead = ORHead = new node_t; + + NodeFreeList = NULL; + + for(i = 0; i < NUMSLOTS; i++) { + ptr_t * ptr = new ptr_t(NodeFreeList, &NodeBuffer[i]); + FreeListHead = NodeFreeList = ptr; + NodeBuffer[i].ptr = ptr; + } + } + ~Queue() { + unsigned int i; + typename std::deque<node_t *>::iterator itr; + + for (itr = PendingReads.begin(); itr != PendingReads.end(); itr++) + delete (*itr)->val; + + node_t * n; + + n = WHead; + while (n) { + delete n->val; + n = n->next; + } + + n = RHead; + while (n) { + delete n->val; + n = n->next; + } + + for (i = 0; i < NUMSLOTS; i++) + delete NodeBuffer[i].ptr; + + delete ORHead; + delete OWHead; + } + + void enqueue(T * val) { + node_t * node = AllocateNode(val); + + while (true) { + node->next = WHead; + if (CmpXChgBool(&WHead, node, node->next)) + return; } - ptr_t new_ptr(node, tail.count + 1); - CmpXChgBool(&Tail, new_ptr, tail); } - bool dequeue(T * pval) { - ptr_t head, tail, next; - while(true) { - head = Head; - tail = Tail; - next = head->next; - if (head == Head) { - if (head.ptr == tail.ptr) { - if (next.ptr == NULL) - return false; - ptr_t new_ptr(next.ptr, tail.count + 1); - CmpXChgBool(&Tail, new_ptr, tail); - } else { - *pval = next.ptr->value; - ptr_t new_ptr(next.ptr, head.count + 1); - if (CmpXChgBool(&Head, new_ptr, head)) - break; - } - } + + public: + T * unqueue(T ** pval = 0) { + T * r; + if (!PendingReads.empty()) { + r = this->PopPendingReads(); + if (pval) + *pval = r; + return r; + } + + if (!RHead->next) + SwapReadAndWrite(); + + node_t * n; + + n = RHead; + while (RHead->next) { + PendingReads.push_back(n); + n = n->next; + RHead = n; } - delete head.ptr; - return true; + + if (PendingReads.empty()) { + if (pval) + *pval = 0; + return 0; + } + + r = this->PopPendingRead(); + if (pval) + *pval = r; + return r; } + private: - struct node_t; - struct ptr_t { - ptr_t() : ptr(NULL), count(0) { } - ptr_t(node_t * ptr, unsigned int count = 0) : ptr(ptr), count(count) { } - ptr_t(const ptr_t & c) : ptr(c.ptr), count(c.count) { } - node_t * ptr; - unsigned int count; - }; - struct node_t { - node_t() { } - node_t(const T & val) : val(val) { } - T val; - ptr_t next; - }; - ptr_t Head, Tail; + T * PopPendingRead() { + node_t * node = PendingReads.back(); + T * val = node->val; + FreeNode(node); + PendingReads.pop_back(); + return val; + } + + void SwapReadAndWrite() { + while (true) { + node_t * rh = RHead, ow = WHead; + if (CmpXChgBool(&WHead, rh, ow)) { + RHead = ow; + return; + } + } + } + + node_t * AllocateNode(T * val) throw (GeneralException) { + ptr_t * ptr; + + if (!FreeListHead->next) + throw GeneralException("No more free slots in the queue."); + + ptr_t * newhead; + + do { + ptr = FreeListHead; + newhead = ptr->next; + } while (!CmpXChgBool(&FreeListHead, newhead, ptr)); + + ptr->node->val = val; + return ptr->node; + } + + void FreeNode(node_t * node) { + node->val = 0; + + while (true) { + node->ptr->next = FreeListHead; + if (CmpXChgBool(&FreeListHead, node->ptr, node->ptr->next)) + break; + } + } + + node_t * WHead, * OWHead, * RHead, * ORHead, NodeBuffer[NUMSLOTS]; + ptr_t * NodeFreeList, * FreeListHead; + // this should be stored in TLS... + typename std::deque<node_t *> PendingReads; }; -}; +}; // namespace Atomic #endif |