namespace Eva { public class PacketHandler { private DataInputStream istream; private DataOutputStream ostream; private int sizeLength; private bool running = false; public signal void received_term(Term term); public PacketHandler(InputStream input, OutputStream output, int length) { assert(length == 1 || length == 2 || length == 4); istream = new DataInputStream(input); ostream = new DataOutputStream(output); sizeLength = length; istream.byte_order = DataStreamByteOrder.BIG_ENDIAN; ostream.byte_order = DataStreamByteOrder.BIG_ENDIAN; } public void send(Term term) throws Error { Buffer buf = new Buffer(); unowned Erl.Buffer buffer = buf.buffer; term.encode(buf); buffer = buf.buffer; switch(sizeLength) { case 1: ostream.put_byte((uchar)buffer.buff.length, null); break; case 2: ostream.put_uint16((uint16)buffer.buff.length, null); break; case 4: ostream.put_uint32((uint32)buffer.buff.length, null); break; } ostream.write(buffer.buff, buffer.buff.length, null); } private async void receive() throws Error { while(running) { while(istream.get_available() < sizeLength) { yield istream.fill_async((ssize_t)(sizeLength - istream.get_available()), 0, null); } int length = 0; switch(sizeLength) { case 1: length = istream.read_byte(null); break; case 2: length = istream.read_uint16(null); break; case 4: length = (int)istream.read_uint32(null); break; } uint8[] buffer = new uint8[length]; size_t received = 0; while(received < length) { size_t ret = yield istream.read_async(&(buffer[received]), length-received, 0, null); assert(ret > 0); received += ret; } Term term = Term.decode(buffer); Idle.add(() => {received_term(term); return false;}); } } public void start() { running = true; receive.begin(); } } }