From 38833205e4011a8a318b8dc6809621a89ad9f446 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Wed, 29 Aug 2012 23:19:09 -0700 Subject: Using true C++11 initializers in classes. --- src/BStream.cc | 2 +- src/Input.cc | 2 +- src/Output.cc | 2 +- src/Printer.cc | 2 +- src/Socket.cc | 5 +++-- src/TaskMan.cc | 2 +- src/ZHandle.cc | 2 +- 7 files changed, 9 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/BStream.cc b/src/BStream.cc index 9420459..6c51c4f 100644 --- a/src/BStream.cc +++ b/src/BStream.cc @@ -3,7 +3,7 @@ static const int s_blockSize = 16 * 1024; -Balau::BStream::BStream(const IO & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)), m_availBytes(0), m_cursor(0), m_passThru(false) { +Balau::BStream::BStream(const IO & h) : m_h(h), m_buffer((uint8_t *) malloc(s_blockSize)) { AAssert(m_h->canRead(), "You can't create a buffered stream with a Handle that can't read"); m_name.set("Stream(%s)", m_h->getName()); if ((m_h.isA()) || (m_h.isA())) diff --git a/src/Input.cc b/src/Input.cc index 14aed91..8168231 100644 --- a/src/Input.cc +++ b/src/Input.cc @@ -47,7 +47,7 @@ static int eioStatsDone(eio_req * req) { return 0; } -Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1), m_size(-1), m_mtime(-1) { +Balau::Input::Input(const char * fname) throw (GeneralException) { m_name.set("Input(%s)", fname); m_fname = fname; diff --git a/src/Output.cc b/src/Output.cc index 6de404e..a4f6275 100644 --- a/src/Output.cc +++ b/src/Output.cc @@ -47,7 +47,7 @@ static int eioStatsDone(eio_req * req) { return 0; } -Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException) : m_fd(-1), m_size(-1), m_mtime(-1) { +Balau::Output::Output(const char * fname, bool truncate) throw (GeneralException) { m_name.set("Output(%s)", fname); m_fname = fname; diff --git a/src/Printer.cc b/src/Printer.cc index ee6029c..a7fee9f 100644 --- a/src/Printer.cc +++ b/src/Printer.cc @@ -16,7 +16,7 @@ static const char * prefixes[] = { "(**) ", }; -Balau::Printer::Printer() : m_verbosity(M_STATUS | M_WARNING | M_ERROR | M_ENGINE_DEBUG), m_detailledLogs(false) { +Balau::Printer::Printer() { #ifdef DEBUG m_detailledLogs = true; #endif diff --git a/src/Socket.cc b/src/Socket.cc index 1c3778a..148d0a4 100644 --- a/src/Socket.cc +++ b/src/Socket.cc @@ -230,7 +230,7 @@ static DNSRequest resolveName(const char * name, const char * service = NULL, st } #endif -Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)), m_connected(false), m_connecting(false), m_listening(false) { +Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_STREAM, 0)) { m_name = "Socket(unconnected)"; RAssert(m_fd >= 0, "socket() returned %i", m_fd); m_evtR = new SocketEvent(m_fd, ev::READ); @@ -251,8 +251,9 @@ Balau::Socket::Socket() throw (GeneralException) : m_fd(socket(AF_INET6, SOCK_ST Printer::elog(E_SOCKET, "Creating a socket at %p", this); } -Balau::Socket::Socket(int fd) : m_fd(fd), m_connected(true), m_connecting(false), m_listening(false) { +Balau::Socket::Socket(int fd) : m_fd(fd) { socklen_t len; + m_connected = true; len = sizeof(m_localAddr); getsockname(m_fd, (sockaddr *) &m_localAddr, &len); diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 8fd6df5..17f0a40 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -125,7 +125,7 @@ void asyncDummy(ev::async & w, int revents) { Balau::Printer::elog(Balau::E_TASK, "TaskMan is getting woken up..."); } -Balau::TaskMan::TaskMan() : m_stopped(false), m_allowedToSignal(false), m_stopCode(0) { +Balau::TaskMan::TaskMan() { #ifndef _WIN32 coro_create(&m_returnContext, 0, 0, 0, 0); #else diff --git a/src/ZHandle.cc b/src/ZHandle.cc index 7ab0133..e2f22f0 100644 --- a/src/ZHandle.cc +++ b/src/ZHandle.cc @@ -1,7 +1,7 @@ #include "ZHandle.h" #include "Task.h" -Balau::ZStream::ZStream(const IO & h, int level, header_t header) : m_h(h), m_detached(false), m_closed(false), m_eof(false), m_in(NULL) { +Balau::ZStream::ZStream(const IO & h, int level, header_t header) : m_h(h) { m_zin.zalloc = m_zout.zalloc = NULL; m_zin.zfree = m_zout.zfree = NULL; m_zin.opaque = m_zout.opaque = NULL; -- cgit v1.2.3 From d2db92f6b5d275b3150deb7a52a8da142a7cc953 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Fri, 31 Aug 2012 16:13:04 -0700 Subject: Simplifying TLS code a bit (removing createTLS...) and making the PthreadsTLSManager its own global class. --- src/Local.cc | 28 ++++++++++++---------------- src/Task.cc | 2 +- src/Threads.cc | 2 +- 3 files changed, 14 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/Local.cc b/src/Local.cc index 9729c35..a9eccdd 100644 --- a/src/Local.cc +++ b/src/Local.cc @@ -13,10 +13,6 @@ void * Balau::TLSManager::setTLS(void * val) { return r; } -void * Balau::TLSManager::createTLS() { - return Local::create(); -} - static Balau::TLSManager dummyTLSManager; Balau::TLSManager * Balau::g_tlsManager = &dummyTLSManager; @@ -29,31 +25,31 @@ void Balau::Local::doStart() { m_globals[m_idx] = 0; } -class PThreadsTLSManager : public Balau::TLSManager, public Balau::AtStart { +class GlobalPThreadsTLSManager : public Balau::PThreadsTLSManager, public Balau::AtStart { public: - PThreadsTLSManager() : AtStart(0) { } - virtual void * getTLS(); - virtual void * setTLS(void * val); - virtual void doStart(); - private: - pthread_key_t m_key; + GlobalPThreadsTLSManager() : AtStart(0) { } + void doStart(); }; -PThreadsTLSManager pthreadsTLSManager; +GlobalPThreadsTLSManager pthreadsTLSManager; + +void GlobalPThreadsTLSManager::doStart() { + init(); + Balau::g_tlsManager = this; +} -void PThreadsTLSManager::doStart() { +void Balau::PThreadsTLSManager::init() { int r; r = pthread_key_create(&m_key, NULL); RAssert(r == 0, "Unable to create a pthtread_key: %i", r); - Balau::g_tlsManager = this; } -void * PThreadsTLSManager::getTLS() { +void * Balau::PThreadsTLSManager::getTLS() { return pthread_getspecific(m_key); } -void * PThreadsTLSManager::setTLS(void * val) { +void * Balau::PThreadsTLSManager::setTLS(void * val) { void * r = pthread_getspecific(m_key); pthread_setspecific(m_key, val); return r; diff --git a/src/Task.cc b/src/Task.cc index 6278fd5..aba29b8 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -35,7 +35,7 @@ void Balau::Task::setup(TaskMan * taskMan, void * stack) { m_taskMan = taskMan; - m_tls = g_tlsManager->createTLS(); + m_tls = Local::createTLS(); void * oldTLS = g_tlsManager->getTLS(); g_tlsManager->setTLS(m_tls); localTask.set(this); diff --git a/src/Threads.cc b/src/Threads.cc index fe90394..9ecfff5 100644 --- a/src/Threads.cc +++ b/src/Threads.cc @@ -39,7 +39,7 @@ Balau::RWLock::RWLock() { } void * Balau::ThreadHelper::threadProc(void * arg) { - void * tls = g_tlsManager->createTLS(); + void * tls = Local::createTLS(); g_tlsManager->setTLS(tls); Balau::Thread * thread = reinterpret_cast(arg); void * r = thread->proc(); -- cgit v1.2.3 From 06674e57649d536cf19715524ee40c5ad4a9026d Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Sat, 1 Sep 2012 00:12:35 -0700 Subject: Adding async operations; first step towards tossing libeio out. --- src/Async.cc | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/TaskMan.cc | 30 ++++++++++ 2 files changed, 201 insertions(+) create mode 100644 src/Async.cc (limited to 'src') diff --git a/src/Async.cc b/src/Async.cc new file mode 100644 index 0000000..829c677 --- /dev/null +++ b/src/Async.cc @@ -0,0 +1,171 @@ +#include "Async.h" + +namespace { + +class AsyncStopper : public Balau::AsyncOperation { + public: + virtual bool needsSynchronousCallback() { return false; } + virtual void done() { delete this; } +}; + +}; + +void Balau::AsyncManager::setIdleReadyCallback(void (*callback)(void *), void * param) { + while (!m_ready); + TLS * tls = getTLS(); + tls->idleReadyCallback = callback; + tls->idleReadyParam = param; +} + +void Balau::AsyncManager::queueOp(AsyncOperation * op) { + if (m_stopperPushed) { + Printer::elog(E_ASYNC, "AsyncManager's queue has been stopped; running operation %p on this thread instead.", op); + op->run(); + op->finalize(); + return; + } + while (!m_ready); + TLS * tls = getTLS(); + Printer::elog(E_ASYNC, "Queuing operation at %p", op); + if (op->needsSynchronousCallback()) { + Printer::elog(E_ASYNC, "Operation at %p needs synchronous callback, copying values; idleQueue = %p; idleReadyCallback = %p; idleReadyParam = %p", &tls->idleQueue, tls->idleReadyCallback, tls->idleReadyParam); + op->m_idleQueue = &tls->idleQueue; + op->m_idleReadyCallback = tls->idleReadyCallback; + op->m_idleReadyParam = tls->idleReadyParam; + } + if (op->needsMainQueue()) + m_queue.push(op); + else + m_finished.push(op); +} + +void Balau::AsyncManager::checkIdle() { + if (m_numFinishersIdle > m_maxIdle) + killOneFinisher(); + if (m_numFinishersIdle < m_minIdle) + startOneFinisher(); + joinStoppedFinishers(); +} + +void Balau::AsyncManager::killOneFinisher() { + Printer::elog(E_ASYNC, "Too many workers idle (%i / %i), killing one.", m_numFinishersIdle, m_maxIdle); + m_finished.push(new AsyncStopper()); +} + +void Balau::AsyncManager::startOneFinisher() { + AsyncFinishWorker * worker = new AsyncFinishWorker(this, &m_finished); + Printer::elog(E_ASYNC, "Not enough workers idle (%i / %i), starting one at %p.", m_numFinishersIdle, m_minIdle, worker); + m_workers.push_back(worker); + m_numFinishers++; + worker->threadStart(); +} + +void Balau::AsyncManager::joinStoppedFinishers() { + for (auto i = m_workers.begin(); i != m_workers.end(); i++) { + AsyncFinishWorker * worker = *i; + if (!worker->m_stopped) + continue; + Printer::elog(E_ASYNC, "Joining stopped worker at %p", worker); + m_numFinishers--; + m_workers.erase(i); + worker->join(); + delete worker; + break; + } +} + +void * Balau::AsyncManager::proc() { + Printer::elog(E_ASYNC, "AsyncManager thread starting up"); + m_tlsManager.init(); + m_ready = true; + while (!m_stopping) { + checkIdle(); + AsyncOperation * op = m_queue.pop(); + Printer::elog(E_ASYNC, "AsyncManager got an operation at %p", op); + if (dynamic_cast(op)) { + Printer::elog(E_ASYNC, "AsyncManager got a stopper operation"); + m_stopping = true; + } + Printer::elog(E_ASYNC, "AsyncManager running operation at %p", op); + op->run(); + if (op->needsFinishWorker()) { + Printer::elog(E_ASYNC, "AsyncManager pushing operation at %p in the finisher's queue", op); + m_finished.push(op); + } else { + Printer::elog(E_ASYNC, "AsyncManager finalizing operation at %p", op); + op->finalize(); + } + } + stopAllWorkers(); + + while (Atomic::Prefetch::Decrement(&m_numTLSes)) { + TLS * tls = m_TLSes.pop(); + while (!tls->idleQueue.isEmpty()); + } + + return NULL; +} + +void * Balau::AsyncFinishWorker::proc() { + Printer::elog(E_ASYNC, "AsyncFinishWorker thread starting up"); + AsyncOperation * op; + while (!m_stopping) { + m_async->incIdle(); + op = m_queue->pop(); + m_async->decIdle(); + Printer::elog(E_ASYNC, "AsyncFinishWorker got operation at %p", op); + if (dynamic_cast(op)) { + Printer::elog(E_ASYNC, "AsyncFinishWorker got a stopper operation"); + m_stopping = true; + } + op->finalize(); + } + + m_stopped = true; + Printer::elog(E_ASYNC, "AsyncFinishWorker thread stopping"); + + return NULL; +} + +void Balau::AsyncOperation::finalize() { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is finishing operation %p", this); + finish(); + if (needsSynchronousCallback()) { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is pushing operation %p to its idle queue", this); + bool wasEmpty = m_idleQueue->isEmpty(); + m_idleQueue->push(this); + Printer::elog(E_ASYNC, "AsyncOperation::finalize() has pushed operation %p to its idle queue; wasEmpty = %s; callback = %p", this, wasEmpty ? "true" : "false", m_idleReadyCallback); + if (wasEmpty && m_idleReadyCallback) { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is calling ready callback to wake up main loop"); + m_idleReadyCallback(m_idleReadyParam); + } + } else { + Printer::elog(E_ASYNC, "AsyncOperation::finalize() is wrapping up operation %p", this); + done(); + } +} + +void Balau::AsyncManager::idle() { + Printer::elog(E_ASYNC, "AsyncManager::idle() is running"); + while (!m_ready); + AsyncOperation * op; + TLS * tls = getTLS(); + while ((op = tls->idleQueue.pop(false))) { + Printer::elog(E_ASYNC, "AsyncManager::idle() is wrapping up operation %p", op); + op->done(); + } +} + +void Balau::AsyncManager::threadExit() { + Printer::elog(E_ASYNC, "AsyncManager thread is being asked to stop; creating stopper"); + if (Atomic::CmpXChgBool(&m_stopperPushed, true, false)) + m_queue.push(new AsyncStopper()); +} + +void Balau::AsyncManager::stopAllWorkers() { + Printer::elog(E_ASYNC, "AsyncManager thread is being stopping and joining %i workers", m_numFinishers); + for (int i = 0; i < m_numFinishers; i++) + m_finished.push(new AsyncStopper()); + for (auto worker : m_workers) + worker->join(); +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index 17f0a40..49c326a 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -1,8 +1,24 @@ +#include "Async.h" #include "TaskMan.h" #include "Task.h" #include "Main.h" #include "Local.h" +static Balau::AsyncManager s_async; + +namespace { + +class AsyncStarter : public Balau::AtStart, Balau::AtExit { + public: + AsyncStarter() : AtStart(1000), AtExit(0) { } + void doStart() { + s_async.threadStart(); + } + void doExit() { + s_async.join(); + } +}; + class Stopper : public Balau::Task { public: Stopper(int code) : m_code(code) { } @@ -12,6 +28,10 @@ class Stopper : public Balau::Task { int m_code; }; +}; + +static AsyncStarter s_asyncStarter; + void Stopper::Do() { getTaskMan()->stopMe(m_code); } @@ -207,6 +227,8 @@ int Balau::TaskMan::mainLoop() { if (t->getStatus() == Task::STARTING) starting.insert(t); + s_async.setIdleReadyCallback(asyncIdleReady, this); + do { bool noWait = false; @@ -239,6 +261,9 @@ int Balau::TaskMan::mainLoop() { ev_run(m_loop, noWait || m_stopped ? EVRUN_NOWAIT : EVRUN_ONCE); Printer::elog(E_TASK, "TaskMan at %p Getting out of libev main loop", this); + // calling async's idle loop here + s_async.idle(); + // let's check what task got stopped, and signal them for (Task * t : stopped) { IAssert((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED), "Task %p in stopped list but isn't stopped.", t); @@ -316,6 +341,7 @@ int Balau::TaskMan::mainLoop() { } while (!m_stopped); Printer::elog(E_TASK, "TaskManager at %p stopping.", this); + s_async.setIdleReadyCallback(NULL, NULL); return m_stopCode; } @@ -329,6 +355,10 @@ void Balau::TaskMan::registerTask(Balau::Task * t, Balau::Task * stick) { } } +void Balau::TaskMan::registerAsyncOp(Balau::AsyncOperation * op) { + s_async.queueOp(op); +} + void Balau::TaskMan::addToPending(Balau::Task * t) { m_pendingAdd.push(t); } -- cgit v1.2.3