diff options
-rw-r--r-- | includes/Async.h | 20 | ||||
-rw-r--r-- | includes/Atomic.h | 73 | ||||
-rw-r--r-- | includes/Handle.h | 10 | ||||
-rw-r--r-- | includes/HttpServer.h | 8 | ||||
-rw-r--r-- | includes/Threads.h | 3 | ||||
-rw-r--r-- | src/Async.cc | 4 | ||||
-rw-r--r-- | src/Selectable.cc | 1 | ||||
-rw-r--r-- | src/Socket.cc | 1 | ||||
-rw-r--r-- | src/Threads.cc | 4 |
9 files changed, 25 insertions, 99 deletions
diff --git a/includes/Async.h b/includes/Async.h index 6c67546..70cec65 100644 --- a/includes/Async.h +++ b/includes/Async.h @@ -1,6 +1,6 @@ #pragma once -#include <Atomic.h> +#include <atomic> #include <Exceptions.h> #include <Local.h> #include <Threads.h> @@ -37,7 +37,7 @@ class AsyncOperation { class AsyncFinishWorker : public Thread { public: - AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue) { } + AsyncFinishWorker(AsyncManager * async, Queue<AsyncOperation> * queue) : m_async(async), m_queue(queue), m_stopped(false) { } bool stopped() { return m_stopped; } private: AsyncFinishWorker(const AsyncFinishWorker &) = delete; @@ -46,12 +46,12 @@ class AsyncFinishWorker : public Thread { AsyncManager * m_async; Queue<AsyncOperation> * m_queue; bool m_stopping = false; - volatile bool m_stopped = false; + std::atomic<bool> m_stopped; }; class AsyncManager : public Thread { public: - AsyncManager() { } + AsyncManager() : m_numTLSes(0), m_ready(false), m_stopperPushed(false) { } void setFinishers(int minIdle, int maxIdle) { AAssert(minIdle < maxIdle, "Minimum number of threads needs to be less than maximum number of threads."); m_minIdle = minIdle; @@ -85,14 +85,14 @@ class AsyncManager : public Thread { tls = new TLS(); m_tlsManager.setTLS(tls); m_TLSes.push(tls); - Atomic::Increment(&m_numTLSes); + ++m_numTLSes; } return tls; } Queue<AsyncOperation> m_queue; Queue<AsyncOperation> m_finished; Queue<TLS> m_TLSes; - volatile int m_numTLSes = 0; + std::atomic<int> m_numTLSes; PThreadsTLSManager m_tlsManager; std::list<AsyncFinishWorker *> m_workers; int m_numFinishers = 0; @@ -100,11 +100,11 @@ class AsyncManager : public Thread { int m_minIdle = 1; int m_maxIdle = 4; bool m_stopping = false; - volatile bool m_ready = false; - volatile bool m_stopperPushed = false; + std::atomic<bool> m_ready; + std::atomic<bool> m_stopperPushed; - void incIdle() { Atomic::Increment(&m_numFinishersIdle); } - void decIdle() { Atomic::Decrement(&m_numFinishersIdle); } + void incIdle() { m_numFinishersIdle++; } + void decIdle() { m_numFinishersIdle--; } friend class AsyncFinishWorker; }; diff --git a/includes/Atomic.h b/includes/Atomic.h deleted file mode 100644 index d084d31..0000000 --- a/includes/Atomic.h +++ /dev/null @@ -1,73 +0,0 @@ -#pragma once - -namespace Balau { - -namespace Atomic { - -#if (__GNUC__ >= 5) || ((__GNUC__ == 4) && ((__GNUC_MINOR__ >= 1))) -// gcc version of the atomic operations -template <class T> T Or(volatile T * ptr, T mask) { return __sync_or_and_fetch(ptr, mask); } -template <class T> T And(volatile T * ptr, T mask) { return __sync_and_and_fetch(ptr, mask); } -template <class T> T Xor(volatile T * ptr, T mask) { return __sync_xor_and_fetch(ptr, mask); } -template <class T> T Nand(volatile T * ptr, T mask) { return __sync_nand_and_fetch(ptr, mask); } -template <class T> T Increment(volatile T * ptr, T delta = 1) { return __sync_add_and_fetch(ptr, delta); } -template <class T> T Decrement(volatile T * ptr, T delta = 1) { return __sync_sub_and_fetch(ptr, delta); } - -namespace Prefetch { -template <class T> T Or(volatile T * ptr, T mask) { return __sync_fetch_and_or(ptr, mask); } -template <class T> T And(volatile T * ptr, T mask) { return __sync_fetch_and_and(ptr, mask); } -template <class T> T Xor(volatile T * ptr, T mask) { return __sync_fetch_and_xor(ptr, mask); } -template <class T> T Nand(volatile T * ptr, T mask) { return __sync_fetch_and_nand(ptr, mask); } -template <class T> T Increment(volatile T * ptr, T delta = 1) { return __sync_fetch_and_add(ptr, delta); } -template <class T> T Decrement(volatile T * ptr, T delta = 1) { return __sync_fetch_and_sub(ptr, delta); } -}; - -template <class T> T CmpXChgVal(volatile T * ptr, const T xch, const T cmp) { return __sync_val_compare_and_swap(ptr, cmp, xch); } -template <class T> bool CmpXChgBool(volatile T * ptr, const T xch, const T cmp) { return __sync_bool_compare_and_swap(ptr, cmp, xch); } - -static inline void MemoryFence() { __sync_synchronize(); } - -template <class T> T Exchange32(volatile T * ptr, const T exchange) { -#if defined(i386) || defined (__x86_64) - __asm__ __volatile__("lock xchgl %0, (%1)" : "+r"(exchange) : "r"(ptr)); - return exchange; -#else - T p; - do { p = *ptr; } while (!__sync_bool_compare_and_swap(ptr, p, exchange)); - return p; -#endif -} - -template <class T> T Exchange64(volatile T * ptr, const T exchange) { -#if defined(i386) || defined (__x86_64) - __asm__ __volatile__("lock xchgq %0, (%1)" : "+r"(exchange) : "r"(ptr)); - return exchange; -#else - T p; - do { p = *ptr; } while (!__sync_bool_compare_and_swap(ptr, p, exchange)); - return p; -#endif -} - -#else -#ifdef _MSVC -// Visual Studio version of the atomic operations - -#error MSVC not yet implemented. - -#else -#error No known platform for atomic operations. -#endif -#endif - -template <class T> T * ExchangePtr(T * volatile * ptr, const T * exchange) { -#if defined (__x86_64) - return Exchange64(ptr, exchange); -#else - return Exchange32(ptr, exchange); -#endif -} - -}; - -}; diff --git a/includes/Handle.h b/includes/Handle.h index 1753cb6..15415a6 100644 --- a/includes/Handle.h +++ b/includes/Handle.h @@ -1,10 +1,10 @@ #pragma once +#include <atomic> #include <Task.h> #include <Exceptions.h> #include <Printer.h> #include <BString.h> -#include <Atomic.h> namespace Balau { @@ -84,13 +84,13 @@ class Handle { ssize_t forceWrite(const void * buf, size_t count, Events::BaseEvent * evt = NULL) throw (GeneralException) __attribute__((warn_unused_result)); protected: - Handle() { } + Handle() : m_refCount(0) { } private: // the IO<> refcounting mechanism - void addRef() { Atomic::Increment(&m_refCount); } + void addRef() { ++m_refCount; } void delRef() { - if (Atomic::Decrement(&m_refCount) == 0) { + if (--m_refCount == 0) { if (!isClosed()) close(); delete this; @@ -100,7 +100,7 @@ class Handle { template<class T> friend class IO; - volatile int m_refCount = 0; + std::atomic<int> m_refCount; Handle(const Handle &) = delete; Handle & operator=(const Handle &) = delete; diff --git a/includes/HttpServer.h b/includes/HttpServer.h index 55950cb..d044e92 100644 --- a/includes/HttpServer.h +++ b/includes/HttpServer.h @@ -2,8 +2,8 @@ #include <map> #include <list> +#include <atomic> -#include <Atomic.h> #include <BString.h> #include <BRegex.h> #include <Exceptions.h> @@ -52,13 +52,13 @@ class HttpServer { Regex::Captures uri, host; }; ActionMatch matches(const char * uri, const char * host); - void unref() { if (Atomic::Decrement(&m_refCount) == 0) delete this; } - void ref() { Atomic::Increment(&m_refCount); } + void unref() { if (--m_refCount == 0) delete this; } + void ref() { ++m_refCount; } void registerMe(HttpServer * server) { server->registerAction(this); } virtual bool Do(HttpServer * server, Http::Request & req, ActionMatch & match, IO<Handle> out) throw (GeneralException) = 0; private: const Regex m_regex, m_host; - volatile int m_refCount; + std::atomic<int> m_refCount; Action(const Action &) = delete; Action & operator=(const Action &) = delete; }; diff --git a/includes/Threads.h b/includes/Threads.h index 0dfce1a..ed629be 100644 --- a/includes/Threads.h +++ b/includes/Threads.h @@ -1,5 +1,6 @@ #pragma once +#include <atomic> #include <AtStartExit.h> #include <pthread.h> @@ -78,7 +79,7 @@ class Thread { Thread(const Thread &) = delete; Thread & operator=(const Thread &) = delete; pthread_t m_thread; - volatile bool m_joined; + std::atomic<bool> m_joined; friend class ThreadHelper; }; diff --git a/src/Async.cc b/src/Async.cc index 09465d3..a638451 100644 --- a/src/Async.cc +++ b/src/Async.cc @@ -103,7 +103,7 @@ void * Balau::AsyncManager::proc() { stopAllWorkers(); Printer::elog(E_ASYNC, "Async thread waits for all idle queues to empty"); - while (Atomic::Prefetch::Decrement(&m_numTLSes)) { + while (m_numTLSes--) { TLS * tls = m_TLSes.pop(); while (!tls->idleQueue.isEmpty()); } @@ -166,7 +166,7 @@ void Balau::AsyncManager::idle() { void Balau::AsyncManager::threadExit() { Printer::elog(E_ASYNC, "AsyncManager thread is being asked to stop; creating stopper"); - if (Atomic::CmpXChgBool(&m_stopperPushed, true, false)) + if (!m_stopperPushed.exchange(true)) m_queue.push(new AsyncStopper()); } diff --git a/src/Selectable.cc b/src/Selectable.cc index f716bb1..6bef2bc 100644 --- a/src/Selectable.cc +++ b/src/Selectable.cc @@ -8,7 +8,6 @@ #include "Threads.h" #include "Printer.h" #include "Main.h" -#include "Atomic.h" #include "Async.h" #include "Task.h" #include "TaskMan.h" diff --git a/src/Socket.cc b/src/Socket.cc index b22539f..2247d77 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -12,7 +12,6 @@ #include "Threads.h" #include "Printer.h" #include "Main.h" -#include "Atomic.h" #include "Async.h" #include "Task.h" #include "TaskMan.h" diff --git a/src/Threads.cc b/src/Threads.cc index 6de9186..c05d23f 100644 --- a/src/Threads.cc +++ b/src/Threads.cc @@ -1,7 +1,7 @@ +#include <atomic> #include "Exceptions.h" #include "Threads.h" #include "Local.h" -#include "Atomic.h" #include "TaskMan.h" namespace Balau { @@ -90,7 +90,7 @@ Balau::Thread::~Thread() { void * Balau::Thread::join() { void * r = NULL; - if (Atomic::CmpXChgBool(&m_joined, true, false)) { + if (!m_joined.exchange(true)) { threadExit(); pthread_join(m_thread, &r); } |