summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPixel <pixel@nobis-crew.org>2011-10-10 19:36:55 -0700
committerPixel <pixel@nobis-crew.org>2011-10-10 19:36:55 -0700
commit965148b43b5b859934b7af2e8447ba1026a43a19 (patch)
tree290ff60cd1ebc9a065109620b2e9a5ff4e5b3d17 /src
parentcf9a801ebcb4df0a8b1ea75e58ca8ea8960ba13b (diff)
Adding the basic "Handle" structure, and adding an early version of Input.
Renamed suspend() to yield(). Fixed a couple of bugs, and reorganized slightly some code.
Diffstat (limited to 'src')
-rw-r--r--src/Handle.cc151
-rw-r--r--src/Input.cc60
-rw-r--r--src/Task.cc6
-rw-r--r--src/TaskMan.cc15
-rw-r--r--src/Threads.cc4
5 files changed, 230 insertions, 6 deletions
diff --git a/src/Handle.cc b/src/Handle.cc
new file mode 100644
index 0000000..c9e3f57
--- /dev/null
+++ b/src/Handle.cc
@@ -0,0 +1,151 @@
+#include <typeinfo>
+#include "ev++.h"
+#include "eio.h"
+#include "Main.h"
+#include "TaskMan.h"
+#include "Handle.h"
+
+class eioInterface : public Balau::AtStart {
+ public:
+ eioInterface() : AtStart(100) { }
+ void repeatCB(ev::idle & w, int revents);
+ void readyCB(ev::async & w, int revents);
+ static void wantPoll();
+ virtual void doStart();
+ ev::idle m_repeat;
+ ev::async m_ready;
+};
+
+static eioInterface eioIF;
+
+void eioInterface::repeatCB(ev::idle & w, int revents) {
+ if (eio_poll() != -1)
+ w.stop();
+}
+
+void eioInterface::readyCB(ev::async & w, int revents) {
+ if (eio_poll() == -1)
+ m_repeat.start();
+}
+
+void eioInterface::doStart() {
+ Balau::TaskMan * taskMan = Balau::TaskMan::getTaskMan();
+ Assert(taskMan);
+ struct ev_loop * loop = taskMan->getLoop();
+
+ m_repeat.set(loop);
+ m_repeat.set<eioInterface, &eioInterface::repeatCB>(this);
+
+ m_ready.set(loop);
+ m_ready.set<eioInterface, &eioInterface::readyCB>(this);
+ m_ready.start();
+
+ eio_init(wantPoll, NULL);
+}
+
+void eioInterface::wantPoll() {
+ eioIF.m_ready.send();
+}
+
+bool Balau::Handle::canSeek() { return false; }
+bool Balau::Handle::canRead() { return false; }
+bool Balau::Handle::canWrite() { return false; }
+off_t Balau::Handle::getSize() { return -1; }
+
+ssize_t Balau::Handle::read(void * buf, size_t count) throw (GeneralException) {
+ if (canRead())
+ throw GeneralException(String("Handle ") + getName() + " can read, but read() not implemented (missing in class " + typeid(*this).name() + ")");
+ else
+ throw GeneralException("Handle can't read");
+ return -1;
+}
+
+ssize_t Balau::Handle::write(const void * buf, size_t count) throw (GeneralException) {
+ if (canWrite())
+ throw GeneralException(String("Handle ") + getName() + " can write, but write() not implemented (missing in class " + typeid(this).name() + ")");
+ else
+ throw GeneralException("Handle can't write");
+ return -1;
+}
+
+void Balau::Handle::rseek(off_t offset, int whence) throw (GeneralException) {
+ if (canSeek())
+ throw GeneralException(String("Handle ") + getName() + " can seek, but rseek() not implemented (missing in class " + typeid(this).name() + ")");
+ else
+ throw GeneralException("Handle can't seek");
+}
+
+void Balau::Handle::wseek(off_t offset, int whence) throw (GeneralException) {
+ rseek(offset, whence);
+}
+
+off_t Balau::Handle::rtell() throw (GeneralException) {
+ if (canSeek())
+ throw GeneralException(String("Handle ") + getName() + " can seek, but rtell() not implemented (missing in class " + typeid(this).name() + ")");
+ else
+ throw GeneralException("Handle can't seek");
+}
+
+off_t Balau::Handle::wtell() throw (GeneralException) {
+ return rtell();
+}
+
+bool Balau::SeekableHandle::canSeek() { return true; }
+
+void Balau::SeekableHandle::rseek(off_t offset, int whence) throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ off_t size;
+ if (!canRead())
+ wseek(offset, whence);
+ switch (whence) {
+ case SEEK_SET:
+ m_rOffset = offset;
+ break;
+ case SEEK_CUR:
+ m_rOffset += offset;
+ break;
+ case SEEK_END:
+ size = getSize();
+ if (getSize() < 0)
+ throw GeneralException("Can't seek from end in a Handle you don't know the max size");
+ m_rOffset = size + offset;
+ break;
+ }
+ if (m_rOffset < 0)
+ m_rOffset = 0;
+}
+
+void Balau::SeekableHandle::wseek(off_t offset, int whence) throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ off_t size;
+ if (!canWrite())
+ rseek(offset, whence);
+ switch (whence) {
+ case SEEK_SET:
+ m_wOffset = offset;
+ break;
+ case SEEK_CUR:
+ m_wOffset += offset;
+ break;
+ case SEEK_END:
+ size = getSize();
+ if (getSize() < 0)
+ throw GeneralException("Can't seek from end in a Handle you don't know the max size");
+ m_wOffset = size + offset;
+ break;
+ }
+ if (m_wOffset < 0)
+ m_wOffset = 0;
+}
+
+off_t Balau::SeekableHandle::rtell() throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ if (!canRead())
+ return wtell();
+}
+
+off_t Balau::SeekableHandle::wtell() throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ if (!canWrite())
+ return rtell();
+}
diff --git a/src/Input.cc b/src/Input.cc
new file mode 100644
index 0000000..633a1a7
--- /dev/null
+++ b/src/Input.cc
@@ -0,0 +1,60 @@
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include "eio.h"
+#include "Input.h"
+#include "Task.h"
+
+struct cbResults_t {
+ Balau::Events::Custom evt;
+ int result, errorno;
+};
+
+static int eioDone(eio_req * req) {
+ cbResults_t * cbResults = (cbResults_t *) req->data;
+ cbResults->result = req->result;
+ cbResults->errorno = req->errorno;
+ cbResults->evt.doSignal();
+}
+
+Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1) {
+ cbResults_t cbResults;
+ m_name.set("Input(%s)", fname);
+ eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults);
+ Assert(r != 0);
+ Task::yield(&cbResults.evt);
+ Assert(cbResults.evt.gotSignal());
+ if (cbResults.result < 0) {
+ char str[4096];
+ throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
+ } else {
+ m_fd = cbResults.result;
+ }
+}
+
+void Balau::Input::close() throw (GeneralException) {
+ if (m_fd < 0)
+ return;
+ cbResults_t cbResults;
+ eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
+ Assert(r != 0);
+ m_fd = -1;
+ Task::yield(&cbResults.evt);
+ Assert(cbResults.evt.gotSignal());
+ if (cbResults.result < 0) {
+ char str[4096];
+ strerror_r(cbResults.errorno, str, sizeof(str));
+ throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+ } else {
+ m_fd = cbResults.result;
+ }
+}
+
+bool Balau::Input::isClosed() {
+ return m_fd < 0;
+}
+
+const char * Balau::Input::getName() {
+ return m_name.to_charp();
+}
diff --git a/src/Task.cc b/src/Task.cc
index e5c971d..a47bd8b 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -57,7 +57,7 @@ void Balau::Task::switchTo() {
m_status = IDLE;
}
-void Balau::Task::suspend() {
+void Balau::Task::yield() {
coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
}
@@ -91,3 +91,7 @@ void Balau::Events::Timeout::gotOwner(Task * task) {
void Balau::Events::Timeout::evt_cb(ev::timer & w, int revents) {
doSignal();
}
+
+void Balau::Events::Custom::gotOwner(Task * task) {
+ m_loop = task->getTaskMan()->getLoop();
+}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index fd0246f..ea9c0fb 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -29,15 +29,24 @@ void Balau::TaskMan::mainLoop() {
taskList_t::iterator iL;
taskHash_t::iterator iH;
Task * t;
+ bool noWait = false;
- // checking "STARTING" tasks, and running them once
+ // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean.
for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
t = *iH;
- if (t->getStatus() == Task::STARTING)
+ if (t->getStatus() == Task::STARTING) {
t->switchTo();
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED))
+ noWait = true;
+ }
}
- ev_run(m_loop, EVRUN_ONCE);
+ // probably means we have pending tasks; or none at all, for some reason. Don't wait on it forever.
+ if (!noWait && m_tasks.size() == 0)
+ noWait = true;
+
+ // libev's event "loop". We always runs it once though.
+ ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE);
// let's check who got signaled, and call them
for (iH = m_signaledTasks.begin(); iH != m_signaledTasks.end(); iH++) {
diff --git a/src/Threads.cc b/src/Threads.cc
index 7c928f8..fa3f0f3 100644
--- a/src/Threads.cc
+++ b/src/Threads.cc
@@ -1,5 +1,5 @@
-#include <Exceptions.h>
-#include <Threads.h>
+#include "Exceptions.h"
+#include "Threads.h"
Balau::Lock::Lock() {
int r;