summaryrefslogtreecommitdiffstats
path: root/src/PacketHandler.vala
blob: 8c6a576b6b24f2d7d9a6c9fbdb8f3ca2a038d2b4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
namespace Eva {
  /**
   * Exchanges terms as length-prefixed packets over a pair of GIO streams.
   *
   * Each packet is a big-endian length header of 1, 2 or 4 bytes followed by
   * that many bytes of encoded term data.
   */
  public class PacketHandler {
    private DataInputStream istream;
    private DataOutputStream ostream;
    // Width of the length header in bytes: 1, 2 or 4.
    private int size_length;
    // True while the receive loop started by start() is active.
    private bool running = false;

    /**
     * Emitted for every term decoded from the input stream.
     *
     * The emission is scheduled with Idle.add(), so handlers run from a main
     * loop iteration rather than from inside the receive coroutine.
     */
    public signal void received_term(Term term);

    /**
     * Wraps the given streams for packet-based term exchange.
     *
     * @param input stream packets are read from
     * @param output stream packets are written to
     * @param length width of the length header in bytes; must be 1, 2 or 4
     */
    public PacketHandler(InputStream input, OutputStream output, int length) {
      assert(length == 1 || length == 2 || length == 4);

      istream = new DataInputStream(input);
      ostream = new DataOutputStream(output);
      size_length = length;

      istream.byte_order = DataStreamByteOrder.BIG_ENDIAN;
      ostream.byte_order = DataStreamByteOrder.BIG_ENDIAN;
    }

    /**
     * Encodes a term and writes it to the output stream as one packet.
     *
     * @param term the term to send
     * @throws Error if the encoded term does not fit the configured length
     *               header, or if writing to the stream fails
     */
    public void send(Term term) throws Error {
      Buffer buf = new Buffer();
      term.encode(buf);

      // Read the buffer once, after encoding has filled it.
      unowned Erl.Buffer buffer = buf.buffer;
      int payload_length = buffer.buff.length;

      // Refuse payloads the header cannot describe; truncating the length
      // would desynchronise the peer's framing.
      uint64 max_length = (((uint64)1) << (8 * size_length)) - 1;
      if((uint64)payload_length > max_length) {
        throw new IOError.INVALID_ARGUMENT(
          "Encoded term is %d bytes, too large for a %d-byte length header",
          payload_length, size_length);
      }

      switch(size_length) {
      case 1:
        ostream.put_byte((uchar)payload_length, null);
        break;
      case 2:
        ostream.put_uint16((uint16)payload_length, null);
        break;
      case 4:
        ostream.put_uint32((uint32)payload_length, null);
        break;
      }

      // write() may stop after a partial write; write_all() keeps going until
      // the whole payload is out or an error occurs.
      size_t written;
      ostream.write_all(((uint8[])buffer.buff)[0:payload_length], out written, null);
    }

    /**
     * Reads packets from the input stream until the stream ends or an error
     * occurs, scheduling a received_term emission for each decoded term.
     *
     * Returns normally when the stream ends exactly on a packet boundary.
     *
     * @throws Error if the stream ends in the middle of a packet, if a length
     *               header is not representable, or if reading or decoding
     *               fails
     */
    private async void receive() throws Error {
      while(running) {
        // Buffer the complete length header before reading it synchronously.
        while(istream.get_available() < size_length) {
          ssize_t missing = (ssize_t)(size_length - istream.get_available());
          ssize_t filled = yield istream.fill_async(missing, Priority.DEFAULT, null);

          if(filled == 0) {
            // End of stream: without this check the loop would spin forever.
            if(istream.get_available() == 0) {
              return;
            }
            throw new IOError.CLOSED("Stream ended inside a packet length header");
          }
        }

        uint32 announced = 0;
        switch(size_length) {
        case 1:
          announced = istream.read_byte(null);
          break;
        case 2:
          announced = istream.read_uint16(null);
          break;
        case 4:
          announced = istream.read_uint32(null);
          break;
        }

        // Array lengths are ints; a larger value would wrap to a negative size.
        if(announced > (uint32)int.MAX) {
          throw new IOError.INVALID_DATA("Packet length %u is too large", announced);
        }
        int length = (int)announced;

        uint8[] buffer = new uint8[length];
        ssize_t received = 0;

        while(received < length) {
          ssize_t ret = yield istream.read_async(buffer[received:length], Priority.DEFAULT, null);
          if(ret <= 0) {
            // The peer closed the stream mid-packet; report it instead of aborting.
            throw new IOError.CLOSED(
              "Stream ended after %d of %d packet bytes", (int)received, length);
          }

          received += ret;
        }

        Term term = Term.decode(buffer);
        Idle.add(() => {received_term(term); return false;});
      }
    }

    /**
     * Starts the asynchronous receive loop.
     *
     * Does nothing if the loop is already running. When the loop ends, any
     * error is logged as a warning and the handler may be started again.
     */
    public void start() {
      if(running) {
        // A second loop would interleave its reads with the first one.
        return;
      }

      running = true;
      receive.begin((obj, res) => {
        try {
          receive.end(res);
        } catch(Error e) {
          warning("PacketHandler receive loop stopped: %s", e.message);
        }
        running = false;
      });
    }
  }
}