From 965148b43b5b859934b7af2e8447ba1026a43a19 Mon Sep 17 00:00:00 2001 From: Pixel Date: Mon, 10 Oct 2011 19:36:55 -0700 Subject: Adding the basic "Handle" structure, and adding an early version of Input. Renamed suspend() to yield(). Fixed a couple of bugs, and reorganized slightly some code. --- src/Handle.cc | 151 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Input.cc | 60 +++++++++++++++++++++++ src/Task.cc | 6 ++- src/TaskMan.cc | 15 ++++-- src/Threads.cc | 4 +- 5 files changed, 230 insertions(+), 6 deletions(-) create mode 100644 src/Handle.cc create mode 100644 src/Input.cc (limited to 'src') diff --git a/src/Handle.cc b/src/Handle.cc new file mode 100644 index 0000000..c9e3f57 --- /dev/null +++ b/src/Handle.cc @@ -0,0 +1,151 @@ +#include +#include "ev++.h" +#include "eio.h" +#include "Main.h" +#include "TaskMan.h" +#include "Handle.h" + +class eioInterface : public Balau::AtStart { + public: + eioInterface() : AtStart(100) { } + void repeatCB(ev::idle & w, int revents); + void readyCB(ev::async & w, int revents); + static void wantPoll(); + virtual void doStart(); + ev::idle m_repeat; + ev::async m_ready; +}; + +static eioInterface eioIF; + +void eioInterface::repeatCB(ev::idle & w, int revents) { + if (eio_poll() != -1) + w.stop(); +} + +void eioInterface::readyCB(ev::async & w, int revents) { + if (eio_poll() == -1) + m_repeat.start(); +} + +void eioInterface::doStart() { + Balau::TaskMan * taskMan = Balau::TaskMan::getTaskMan(); + Assert(taskMan); + struct ev_loop * loop = taskMan->getLoop(); + + m_repeat.set(loop); + m_repeat.set(this); + + m_ready.set(loop); + m_ready.set(this); + m_ready.start(); + + eio_init(wantPoll, NULL); +} + +void eioInterface::wantPoll() { + eioIF.m_ready.send(); +} + +bool Balau::Handle::canSeek() { return false; } +bool Balau::Handle::canRead() { return false; } +bool Balau::Handle::canWrite() { return false; } +off_t Balau::Handle::getSize() { return -1; } + +ssize_t Balau::Handle::read(void * buf, size_t count) throw (GeneralException) { + if (canRead()) + throw GeneralException(String("Handle ") + getName() + " can read, but read() not implemented (missing in class " + typeid(*this).name() + ")"); + else + throw GeneralException("Handle can't read"); + return -1; +} + +ssize_t Balau::Handle::write(const void * buf, size_t count) throw (GeneralException) { + if (canWrite()) + throw GeneralException(String("Handle ") + getName() + " can write, but write() not implemented (missing in class " + typeid(this).name() + ")"); + else + throw GeneralException("Handle can't write"); + return -1; +} + +void Balau::Handle::rseek(off_t offset, int whence) throw (GeneralException) { + if (canSeek()) + throw GeneralException(String("Handle ") + getName() + " can seek, but rseek() not implemented (missing in class " + typeid(this).name() + ")"); + else + throw GeneralException("Handle can't seek"); +} + +void Balau::Handle::wseek(off_t offset, int whence) throw (GeneralException) { + rseek(offset, whence); +} + +off_t Balau::Handle::rtell() throw (GeneralException) { + if (canSeek()) + throw GeneralException(String("Handle ") + getName() + " can seek, but rtell() not implemented (missing in class " + typeid(this).name() + ")"); + else + throw GeneralException("Handle can't seek"); +} + +off_t Balau::Handle::wtell() throw (GeneralException) { + return rtell(); +} + +bool Balau::SeekableHandle::canSeek() { return true; } + +void Balau::SeekableHandle::rseek(off_t offset, int whence) throw (GeneralException) { + Assert(canRead() || canWrite()); + off_t size; + if (!canRead()) + wseek(offset, whence); + switch (whence) { + case SEEK_SET: + m_rOffset = offset; + break; + case SEEK_CUR: + m_rOffset += offset; + break; + case SEEK_END: + size = getSize(); + if (getSize() < 0) + throw GeneralException("Can't seek from end in a Handle you don't know the max size"); + m_rOffset = size + offset; + break; + } + if (m_rOffset < 0) + m_rOffset = 0; +} + +void Balau::SeekableHandle::wseek(off_t offset, int whence) throw (GeneralException) { + Assert(canRead() || canWrite()); + off_t size; + if (!canWrite()) + rseek(offset, whence); + switch (whence) { + case SEEK_SET: + m_wOffset = offset; + break; + case SEEK_CUR: + m_wOffset += offset; + break; + case SEEK_END: + size = getSize(); + if (getSize() < 0) + throw GeneralException("Can't seek from end in a Handle you don't know the max size"); + m_wOffset = size + offset; + break; + } + if (m_wOffset < 0) + m_wOffset = 0; +} + +off_t Balau::SeekableHandle::rtell() throw (GeneralException) { + Assert(canRead() || canWrite()); + if (!canRead()) + return wtell(); +} + +off_t Balau::SeekableHandle::wtell() throw (GeneralException) { + Assert(canRead() || canWrite()); + if (!canWrite()) + return rtell(); +} diff --git a/src/Input.cc b/src/Input.cc new file mode 100644 index 0000000..633a1a7 --- /dev/null +++ b/src/Input.cc @@ -0,0 +1,60 @@ +#include +#include +#include +#include +#include "eio.h" +#include "Input.h" +#include "Task.h" + +struct cbResults_t { + Balau::Events::Custom evt; + int result, errorno; +}; + +static int eioDone(eio_req * req) { + cbResults_t * cbResults = (cbResults_t *) req->data; + cbResults->result = req->result; + cbResults->errorno = req->errorno; + cbResults->evt.doSignal(); +} + +Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1) { + cbResults_t cbResults; + m_name.set("Input(%s)", fname); + eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults); + Assert(r != 0); + Task::yield(&cbResults.evt); + Assert(cbResults.evt.gotSignal()); + if (cbResults.result < 0) { + char str[4096]; + throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")"); + } else { + m_fd = cbResults.result; + } +} + +void Balau::Input::close() throw (GeneralException) { + if (m_fd < 0) + return; + cbResults_t cbResults; + eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults); + Assert(r != 0); + m_fd = -1; + Task::yield(&cbResults.evt); + Assert(cbResults.evt.gotSignal()); + if (cbResults.result < 0) { + char str[4096]; + strerror_r(cbResults.errorno, str, sizeof(str)); + throw GeneralException(String("Unable to close file ") + m_name + ": " + str); + } else { + m_fd = cbResults.result; + } +} + +bool Balau::Input::isClosed() { + return m_fd < 0; +} + +const char * Balau::Input::getName() { + return m_name.to_charp(); +} diff --git a/src/Task.cc b/src/Task.cc index e5c971d..a47bd8b 100644 --- a/src/Task.cc +++ b/src/Task.cc @@ -57,7 +57,7 @@ void Balau::Task::switchTo() { m_status = IDLE; } -void Balau::Task::suspend() { +void Balau::Task::yield() { coro_transfer(&m_ctx, &m_taskMan->m_returnContext); } @@ -91,3 +91,7 @@ void Balau::Events::Timeout::gotOwner(Task * task) { void Balau::Events::Timeout::evt_cb(ev::timer & w, int revents) { doSignal(); } + +void Balau::Events::Custom::gotOwner(Task * task) { + m_loop = task->getTaskMan()->getLoop(); +} diff --git a/src/TaskMan.cc b/src/TaskMan.cc index fd0246f..ea9c0fb 100644 --- a/src/TaskMan.cc +++ b/src/TaskMan.cc @@ -29,15 +29,24 @@ void Balau::TaskMan::mainLoop() { taskList_t::iterator iL; taskHash_t::iterator iH; Task * t; + bool noWait = false; - // checking "STARTING" tasks, and running them once + // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean. for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) { t = *iH; - if (t->getStatus() == Task::STARTING) + if (t->getStatus() == Task::STARTING) { t->switchTo(); + if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED)) + noWait = true; + } } - ev_run(m_loop, EVRUN_ONCE); + // probably means we have pending tasks; or none at all, for some reason. Don't wait on it forever. + if (!noWait && m_tasks.size() == 0) + noWait = true; + + // libev's event "loop". We always runs it once though. + ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE); // let's check who got signaled, and call them for (iH = m_signaledTasks.begin(); iH != m_signaledTasks.end(); iH++) { diff --git a/src/Threads.cc b/src/Threads.cc index 7c928f8..fa3f0f3 100644 --- a/src/Threads.cc +++ b/src/Threads.cc @@ -1,5 +1,5 @@ -#include -#include +#include "Exceptions.h" +#include "Threads.h" Balau::Lock::Lock() { int r; -- cgit v1.2.3