From a183b6355e22b4d701da4ae5e3243c3ebdeeb1e8 Mon Sep 17 00:00:00 2001 From: Pixel Date: Thu, 12 Nov 2009 00:35:43 -0800 Subject: Adding atomic queues. --- include/Atomic.h | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/include/Atomic.h b/include/Atomic.h index 35edbf1..3b325f2 100644 --- a/include/Atomic.h +++ b/include/Atomic.h @@ -21,7 +21,8 @@ template T Increment(volatile T * ptr, T delta = 1) { __sync_fetch_and template T Decrement(volatile T * ptr, T delta = 1) { __sync_fetch_and_sub(ptr, delta); } }; -template T CmpXChg(volatile T * ptr, T xch, T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); } +template T CmpXChgVal(volatile T * ptr, T xch, T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); } +template bool CmpXChgBool(volatile T * ptr, T xch, T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); } template T Exchange32(volatile T * ptr, T exchange) { #if defined(i386) || defined (__x86_64) @@ -64,6 +65,72 @@ template T * ExchangePtr(T * volatile * ptr, T * exchange) { #endif } +// Based on "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" +template +class Queue { + public: + Queue() : Head(new node_t()), Tail(Head) { } + void enqueue(const T & val) { + node_t * node = new node_t(val); + ptr_t tail, next; + while(true) { + tail = Tail; + next = tail.ptr->next; + if (tail == Tail) { + if (next.ptr == NULL) { + ptr_t new_ptr(node, next.count + 1); + if (CmpXChgBool(&tail.ptr->next, new_ptr, next)) + break; + } else { + ptr_t new_ptr(next.ptr, tail.count + 1); + CmpXChgBool(&Tail, new_ptr, tail); + } + } + } + ptr_t new_ptr(node, tail.count + 1); + CmpXChgBool(&Tail, new_ptr, tail); + } + bool dequeue(T * pval) { + ptr_t head, tail, next; + while(true) { + head = Head; + tail = Tail; + next = head->next; + if (head == Head) { + if (head.ptr == tail.ptr) { + if (next.ptr == NULL) + return false; + ptr_t new_ptr(next.ptr, tail.count + 1); + CmpXChgBool(&Tail, new_ptr, tail); + } else { + *pval = next.ptr->value; + ptr_t new_ptr(next.ptr, head.count + 1); + if (CmpXChgBool(&Head, new_ptr, head)) + break; + } + } + } + delete head.ptr; + return true; + } + private: + struct node_t; + struct ptr_t { + ptr_t() : ptr(NULL), count(0) { } + ptr_t(node_t * ptr, unsigned int count = 0) : ptr(ptr), count(count) { } + ptr_t(const ptr_t & c) : ptr(c.ptr), count(c.count) { } + node_t * ptr; + unsigned int count; + }; + struct node_t { + node_t() { } + node_t(const T & val) : val(val) { } + T val; + ptr_t next; + }; + ptr_t Head, Tail; +}; + }; #endif -- cgit v1.2.3