summaryrefslogtreecommitdiffstats
path: root/src/PacketHandler.vala
blob: 0f5cd3c6175ed24ba6fcf5c2fa989bcc338e02ab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
namespace Eva {
  public class PacketHandler {
    private DataInputStream istream;
    private DataOutputStream ostream;
    private int sizeLength;
    
    public signal void received_term(Term term);
    
    public PacketHandler(InputStream input, OutputStream output, int length) {
      assert(length == 1 || length == 2 || length == 4);
      
      istream = new DataInputStream(input);
      ostream = new DataOutputStream(output);
      sizeLength = length;
      
      istream.byte_order = DataStreamByteOrder.BIG_ENDIAN;
      ostream.byte_order = DataStreamByteOrder.BIG_ENDIAN;
    }
    
    public void send(Term term) throws Error {
      Erl.Buffer buffer = Erl.Buffer.with_version();
      term.encode(buffer);
      
      switch(sizeLength) {
      case 1:
        ostream.put_byte((uchar)buffer.buff.length, null);
        break;
      case 2:
        ostream.put_uint16((uint16)buffer.buff.length, null);
        break;
      case 4:
        ostream.put_uint32((uint32)buffer.buff.length, null);
        break;
      }
      
      ostream.write(buffer.buff, buffer.buff.length, null);
    }
    
    private async void receive() throws Error {
      while(true) {
        while(istream.get_available() < sizeLength) {
          yield istream.fill_async((ssize_t)(sizeLength - istream.get_available()), 0, null);
        }
        
        int length = 0;
        switch(sizeLength) {
        case 1:
          length = istream.read_byte(null);
          break;
        case 2:
          length = istream.read_uint16(null);
          break;
        case 4:
          length = (int)istream.read_uint32(null);
          break;
        }
        
        uint8[] buffer = new uint8[length];
        size_t received = 0;
        
        while(received < length) {
          size_t ret = yield istream.read_async(&(buffer[received]), length-received, 0, null);
          assert(ret > 0);
          
          received += ret;
        }
        
        Term term = Term.decode(buffer);
        Idle.add(() => {received_term(term); return false;});
      }
    }
    
    public void start() {
      receive.begin();
    }
  }
}