summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile10
-rw-r--r--includes/Handle.h64
-rw-r--r--includes/Input.h18
-rw-r--r--includes/Task.h12
-rw-r--r--src/Handle.cc151
-rw-r--r--src/Input.cc60
-rw-r--r--src/Task.cc6
-rw-r--r--src/TaskMan.cc15
-rw-r--r--src/Threads.cc4
-rw-r--r--tests/test-Handles.cc22
-rw-r--r--tests/test-Sanity.cc1
-rw-r--r--tests/test-Tasks.cc4
12 files changed, 356 insertions, 11 deletions
diff --git a/Makefile b/Makefile
index f0424e3..16fb160 100644
--- a/Makefile
+++ b/Makefile
@@ -56,13 +56,18 @@ vpath %.cc src:tests
vpath %.c libcoro:libeio:libev
BALAU_SOURCES = \
-BString.cc \
Local.cc \
+Threads.cc \
+\
+BString.cc \
Main.cc \
Printer.cc \
+\
+Handle.cc \
+Input.cc \
+\
Task.cc \
TaskMan.cc \
-Threads.cc \
LIBCORO_SOURCES = \
coro.c \
@@ -78,6 +83,7 @@ TEST_SOURCES = \
test-Sanity.cc \
test-String.cc \
test-Tasks.cc \
+test-Handles.cc \
LIB = libBalau.a
diff --git a/includes/Handle.h b/includes/Handle.h
new file mode 100644
index 0000000..d2e1f8c
--- /dev/null
+++ b/includes/Handle.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <Exceptions.h>
+
+namespace Balau {
+
+class EAgain : public GeneralException {
+ public:
+ EAgain() : GeneralException("Try Again") { }
+};
+
+class IO;
+
+class Handle {
+ public:
+ virtual ~Handle() { Assert(m_refCount == 0); }
+ virtual void close() throw (GeneralException) = 0;
+ virtual bool isClosed() = 0;
+ virtual bool canSeek();
+ virtual bool canRead();
+ virtual bool canWrite();
+ virtual const char * getName() = 0;
+ virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
+ virtual ssize_t write(const void * buf, size_t count) throw (GeneralException);
+ virtual void rseek(off_t offset, int whence = SEEK_SET) throw (GeneralException);
+ virtual void wseek(off_t offset, int whence = SEEK_SET) throw (GeneralException);
+ virtual off_t rtell() throw (GeneralException);
+ virtual off_t wtell() throw (GeneralException);
+ virtual off_t getSize();
+ protected:
+ Handle() : m_refCount(0) { }
+ private:
+ void addRef() { m_refCount++; }
+ void delRef() { if (--m_refCount == 0) { if (!isClosed()) close(); delete this; } }
+ int refCount() { return m_refCount; }
+ friend class IO;
+
+ int m_refCount;
+};
+
+class IO {
+ public:
+ template<class T> IO(T * h) { setHandle(h); }
+ ~IO() { m_h->delRef(); }
+ IO(const IO & io) { setHandle(io.m_h); }
+ Handle * operator->() { return m_h; }
+ protected:
+ void setHandle(Handle * h) { m_h = h; m_h->addRef(); }
+ private:
+ Handle * m_h;
+};
+
+class SeekableHandle : public Handle {
+ public:
+ virtual bool canSeek();
+ virtual void rseek(off_t offset, int whence = SEEK_SET) throw (GeneralException);
+ virtual void wseek(off_t offset, int whence = SEEK_SET) throw (GeneralException);
+ virtual off_t rtell() throw (GeneralException);
+ virtual off_t wtell() throw (GeneralException);
+ private:
+ off_t m_wOffset, m_rOffset;
+};
+
+};
diff --git a/includes/Input.h b/includes/Input.h
new file mode 100644
index 0000000..418b018
--- /dev/null
+++ b/includes/Input.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <Handle.h>
+
+namespace Balau {
+
+class Input : public SeekableHandle {
+ public:
+ Input(const char * fname) throw (GeneralException);
+ virtual void close() throw (GeneralException);
+ virtual bool isClosed();
+ virtual const char * getName();
+ private:
+ int m_fd;
+ String m_name;
+};
+
+};
diff --git a/includes/Task.h b/includes/Task.h
index f0c1bbb..8a504c7 100644
--- a/includes/Task.h
+++ b/includes/Task.h
@@ -44,6 +44,15 @@ class TaskEvent : public BaseEvent {
Task * m_taskWaited;
};
+class Custom : public BaseEvent {
+ public:
+ void doSignal() { BaseEvent::doSignal(); ev_break(m_loop, EVBREAK_ALL); }
+ protected:
+ virtual void gotOwner(Task * task);
+ private:
+ struct ev_loop * m_loop;
+};
+
};
class Task {
@@ -60,9 +69,10 @@ class Task {
virtual const char * getName() = 0;
Status getStatus() { return m_status; }
static Task * getCurrentTask();
+ static void yield(Events::BaseEvent * evt) { Task * t = getCurrentTask(); t->waitFor(evt); t->yield(); }
TaskMan * getTaskMan() { return m_taskMan; }
protected:
- void suspend();
+ void yield();
virtual void Do() = 0;
void waitFor(Events::BaseEvent * event);
private:
diff --git a/src/Handle.cc b/src/Handle.cc
new file mode 100644
index 0000000..c9e3f57
--- /dev/null
+++ b/src/Handle.cc
@@ -0,0 +1,151 @@
+#include <typeinfo>
+#include "ev++.h"
+#include "eio.h"
+#include "Main.h"
+#include "TaskMan.h"
+#include "Handle.h"
+
+class eioInterface : public Balau::AtStart {
+ public:
+ eioInterface() : AtStart(100) { }
+ void repeatCB(ev::idle & w, int revents);
+ void readyCB(ev::async & w, int revents);
+ static void wantPoll();
+ virtual void doStart();
+ ev::idle m_repeat;
+ ev::async m_ready;
+};
+
+static eioInterface eioIF;
+
+void eioInterface::repeatCB(ev::idle & w, int revents) {
+ if (eio_poll() != -1)
+ w.stop();
+}
+
+void eioInterface::readyCB(ev::async & w, int revents) {
+ if (eio_poll() == -1)
+ m_repeat.start();
+}
+
+void eioInterface::doStart() {
+ Balau::TaskMan * taskMan = Balau::TaskMan::getTaskMan();
+ Assert(taskMan);
+ struct ev_loop * loop = taskMan->getLoop();
+
+ m_repeat.set(loop);
+ m_repeat.set<eioInterface, &eioInterface::repeatCB>(this);
+
+ m_ready.set(loop);
+ m_ready.set<eioInterface, &eioInterface::readyCB>(this);
+ m_ready.start();
+
+ eio_init(wantPoll, NULL);
+}
+
+void eioInterface::wantPoll() {
+ eioIF.m_ready.send();
+}
+
+bool Balau::Handle::canSeek() { return false; }
+bool Balau::Handle::canRead() { return false; }
+bool Balau::Handle::canWrite() { return false; }
+off_t Balau::Handle::getSize() { return -1; }
+
+ssize_t Balau::Handle::read(void * buf, size_t count) throw (GeneralException) {
+ if (canRead())
+ throw GeneralException(String("Handle ") + getName() + " can read, but read() not implemented (missing in class " + typeid(*this).name() + ")");
+ else
+ throw GeneralException("Handle can't read");
+ return -1;
+}
+
+ssize_t Balau::Handle::write(const void * buf, size_t count) throw (GeneralException) {
+ if (canWrite())
+ throw GeneralException(String("Handle ") + getName() + " can write, but write() not implemented (missing in class " + typeid(this).name() + ")");
+ else
+ throw GeneralException("Handle can't write");
+ return -1;
+}
+
+void Balau::Handle::rseek(off_t offset, int whence) throw (GeneralException) {
+ if (canSeek())
+ throw GeneralException(String("Handle ") + getName() + " can seek, but rseek() not implemented (missing in class " + typeid(this).name() + ")");
+ else
+ throw GeneralException("Handle can't seek");
+}
+
+void Balau::Handle::wseek(off_t offset, int whence) throw (GeneralException) {
+ rseek(offset, whence);
+}
+
+off_t Balau::Handle::rtell() throw (GeneralException) {
+ if (canSeek())
+ throw GeneralException(String("Handle ") + getName() + " can seek, but rtell() not implemented (missing in class " + typeid(this).name() + ")");
+ else
+ throw GeneralException("Handle can't seek");
+}
+
+off_t Balau::Handle::wtell() throw (GeneralException) {
+ return rtell();
+}
+
+bool Balau::SeekableHandle::canSeek() { return true; }
+
+void Balau::SeekableHandle::rseek(off_t offset, int whence) throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ off_t size;
+ if (!canRead())
+ wseek(offset, whence);
+ switch (whence) {
+ case SEEK_SET:
+ m_rOffset = offset;
+ break;
+ case SEEK_CUR:
+ m_rOffset += offset;
+ break;
+ case SEEK_END:
+ size = getSize();
+ if (getSize() < 0)
+ throw GeneralException("Can't seek from end in a Handle you don't know the max size");
+ m_rOffset = size + offset;
+ break;
+ }
+ if (m_rOffset < 0)
+ m_rOffset = 0;
+}
+
+void Balau::SeekableHandle::wseek(off_t offset, int whence) throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ off_t size;
+ if (!canWrite())
+ rseek(offset, whence);
+ switch (whence) {
+ case SEEK_SET:
+ m_wOffset = offset;
+ break;
+ case SEEK_CUR:
+ m_wOffset += offset;
+ break;
+ case SEEK_END:
+ size = getSize();
+ if (getSize() < 0)
+ throw GeneralException("Can't seek from end in a Handle you don't know the max size");
+ m_wOffset = size + offset;
+ break;
+ }
+ if (m_wOffset < 0)
+ m_wOffset = 0;
+}
+
+off_t Balau::SeekableHandle::rtell() throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ if (!canRead())
+ return wtell();
+}
+
+off_t Balau::SeekableHandle::wtell() throw (GeneralException) {
+ Assert(canRead() || canWrite());
+ if (!canWrite())
+ return rtell();
+}
diff --git a/src/Input.cc b/src/Input.cc
new file mode 100644
index 0000000..633a1a7
--- /dev/null
+++ b/src/Input.cc
@@ -0,0 +1,60 @@
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include "eio.h"
+#include "Input.h"
+#include "Task.h"
+
+struct cbResults_t {
+ Balau::Events::Custom evt;
+ int result, errorno;
+};
+
+static int eioDone(eio_req * req) {
+ cbResults_t * cbResults = (cbResults_t *) req->data;
+ cbResults->result = req->result;
+ cbResults->errorno = req->errorno;
+ cbResults->evt.doSignal();
+}
+
+Balau::Input::Input(const char * fname) throw (GeneralException) : m_fd(-1) {
+ cbResults_t cbResults;
+ m_name.set("Input(%s)", fname);
+ eio_req * r = eio_open(fname, O_RDONLY, 0, 0, eioDone, &cbResults);
+ Assert(r != 0);
+ Task::yield(&cbResults.evt);
+ Assert(cbResults.evt.gotSignal());
+ if (cbResults.result < 0) {
+ char str[4096];
+ throw GeneralException(String("Unable to open file ") + m_name + " for reading: " + strerror_r(cbResults.errorno, str, sizeof(str)) + " (err#" + cbResults.errorno + ")");
+ } else {
+ m_fd = cbResults.result;
+ }
+}
+
+void Balau::Input::close() throw (GeneralException) {
+ if (m_fd < 0)
+ return;
+ cbResults_t cbResults;
+ eio_req * r = eio_close(m_fd, 0, eioDone, &cbResults);
+ Assert(r != 0);
+ m_fd = -1;
+ Task::yield(&cbResults.evt);
+ Assert(cbResults.evt.gotSignal());
+ if (cbResults.result < 0) {
+ char str[4096];
+ strerror_r(cbResults.errorno, str, sizeof(str));
+ throw GeneralException(String("Unable to close file ") + m_name + ": " + str);
+ } else {
+ m_fd = cbResults.result;
+ }
+}
+
+bool Balau::Input::isClosed() {
+ return m_fd < 0;
+}
+
+const char * Balau::Input::getName() {
+ return m_name.to_charp();
+}
diff --git a/src/Task.cc b/src/Task.cc
index e5c971d..a47bd8b 100644
--- a/src/Task.cc
+++ b/src/Task.cc
@@ -57,7 +57,7 @@ void Balau::Task::switchTo() {
m_status = IDLE;
}
-void Balau::Task::suspend() {
+void Balau::Task::yield() {
coro_transfer(&m_ctx, &m_taskMan->m_returnContext);
}
@@ -91,3 +91,7 @@ void Balau::Events::Timeout::gotOwner(Task * task) {
void Balau::Events::Timeout::evt_cb(ev::timer & w, int revents) {
doSignal();
}
+
+void Balau::Events::Custom::gotOwner(Task * task) {
+ m_loop = task->getTaskMan()->getLoop();
+}
diff --git a/src/TaskMan.cc b/src/TaskMan.cc
index fd0246f..ea9c0fb 100644
--- a/src/TaskMan.cc
+++ b/src/TaskMan.cc
@@ -29,15 +29,24 @@ void Balau::TaskMan::mainLoop() {
taskList_t::iterator iL;
taskHash_t::iterator iH;
Task * t;
+ bool noWait = false;
- // checking "STARTING" tasks, and running them once
+ // checking "STARTING" tasks, and running them once; also try to build the status of the noWait boolean.
for (iH = m_tasks.begin(); iH != m_tasks.end(); iH++) {
t = *iH;
- if (t->getStatus() == Task::STARTING)
+ if (t->getStatus() == Task::STARTING) {
t->switchTo();
+ if ((t->getStatus() == Task::STOPPED) || (t->getStatus() == Task::FAULTED))
+ noWait = true;
+ }
}
- ev_run(m_loop, EVRUN_ONCE);
+ // probably means we have pending tasks; or none at all, for some reason. Don't wait on it forever.
+ if (!noWait && m_tasks.size() == 0)
+ noWait = true;
+
+ // libev's event "loop". We always runs it once though.
+ ev_run(m_loop, noWait ? EVRUN_NOWAIT : EVRUN_ONCE);
// let's check who got signaled, and call them
for (iH = m_signaledTasks.begin(); iH != m_signaledTasks.end(); iH++) {
diff --git a/src/Threads.cc b/src/Threads.cc
index 7c928f8..fa3f0f3 100644
--- a/src/Threads.cc
+++ b/src/Threads.cc
@@ -1,5 +1,5 @@
-#include <Exceptions.h>
-#include <Threads.h>
+#include "Exceptions.h"
+#include "Threads.h"
Balau::Lock::Lock() {
int r;
diff --git a/tests/test-Handles.cc b/tests/test-Handles.cc
new file mode 100644
index 0000000..38b020b
--- /dev/null
+++ b/tests/test-Handles.cc
@@ -0,0 +1,22 @@
+#include <Main.h>
+#include <Input.h>
+
+BALAU_STARTUP;
+
+using namespace Balau;
+
+void MainTask::Do() {
+ Printer::log(M_STATUS, "Test::Handles running.");
+
+ bool failed = false;
+ try {
+ IO i(new Input("SomeInexistantFile.txt"));
+ }
+ catch (GeneralException) {
+ failed = true;
+ }
+ Assert(failed);
+ IO i(new Input("Makefile"));
+
+ Printer::log(M_STATUS, "Test::Handles passed.");
+}
diff --git a/tests/test-Sanity.cc b/tests/test-Sanity.cc
index 52a85cc..54fa626 100644
--- a/tests/test-Sanity.cc
+++ b/tests/test-Sanity.cc
@@ -8,6 +8,7 @@ void MainTask::Do() {
Printer::log(M_STATUS, "Test::Sanity running.");
Assert(sizeof(off_t) == 8);
+ Assert(sizeof(size_t) == 4);
Printer::log(M_STATUS, "Test::Sanity passed.");
}
diff --git a/tests/test-Tasks.cc b/tests/test-Tasks.cc
index 148f943..1faa194 100644
--- a/tests/test-Tasks.cc
+++ b/tests/test-Tasks.cc
@@ -30,13 +30,13 @@ void MainTask::Do() {
Events::TaskEvent taskEvt(testTask);
waitFor(&taskEvt);
Assert(!taskEvt.gotSignal());
- suspend();
+ yield();
Assert(taskEvt.gotSignal());
Events::Timeout timeout(0.1);
waitFor(&timeout);
Assert(!timeout.gotSignal());
- suspend();
+ yield();
Assert(timeout.gotSignal());
Printer::log(M_STATUS, "Test::Tasks passed.");