summaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorPixel <pixel@nobis-crew.org>2009-11-12 15:37:57 -0800
committerPixel <pixel@nobis-crew.org>2009-11-12 15:37:57 -0800
commit8e7e2af92a475ddd2344fdd4d5ea878a7d77b320 (patch)
tree6b7eac9ffb52a821f138863ae2bd11a95cbf62ac /include
parenta183b6355e22b4d701da4ae5e3243c3ebdeeb1e8 (diff)
Refining a little bit more the code, and stealing another queue algorithm; less generic, but appropriate to the messagebox-type system I want.
Diffstat (limited to 'include')
-rw-r--r--include/Atomic.h227
1 files changed, 163 insertions, 64 deletions
diff --git a/include/Atomic.h b/include/Atomic.h
index 3b325f2..1137d27 100644
--- a/include/Atomic.h
+++ b/include/Atomic.h
@@ -1,6 +1,9 @@
#ifndef __ATOMIC_H__
#define __ATOMIC_H__
+#include <deque>
+#include <Exceptions.h>
+
namespace Atomic {
#if (__GNUC__ >= 5) || ((__GNUC__ == 4) && ((__GNUC_MINOR__ >= 1)))
@@ -21,10 +24,10 @@ template <class T> T Increment(volatile T * ptr, T delta = 1) { __sync_fetch_and
template <class T> T Decrement(volatile T * ptr, T delta = 1) { __sync_fetch_and_sub(ptr, delta); }
};
-template <class T> T CmpXChgVal(volatile T * ptr, T xch, T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); }
-template <class T> bool CmpXChgBool(volatile T * ptr, T xch, T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); }
+template <class T> T CmpXChgVal(volatile T * ptr, const T xch, const T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); }
+template <class T> bool CmpXChgBool(volatile T * ptr, const T xch, const T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); }
-template <class T> T Exchange32(volatile T * ptr, T exchange) {
+template <class T> T Exchange32(volatile T * ptr, const T exchange) {
#if defined(i386) || defined (__x86_64)
__asm__ __volatile__("lock xchgl %0, (%1)" : "+r"(exchange) : "r"(ptr));
return exchange;
@@ -35,7 +38,7 @@ template <class T> T Exchange32(volatile T * ptr, T exchange) {
#endif
}
-template <class T> T Exchange64(volatile T * ptr, T exchange) {
+template <class T> T Exchange64(volatile T * ptr, const T exchange) {
#if defined(i386) || defined (__x86_64)
__asm__ __volatile__("lock xchgq %0, (%1)" : "+r"(exchange) : "r"(ptr));
return exchange;
@@ -50,14 +53,14 @@ template <class T> T Exchange64(volatile T * ptr, T exchange) {
#ifdef _MSVC
// Visual Studio version of the atomic operations
-#error Not yet implemented... and probably never will -.-
+#error MSVC not yet implemented... and probably never will -.-
#else
#error No known platform for atomic operations.
#endif
#endif
-template <class T> T * ExchangePtr(T * volatile * ptr, T * exchange) {
+template <class T> T * ExchangePtr(T * volatile * ptr, const T * exchange) {
#if defined (__x86_64)
return Exchange64(ptr, exchange);
#else
@@ -65,72 +68,168 @@ template <class T> T * ExchangePtr(T * volatile * ptr, T * exchange) {
#endif
}
-// Based on "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
-template <class T>
+// This is a multiple producers / single consumer queue system. Stolen from the Fugue VM.
+
+template<class T>
class Queue {
+ private:
+ struct ptr_t;
+ struct node_t {
+ node_t() : next(0), val(0) { }
+ explicit node_t(T * val) : next(0), val(val) { }
+ node_t * next;
+ T * val;
+ ptr_t * ptr;
+ };
+
+ struct ptr_t {
+ ptr_t() : next(0), node(0) { }
+ explicit ptr_t(node_t * node) : next(0), node(node) { }
+ ptr_t(ptr_t * next, node_t * node) : next(next), node(node) { }
+ ptr_t * next;
+ node_t * node;
+ };
+
public:
- Queue() : Head(new node_t()), Tail(Head) { }
- void enqueue(const T & val) {
- node_t * node = new node_t(val);
- ptr_t tail, next;
- while(true) {
- tail = Tail;
- next = tail.ptr->next;
- if (tail == Tail) {
- if (next.ptr == NULL) {
- ptr_t new_ptr(node, next.count + 1);
- if (CmpXChgBool(&tail.ptr->next, new_ptr, next))
- break;
- } else {
- ptr_t new_ptr(next.ptr, tail.count + 1);
- CmpXChgBool(&Tail, new_ptr, tail);
- }
- }
+ static const int NUMSLOTS = 16384;
+ Queue() {
+ unsigned int i;
+
+ WHead = OWHead = new node_t;
+ RHead = ORHead = new node_t;
+
+ NodeFreeList = NULL;
+
+ for(i = 0; i < NUMSLOTS; i++) {
+ ptr_t * ptr = new ptr_t(NodeFreeList, &NodeBuffer[i]);
+ FreeListHead = NodeFreeList = ptr;
+ NodeBuffer[i].ptr = ptr;
+ }
+ }
+ ~Queue() {
+ unsigned int i;
+ typename std::deque<node_t *>::iterator itr;
+
+ for (itr = PendingReads.begin(); itr != PendingReads.end(); itr++)
+ delete (*itr)->val;
+
+ node_t * n;
+
+ n = WHead;
+ while (n) {
+ delete n->val;
+ n = n->next;
+ }
+
+ n = RHead;
+ while (n) {
+ delete n->val;
+ n = n->next;
+ }
+
+ for (i = 0; i < NUMSLOTS; i++)
+ delete NodeBuffer[i].ptr;
+
+ delete ORHead;
+ delete OWHead;
+ }
+
+ void enqueue(T * val) {
+ node_t * node = AllocateNode(val);
+
+ while (true) {
+ node->next = WHead;
+ if (CmpXChgBool(&WHead, node, node->next))
+ return;
}
- ptr_t new_ptr(node, tail.count + 1);
- CmpXChgBool(&Tail, new_ptr, tail);
}
- bool dequeue(T * pval) {
- ptr_t head, tail, next;
- while(true) {
- head = Head;
- tail = Tail;
- next = head->next;
- if (head == Head) {
- if (head.ptr == tail.ptr) {
- if (next.ptr == NULL)
- return false;
- ptr_t new_ptr(next.ptr, tail.count + 1);
- CmpXChgBool(&Tail, new_ptr, tail);
- } else {
- *pval = next.ptr->value;
- ptr_t new_ptr(next.ptr, head.count + 1);
- if (CmpXChgBool(&Head, new_ptr, head))
- break;
- }
- }
+
+ public:
+ T * unqueue(T ** pval = 0) {
+ T * r;
+ if (!PendingReads.empty()) {
+ r = this->PopPendingReads();
+ if (pval)
+ *pval = r;
+ return r;
+ }
+
+ if (!RHead->next)
+ SwapReadAndWrite();
+
+ node_t * n;
+
+ n = RHead;
+ while (RHead->next) {
+ PendingReads.push_back(n);
+ n = n->next;
+ RHead = n;
}
- delete head.ptr;
- return true;
+
+ if (PendingReads.empty()) {
+ if (pval)
+ *pval = 0;
+ return 0;
+ }
+
+ r = this->PopPendingRead();
+ if (pval)
+ *pval = r;
+ return r;
}
+
private:
- struct node_t;
- struct ptr_t {
- ptr_t() : ptr(NULL), count(0) { }
- ptr_t(node_t * ptr, unsigned int count = 0) : ptr(ptr), count(count) { }
- ptr_t(const ptr_t & c) : ptr(c.ptr), count(c.count) { }
- node_t * ptr;
- unsigned int count;
- };
- struct node_t {
- node_t() { }
- node_t(const T & val) : val(val) { }
- T val;
- ptr_t next;
- };
- ptr_t Head, Tail;
+ T * PopPendingRead() {
+ node_t * node = PendingReads.back();
+ T * val = node->val;
+ FreeNode(node);
+ PendingReads.pop_back();
+ return val;
+ }
+
+ void SwapReadAndWrite() {
+ while (true) {
+ node_t * rh = RHead, ow = WHead;
+ if (CmpXChgBool(&WHead, rh, ow)) {
+ RHead = ow;
+ return;
+ }
+ }
+ }
+
+ node_t * AllocateNode(T * val) throw (GeneralException) {
+ ptr_t * ptr;
+
+ if (!FreeListHead->next)
+ throw GeneralException("No more free slots in the queue.");
+
+ ptr_t * newhead;
+
+ do {
+ ptr = FreeListHead;
+ newhead = ptr->next;
+ } while (!CmpXChgBool(&FreeListHead, newhead, ptr));
+
+ ptr->node->val = val;
+ return ptr->node;
+ }
+
+ void FreeNode(node_t * node) {
+ node->val = 0;
+
+ while (true) {
+ node->ptr->next = FreeListHead;
+ if (CmpXChgBool(&FreeListHead, node->ptr, node->ptr->next))
+ break;
+ }
+ }
+
+ node_t * WHead, * OWHead, * RHead, * ORHead, NodeBuffer[NUMSLOTS];
+ ptr_t * NodeFreeList, * FreeListHead;
+ // this should be stored in TLS...
+ typename std::deque<node_t *> PendingReads;
};
-};
+}; // namespace Atomic
#endif