From 965148b43b5b859934b7af2e8447ba1026a43a19 Mon Sep 17 00:00:00 2001 From: Pixel Date: Mon, 10 Oct 2011 19:36:55 -0700 Subject: Adding the basic "Handle" structure, and adding an early version of Input. Renamed suspend() to yield(). Fixed a couple of bugs, and reorganized slightly some code. --- Makefile | 10 +++- includes/Handle.h | 64 +++++++++++++++++++++ includes/Input.h | 18 ++++++ includes/Task.h | 12 +++- src/Handle.cc | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/Input.cc | 60 ++++++++++++++++++++ src/Task.cc | 6 +- src/TaskMan.cc | 15 ++++- src/Threads.cc | 4 +- tests/test-Handles.cc | 22 ++++++++ tests/test-Sanity.cc | 1 + tests/test-Tasks.cc | 4 +- 12 files changed, 356 insertions(+), 11 deletions(-) create mode 100644 includes/Handle.h create mode 100644 includes/Input.h create mode 100644 src/Handle.cc create mode 100644 src/Input.cc create mode 100644 tests/test-Handles.cc diff --git a/Makefile b/Makefile index f0424e3..16fb160 100644 --- a/Makefile +++ b/Makefile @@ -56,13 +56,18 @@ vpath %.cc src:tests vpath %.c libcoro:libeio:libev BALAU_SOURCES = \ -BString.cc \ Local.cc \ +Threads.cc \ +\ +BString.cc \ Main.cc \ Printer.cc \ +\ +Handle.cc \ +Input.cc \ +\ Task.cc \ TaskMan.cc \ -Threads.cc \ LIBCORO_SOURCES = \ coro.c \ @@ -78,6 +83,7 @@ TEST_SOURCES = \ test-Sanity.cc \ test-String.cc \ test-Tasks.cc \ +test-Handles.cc \ LIB = libBalau.a diff --git a/includes/Handle.h b/includes/Handle.h new file mode 100644 index 0000000..d2e1f8c --- /dev/null +++ b/includes/Handle.h @@ -0,0 +1,64 @@ +#pragma once + +#include + +namespace Balau { + +class EAgain : public GeneralException { + public: + EAgain() : GeneralException("Try Again") { } +}; + +class IO; + +class Handle { + public: + virtual ~Handle() { Assert(m_refCount == 0); } + virtual void close() throw (GeneralException) = 0; + virtual bool isClosed() = 0; + virtual bool canSeek(); + virtual bool canRead(); + virtual bool canWrite(); + virtual const char * getName() = 0; + virtual ssize_t read(void * buf, size_t count) throw (GeneralException); + virtual ssize_t write(const void * buf, size_t count) throw (GeneralException); + virtual void rseek(off_t offset, int whence = SEEK_SET) throw (GeneralException); + virtual void wseek(off_t offset, int whence = SEEK_SET) throw (GeneralException); + virtual off_t rtell() throw (GeneralException); + virtual off_t wtell() throw (GeneralException); + virtual off_t getSize(); + protected: + Handle() : m_refCount(0) { } + private: + void addRef() { m_refCount++; } + void delRef() { if (--m_refCount == 0) { if (!isClosed()) close(); delete this; } } + int refCount() { return m_refCount; } + friend class IO; + + int m_refCount; +}; + +class IO { + public: + template IO(T * h) { setHandle(h); } + ~IO() { m_h->delRef(); } + IO(const IO & io) { setHandle(io.m_h); } + Handle * operator->() { return m_h; } + protected: + void setHandle(Handle * h) { m_h = h; m_h->addRef(); } + private: + Handle * m_h; +}; + +class SeekableHandle : public Handle { + public: + virtual bool canSeek(); + virtual void rseek(off_t offset, int whence = SEEK_SET) throw (GeneralException); + virtual void wseek(off_t offset, int whence = SEEK_SET) throw (GeneralException); + virtual off_t rtell() throw (GeneralException); + virtual off_t wtell() throw (GeneralException); + private: + off_t m_wOffset, m_rOffset; +}; + +}; diff --git a/includes/Input.h b/includes/Input.h new file mode 100644 index 0000000..418b018 --- /dev/null +++ b/includes/Input.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace Balau { + +class Input : public SeekableHandle { + public: + Input(const char * fname) throw (GeneralException); + virtual void close() throw (GeneralException); + virtual bool isClosed(); + virtual const char * getName(); + private: + int m_fd; + String m_name; +}; + +}; diff --git a/includes/Task.h b/includes/Task.h index f0c1bbb..8a504c7 100644 --- a/includes/Task.h +++ b/includes/Task.h @@ -44,6 +44,15 @@ class TaskEvent : public BaseEvent { Task * m_taskWaited; }; +class Custom : public BaseEvent { + public: + void doSignal() { BaseEvent::doSignal(); ev_break(m_loop, EVBREAK_ALL); } + protected: + virtual void gotOwner(Task * task); + private: + struct ev_loop * m_loop; +}; + }; class Task { @@ -60,9 +69,10 @@ class Task { virtual const char * getName() = 0; Status getStatus() { return m_status; } static Task * getCurrentTask(); + static void yield(Events::BaseEvent * evt) { Task * t = getCurrentTask(); t->waitFor(evt); t->yield(); } TaskMan * getTaskMan() { return m_taskMan; } protected: - void suspend(); + void yield(); virtual void Do() = 0; void waitFor(Events::BaseEvent * event); private: diff --git a/src/Handle.cc b/src/Handle.cc new file mode 100644 index 0000000..c9e3f57 --- /dev/null +++ b/src/Handle.cc @@ -0,0 +1,151 @@ +#include +#include "ev++.h" +#include "eio.h" +#include "Main.h" +#include "TaskMan.h" +#include "Handle.h" + +class eioInterface : public Balau::AtStart { + public: + eioInterface() : AtStart(100) { } + void repeatCB(ev::idle & w, int revents); + void readyCB(ev::async & w, int revents); + static void wantPoll(); + virtual void doStart(); + ev::idle m_repeat; + ev::async m_ready; +}; + +static eioInterface eioIF; + +void eioInterface::repeatCB(ev::idle & w, int revents) { + if (eio_poll() != -1) + w.stop(); +} + +void eioInterface::readyCB(ev::async & w, int revents) { + if (eio_poll() == -1) + m_repeat.start(); +} + +void eioInterface::doStart() { + Balau::TaskMan * taskMan = Balau::TaskMan::getTaskMan(); + Assert(taskMan); + struct ev_loop * loop = taskMan->getLoop(); + + m_repeat.set(loop); + m_repeat.set(this); + + m_ready.set(loop); + m_ready.set(this); + m_ready.start(); + + eio_init(wantPoll, NULL); +} + +void eioInterface::wantPoll() { + eioIF.m_ready.send(); +} + +bool Balau::Handle::canSeek() { return false; } +bool Balau::Handle::canRead() { return false; } +bool Balau::Handle::canWrite() { return false; } +off_t Balau::Handle::getSize() { return -1; } + +ssize_t Balau::Handle::read(void * buf, size_t count) throw (GeneralException) { + if (canRead()) + throw GeneralException(String("Handle ") + getName() + " can read, but read() not implemented (missing in class " + typeid(*this).name() + ")"); + else + throw GeneralException("Handle can't read"); + return -1; +} + +ssize_t Balau::Handle::write(const void * buf, size_t count) throw (GeneralException) { + if (canWrite()) + throw GeneralException(String("Handle ") + getName() + " can write, but write() not implemented (missing in class " + typeid(this).name() + ")"); + else + throw GeneralException("Handle can't write"); + return -1; +} + +void Balau::Handle::rseek(off_t offset, int whence) throw (GeneralException) { + if (canSeek()) + throw GeneralException(String("Handle ") + getName() + " can seek, but rseek() not implemented (missing in class " + typeid(this).name() + ")"); + else + throw GeneralException("Handle can't seek"); +} + +void Balau::Handle::wseek(off_t offset, int whence) throw (GeneralException) { + rseek(offset, whence); +} + +off_t Balau::Handle::rtell() throw (GeneralException) { + if (canSeek()) + throw GeneralException(String("Handle ") + getName() + " can seek, but rtell() not implemented (missing in class " + typeid(this).name() + ")"); + else + throw GeneralException("Handle can't seek"); +} + +off_t Balau::Handle::wtell() throw (GeneralException) { + return rtell(); +} + +bool Balau::SeekableHandle::canSeek() { return true; } + +void Balau::SeekableHandle::rseek(off_t offset, int whence) throw (GeneralException) { + Assert(canRead() || canWrite()); + off_t size; + if (!canRead()) + wseek(offset, whence); + switch (whence) { + case SEEK_SET: + m_rOffset = offset; + break; + case SEEK_CUR: + m_rOffset += offset; + break; + case SEEK_END: + size = getSize(); + if (getSize() < 0) + throw GeneralException("Can't seek from end in a Handle you don't know the max size"); + m_rOffset = size + offset; + break; + } + if (m_rOffset < 0) + m_rOffset = 0; +} + +void Balau::SeekableHandle::wseek(off_t offset, int whence) throw (GeneralException) { + Assert(canRead() || canWrite()); + off_t size; + if (!canWrite()) + rseek(offset, whence); + switch (whence) { + case SEEK_SET: + m_wOffset = offset; + break; + case SEEK_CUR: + m_wOffset += offset; + break; + case SEEK_END: + size = getSize(); + if (getSize() < 0) + throw GeneralException("Can't seek from end in a Handle you don't know the max size"); + m_wOffset = size + offset; + break; + } + if (m_wOffset < 0) + m_wOffset = 0; +} + +off_t Balau::SeekableHandle::rtell() throw (GeneralException) { + Assert(canRead() || canWrite()); + if (!canRead()) + return wtell(); +} + +off_t Balau::SeekableHandle::wtell() throw (GeneralException) { + Assert(canRead() || canWrite()); + if (!canWrite()) + return rtell(); +} diff --git a/src/Input.cc b/src/Input.cc new file mode 100644 index 0000000..633a1a7 --- /dev/null +++ b/src/Input.cc @@ -0,0 +1,60 @@ +#include +#include +#include +#include +#include "eio.h" +#include "Input.h" +#include "Task.h" + +struct cbResults_t { + Balau::Events::Custom evt; + int result, errorno; +}; + +static int eioDone(eio_req * req) { + cbResults_t * cbResults = (cbResults_t *) req->data; + cbResults->result = req->result; + cbResults->errorno = req->errorno; + cbResults->evt.doSignal(); +} + +Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1) { + cbResults_t cbResults; + m_name.set("Input(%s)", fname); + eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults); + Assert(r != 0); + Task::yield(&cbResults.evt); + Assert(cbResults.evt.gotSignal()); + if (cbResults.result < 0) { + char str[4096]; + throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); + } else { + m_fd = cbResults.result; + } +} + +void Balau::Input::close() throw (GeneralException) { + if (m_fd < 0) + return; + cbResults_t cbResults; + eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); + Assert(r != 0); + m_fd = -1; + Task::yield(&cbResults.evt); + Assert(cbResults.evt.gotSignal()); + if (cbResults.result < 0) { + char str[4096]; + strerror_r(cbResults.errorno, str, sizeof(str)); + throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + } else { + m_fd = cbResults.result; + } +} + +bool Balau::Input::isClosed() { + return m_fd < 0; +} + +const char * Balau::Input::getName() { + return m_name.to_charp(); +} diff --git a/src/Task.cc b/src/Task.cc index e5c971d..a47bd8b 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -57,7 +57,7 @@ void Balau::Task::switchTo() { m_status = IDLE; } -void Balau::Task::suspend() { +void Balau::Task::yield() { coro_transfer(&m_ctx, &m_taskMan->m_returnContext); } @@ -91,3 +91,7 @@ void Balau::Events::Timeout::gotOwner(Task * task) { void Balau::Events::Timeout::evt_cb(ev::timer & w, int revents) { doSignal(); } + +void Balau::Events::Custom::gotOwner(Task * task) { + m_loop = task->getTaskMan()->getLoop(); +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index fd0246f..ea9c0fb 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -29,15 +29,24 @@ void Balau::TaskMan::mainLoop() { taskList_t::iterator iL; taskHash_t::iterator iH; Task * t; + bool noWait = false; - // checking "STARTING" tasks, and running them once + // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean. for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) { t = *iH; - if (t->getStatus() == Task::STARTING) + if (t->getStatus() == Task::STARTING) { t->switchTo(); + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) + noWait = true; + } } - ev_run(m_loop, EVRUN_ONCE); + // probably means we have pending tasks; or none at all, for some reason. Don't wait on it forever. + if (!noWait && m_tasks.size() == 0) + noWait = true; + + // libev's event "loop". We always runs it once though. + ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE); // let's check who got signaled, and call them for (iH = m_signaledTasks.begin(); iH != m_signaledTasks.end(); iH++) { diff --git a/src/Threads.cc b/src/Threads.cc index 7c928f8..fa3f0f3 100644 --- a/src/Threads.cc +++ b/src/Threads.cc @@ -1,5 +1,5 @@ -#include -#include +#include "Exceptions.h" +#include "Threads.h" Balau::Lock::Lock() { int r; diff --git a/tests/test-Handles.cc b/tests/test-Handles.cc new file mode 100644 index 0000000..38b020b --- /dev/null +++ b/tests/test-Handles.cc @@ -0,0 +1,22 @@ +#include +#include + +BALAU_STARTUP; + +using namespace Balau; + +void MainTask::Do() { + Printer::log(M_STATUS, "Test::Handles running."); + + bool failed = false; + try { + IO i(new Input("SomeInexistantFile.txt")); + } + catch (GeneralException) { + failed = true; + } + Assert(failed); + IO i(new Input("Makefile")); + + Printer::log(M_STATUS, "Test::Handles passed."); +} diff --git a/tests/test-Sanity.cc b/tests/test-Sanity.cc index 52a85cc..54fa626 100644 --- a/tests/test-Sanity.cc +++ b/tests/test-Sanity.cc @@ -8,6 +8,7 @@ void MainTask::Do() { Printer::log(M_STATUS, "Test::Sanity running."); Assert(sizeof(off_t) == 8); + Assert(sizeof(size_t) == 4); Printer::log(M_STATUS, "Test::Sanity passed."); } diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc index 148f943..1faa194 100644 --- a/tests/test-Tasks.cc +++ b/tests/test-Tasks.cc @@ -30,13 +30,13 @@ void MainTask::Do() { Events::TaskEvent taskEvt(testTask); waitFor(&taskEvt); Assert(!taskEvt.gotSignal()); - suspend(); + yield(); Assert(taskEvt.gotSignal()); Events::Timeout timeout(0.1); waitFor(&timeout); Assert(!timeout.gotSignal()); - suspend(); + yield(); Assert(timeout.gotSignal()); Printer::log(M_STATUS, "Test::Tasks passed."); -- cgit v1.2.3