diff options
| -rw-r--r-- | include/Atomic.h | 227 | 
1 files changed, 163 insertions, 64 deletions
| diff --git a/include/Atomic.h b/include/Atomic.h index 3b325f2..1137d27 100644 --- a/include/Atomic.h +++ b/include/Atomic.h @@ -1,6 +1,9 @@  #ifndef __ATOMIC_H__  #define __ATOMIC_H__ +#include <deque> +#include <Exceptions.h> +  namespace Atomic {  #if (__GNUC__ >= 5) || ((__GNUC__ == 4) && ((__GNUC_MINOR__ >= 1))) @@ -21,10 +24,10 @@ template <class T> T Increment(volatile T * ptr, T delta = 1) { __sync_fetch_and  template <class T> T Decrement(volatile T * ptr, T delta = 1) { __sync_fetch_and_sub(ptr, delta); }  }; -template <class T> T CmpXChgVal(volatile T * ptr, T xch, T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); } -template <class T> bool CmpXChgBool(volatile T * ptr, T xch, T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); } +template <class T> T CmpXChgVal(volatile T * ptr, const T xch, const T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); } +template <class T> bool CmpXChgBool(volatile T * ptr, const T xch, const T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); } -template <class T> T Exchange32(volatile T * ptr, T exchange) { +template <class T> T Exchange32(volatile T * ptr, const T exchange) {  #if defined(i386) || defined (__x86_64)      __asm__ __volatile__("lock xchgl %0, (%1)" : "+r"(exchange) : "r"(ptr));      return exchange; @@ -35,7 +38,7 @@ template <class T> T Exchange32(volatile T * ptr, T exchange) {  #endif  } -template <class T> T Exchange64(volatile T * ptr, T exchange) { +template <class T> T Exchange64(volatile T * ptr, const T exchange) {  #if defined(i386) || defined (__x86_64)      __asm__ __volatile__("lock xchgq %0, (%1)" : "+r"(exchange) : "r"(ptr));      return exchange; @@ -50,14 +53,14 @@ template <class T> T Exchange64(volatile T * ptr, T exchange) {  #ifdef _MSVC  // Visual Studio version of the atomic operations -#error Not yet implemented... and probably never will -.- +#error MSVC not yet implemented... and probably never will -.-  #else  #error No known platform for atomic operations.  #endif  #endif -template <class T> T * ExchangePtr(T * volatile * ptr, T * exchange) { +template <class T> T * ExchangePtr(T * volatile * ptr, const T * exchange) {  #if defined (__x86_64)      return Exchange64(ptr, exchange);  #else @@ -65,72 +68,168 @@ template <class T> T * ExchangePtr(T * volatile * ptr, T * exchange) {  #endif  } -// Based on "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" -template <class T> +// This is a multiple producers / single consumer queue system. Stolen from the Fugue VM. + +template<class T>  class Queue { +  private: +    struct ptr_t; +    struct node_t { +          node_t() : next(0), val(0) { } +          explicit node_t(T * val) : next(0), val(val) { } +        node_t * next; +        T * val; +        ptr_t * ptr; +    }; +     +    struct ptr_t { +          ptr_t() : next(0), node(0) { } +          explicit ptr_t(node_t * node) : next(0), node(node) { } +          ptr_t(ptr_t * next, node_t * node) : next(next), node(node) { } +        ptr_t * next; +        node_t * node; +    }; +        public: -      Queue() : Head(new node_t()), Tail(Head) { } -    void enqueue(const T & val) { -        node_t * node = new node_t(val); -        ptr_t tail, next; -        while(true) { -            tail = Tail; -            next = tail.ptr->next; -            if (tail == Tail) { -                if (next.ptr == NULL) { -                    ptr_t new_ptr(node, next.count + 1); -                    if (CmpXChgBool(&tail.ptr->next, new_ptr, next)) -                        break; -                } else { -                    ptr_t new_ptr(next.ptr, tail.count + 1); -                    CmpXChgBool(&Tail, new_ptr, tail); -                } -            } +    static const int NUMSLOTS = 16384; +      Queue() { +          unsigned int i; +           +          WHead = OWHead = new node_t; +          RHead = ORHead = new node_t; +           +          NodeFreeList = NULL; +           +          for(i = 0; i < NUMSLOTS; i++) { +              ptr_t * ptr = new ptr_t(NodeFreeList, &NodeBuffer[i]); +              FreeListHead = NodeFreeList = ptr; +              NodeBuffer[i].ptr = ptr; +          } +      } +      ~Queue() { +          unsigned int i; +          typename std::deque<node_t *>::iterator itr; +       +          for (itr = PendingReads.begin(); itr != PendingReads.end(); itr++) +              delete (*itr)->val; +           +          node_t * n; +           +          n = WHead; +          while (n) { +              delete n->val; +              n = n->next; +          } +           +          n = RHead; +          while (n) { +              delete n->val; +              n = n->next; +          } +           +          for (i = 0; i < NUMSLOTS; i++) +              delete NodeBuffer[i].ptr; +           +          delete ORHead; +          delete OWHead; +      } + +    void enqueue(T * val) { +        node_t * node = AllocateNode(val); +         +        while (true) { +            node->next = WHead; +            if (CmpXChgBool(&WHead, node, node->next)) +                return;          } -        ptr_t new_ptr(node, tail.count + 1); -        CmpXChgBool(&Tail, new_ptr, tail);      } -    bool dequeue(T * pval) { -        ptr_t head, tail, next; -        while(true) { -            head = Head; -            tail = Tail; -            next = head->next; -            if (head == Head) { -                if (head.ptr == tail.ptr) { -                    if (next.ptr == NULL) -                        return false; -                    ptr_t new_ptr(next.ptr, tail.count + 1); -                    CmpXChgBool(&Tail, new_ptr, tail); -                } else { -                    *pval = next.ptr->value; -                    ptr_t new_ptr(next.ptr, head.count + 1); -                    if (CmpXChgBool(&Head, new_ptr, head)) -                        break; -                } -            } + +  public: +    T * unqueue(T ** pval = 0) { +        T * r; +        if (!PendingReads.empty()) { +            r = this->PopPendingReads(); +            if (pval) +                *pval = r; +            return r; +        } +         +        if (!RHead->next) +            SwapReadAndWrite(); +         +        node_t * n; + +        n = RHead; +        while (RHead->next) { +            PendingReads.push_back(n); +            n = n->next; +            RHead = n;          } -        delete head.ptr; -        return true; +         +        if (PendingReads.empty()) { +            if (pval) +                *pval = 0; +            return 0; +        } +         +        r = this->PopPendingRead(); +        if (pval) +            *pval = r; +        return r;      } +    private: -    struct node_t; -    struct ptr_t { -        ptr_t() : ptr(NULL), count(0) { } -        ptr_t(node_t * ptr, unsigned int count = 0) : ptr(ptr), count(count) { } -        ptr_t(const ptr_t & c) : ptr(c.ptr), count(c.count) { } -        node_t * ptr; -        unsigned int count; -    }; -    struct node_t { -        node_t() { } -        node_t(const T & val) : val(val) { } -        T val; -        ptr_t next; -    }; -    ptr_t Head, Tail; +    T * PopPendingRead() { +        node_t * node = PendingReads.back(); +        T * val = node->val; +        FreeNode(node); +        PendingReads.pop_back(); +        return val; +    } +     +    void SwapReadAndWrite() { +        while (true) { +            node_t * rh = RHead, ow = WHead; +            if (CmpXChgBool(&WHead, rh, ow)) { +                RHead = ow; +                return; +            } +        } +    } +     +    node_t * AllocateNode(T * val) throw (GeneralException) { +        ptr_t * ptr; +         +        if (!FreeListHead->next) +            throw GeneralException("No more free slots in the queue."); +         +        ptr_t * newhead; +         +        do { +            ptr = FreeListHead; +            newhead = ptr->next; +        } while (!CmpXChgBool(&FreeListHead, newhead, ptr)); +         +        ptr->node->val = val; +        return ptr->node; +    } +     +    void FreeNode(node_t * node) { +        node->val = 0; +         +        while (true) { +            node->ptr->next = FreeListHead; +            if (CmpXChgBool(&FreeListHead, node->ptr, node->ptr->next)) +                break; +        } +    } +     +    node_t * WHead, * OWHead, * RHead, * ORHead, NodeBuffer[NUMSLOTS]; +    ptr_t * NodeFreeList, * FreeListHead; +    // this should be stored in TLS... +    typename std::deque<node_t *> PendingReads;  }; -}; +}; // namespace Atomic  #endif | 
