summaryrefslogtreecommitdiff
path: root/includes/Socket.h
blob: c31bdd8b9b97ca0b9dec5dba047e23e4d96815b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#pragma once

#ifdef _WIN32
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <netdb.h>
#endif
#include <Handle.h>
#include <Task.h>
#include <Printer.h>

namespace Balau {

class Socket : public Handle {
  public:

      Socket() throw (GeneralException);
    virtual void close() throw (GeneralException);
    virtual ssize_t read(void * buf, size_t count) throw (GeneralException);
    virtual ssize_t write(const void * buf, size_t count) throw (GeneralException);
    virtual bool isClosed();
    virtual bool isEOF();
    virtual bool canRead();
    virtual bool canWrite();
    virtual const char * getName();

    bool setLocal(const char * hostname = NULL, int port = 0);
    bool connect(const char * hostname, int port);
    IO<Socket> accept() throw (GeneralException);
    bool listen();
  private:
      Socket(int fd);
    class SocketEvent : public Events::BaseEvent {
      public:
          SocketEvent(int fd, int evt = EV_READ | EV_WRITE) : m_task(NULL) { Printer::elog(E_SOCKET, "Got a new SocketEvent at %p", this); m_evt.set<SocketEvent, &SocketEvent::evt_cb>(this); m_evt.set(fd, evt); }
          virtual ~SocketEvent() { m_evt.stop(); }
          void stop() { reset(); m_evt.stop(); }
      private:
        void evt_cb(ev::io & w, int revents) { Printer::elog(E_SOCKET, "Got a libev callback on a SocketEvent at %p", this); doSignal(); }
        virtual void gotOwner(Task * task);

        ev::io m_evt;
        Task * m_task;
    };

    int m_fd;
    String m_name;
    bool m_connected;
    bool m_connecting;
    bool m_listening;
    sockaddr_in6 m_localAddr, m_remoteAddr;
    SocketEvent * m_evtR, * m_evtW;
};

template<class Worker>
class Listener : public Task {
  public:
      Listener(int port, const char * local = NULL, void * opaque = NULL) : m_stop(false), m_opaque(opaque) {
          bool r = m_listener.setLocal(local, port);
          Assert(r);
          r = m_listener.listen();
          Assert(r);
          m_name = String(ClassName(this).c_str()) + " - " + m_listener.getName();
          Printer::elog(E_SOCKET, "Created a listener task at %p", this);
      }
    virtual void Do() {
        waitFor(&m_evt);
        setOkayToEAgain(true);
        while (!m_stop) {
            IO<Socket> io;
            try {
                io = m_listener.accept();
            }
            catch (EAgain) {
                Printer::elog(E_SOCKET, "Listener task at %p (%s) got an EAgain - stop = %s", this, ClassName(this).c_str(), m_stop ? "true" : "false");
                if (!m_stop)
                    yield();
                continue;
            }
            new Worker(io, m_opaque);
        }
    }
    void stop() {
        Printer::elog(E_SOCKET, "Listener task at %p (%s) is asked to stop.", this, ClassName(this).c_str());
        m_stop = true;
        m_evt.trigger();
    }
    virtual const char * getName() { return m_name.to_charp(); }
  private:
    Socket m_listener;
    Events::Async m_evt;
    volatile bool m_stop;
    String m_name;
    void * m_opaque;
};

};