diff options
Diffstat (limited to 'src/Net/Connection.cpp')
-rw-r--r-- | src/Net/Connection.cpp | 170 |
1 files changed, 129 insertions, 41 deletions
diff --git a/src/Net/Connection.cpp b/src/Net/Connection.cpp index 0984f0a..2ccfddb 100644 --- a/src/Net/Connection.cpp +++ b/src/Net/Connection.cpp @@ -20,66 +20,92 @@ #include "Connection.h" #include "FdManager.h" #include "IPAddress.h" +#include "ThreadManager.h" + #include <cstring> #include <sys/socket.h> +#include <sigc++/bind.h> + namespace Mad { namespace Net { + +Connection::StaticInit Connection::staticInit; + + Connection::~Connection() { - if(isConnected()) + if(_isConnected()) doDisconnect(); if(transR.data) delete [] transR.data; - while(!sendQueueEmpty()) { + while(!_sendQueueEmpty()) { delete [] transS.front().data; transS.pop(); } gnutls_certificate_free_credentials(x509_cred); + gl_rwlock_destroy(stateLock); + gl_lock_destroy(sendLock); + gl_lock_destroy(receiveLock); + if(peer) delete peer; } void Connection::handshake() { - if(isConnected()) + gl_rwlock_wrlock(stateLock); + if(state != CONNECT) { + gl_rwlock_unlock(stateLock); return; + } state = HANDSHAKE; + gl_rwlock_unlock(stateLock); doHandshake(); } void Connection::bye() { - if(state != DISCONNECT) + gl_rwlock_wrlock(stateLock); + if(state != DISCONNECT) { + gl_rwlock_unlock(stateLock); return; + } state = BYE; + gl_rwlock_unlock(stateLock); doBye(); } void Connection::doHandshake() { - if(state != HANDSHAKE) + gl_rwlock_rdlock(stateLock); + if(state != HANDSHAKE) { + gl_rwlock_unlock(stateLock); return; + } int ret = gnutls_handshake(session); if(ret < 0) { + gl_rwlock_unlock(stateLock); + if(ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) { updateEvents(); return; } - // TODO: Error doDisconnect(); return; } state = CONNECTION_HEADER; + gl_rwlock_unlock(stateLock); + connectionHeader(); } @@ -102,13 +128,21 @@ void Connection::doBye() { doDisconnect(); } -bool Connection::enterReceiveLoop() { - if(!isConnected() || isDisconnecting()) - return false; +void Connection::enterReceiveLoop() { + gl_rwlock_wrlock(stateLock); + + if(!_isConnected() || _isDisconnecting()) { + gl_rwlock_unlock(stateLock); + return; + } + + if(_isConnecting()) + ThreadManager::get()->pushWork(sigc::mem_fun(connectedSignal, &sigc::signal<void>::emit)); state = PACKET_HEADER; + gl_rwlock_unlock(stateLock); - return rawReceive(sizeof(Packet::Data), sigc::mem_fun(this, &Connection::packetHeaderReceiveHandler)); + rawReceive(sizeof(Packet::Data), sigc::mem_fun(this, &Connection::packetHeaderReceiveHandler)); } void Connection::packetHeaderReceiveHandler(const void *data, unsigned long length) { @@ -124,7 +158,7 @@ void Connection::packetHeaderReceiveHandler(const void *data, unsigned long leng header = *(const Packet::Data*)data; if(header.length == 0) { - signal(Packet(ntohs(header.requestId))); + ThreadManager::get()->pushWork(sigc::bind(sigc::mem_fun(receiveSignal, &sigc::signal<void,const Packet&>::emit), Packet(ntohs(header.requestId)))); enterReceiveLoop(); } @@ -144,7 +178,7 @@ void Connection::packetDataReceiveHandler(const void *data, unsigned long length return; } - signal(Packet(ntohs(header.requestId), data, length)); + ThreadManager::get()->pushWork(sigc::bind(sigc::mem_fun(receiveSignal, &sigc::signal<void,const Packet&>::emit), Packet(ntohs(header.requestId), data, length))); enterReceiveLoop(); } @@ -153,12 +187,17 @@ void Connection::doReceive() { if(!isConnected()) return; - if(receiveComplete()) + gl_lock_lock(receiveLock); + + if(_receiveComplete()) { + gl_lock_unlock(receiveLock); return; + } ssize_t ret = gnutls_record_recv(session, transR.data+transR.transmitted, transR.length-transR.transmitted); if(ret < 0) { + gl_lock_unlock(receiveLock); if(ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) return; @@ -169,15 +208,20 @@ void Connection::doReceive() { transR.transmitted += ret; - if(receiveComplete()) { + if(_receiveComplete()) { // Save data pointer, as transR.notify might start a new reception uint8_t *data = transR.data; transR.data = 0; + gl_lock_unlock(receiveLock); + transR.notify(data, transR.length); delete [] data; } + else { + gl_lock_unlock(receiveLock); + } updateEvents(); } @@ -188,14 +232,19 @@ bool Connection::rawReceive(unsigned long length, if(!isConnected()) return false; - if(!receiveComplete()) + gl_lock_lock(receiveLock); + if(!_receiveComplete()) { + gl_lock_unlock(receiveLock); return false; + } transR.data = new uint8_t[length]; transR.length = length; transR.transmitted = 0; transR.notify = notify; + gl_lock_unlock(receiveLock); + updateEvents(); return true; @@ -205,11 +254,14 @@ void Connection::doSend() { if(!isConnected()) return; - while(!sendQueueEmpty()) { + gl_lock_lock(sendLock); + while(!_sendQueueEmpty()) { ssize_t ret = gnutls_record_send(session, transS.front().data+transS.front().transmitted, transS.front().length-transS.front().transmitted); if(ret < 0) { + gl_lock_unlock(sendLock); + if(ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) return; @@ -226,6 +278,8 @@ void Connection::doSend() { } } + gl_lock_unlock(sendLock); + updateEvents(); } @@ -235,7 +289,10 @@ bool Connection::rawSend(const uint8_t *data, unsigned long length) { Transmission trans = {length, 0, new uint8_t[length], sigc::slot<void,const void*,unsigned long>()}; std::memcpy(trans.data, data, length); + + gl_lock_lock(sendLock); transS.push(trans); + gl_lock_unlock(sendLock); updateEvents(); @@ -248,14 +305,24 @@ void Connection::sendReceive(short events) { return; } - if(state == HANDSHAKE) { - doHandshake(); - return; - } + switch(state) { + case CONNECT: + handshake(); + return; + case HANDSHAKE: + doHandshake(); + return; + case DISCONNECT: + if(!_sendQueueEmpty()) + break; - if(state == BYE) { - doBye(); - return; + bye(); + return; + case BYE: + doBye(); + return; + default: + break; } if(events & POLLIN) @@ -263,48 +330,69 @@ void Connection::sendReceive(short events) { if(events & POLLOUT) doSend(); - - if(state == DISCONNECT && sendQueueEmpty()) - bye(); } bool Connection::send(const Packet &packet) { - if(!isConnected() || isConnecting() || isDisconnecting()) + gl_rwlock_rdlock(stateLock); + bool err = (!_isConnected() || _isConnecting() || _isDisconnecting()); + gl_rwlock_unlock(stateLock); + + if(err) return false; return rawSend((const uint8_t*)packet.getRawData(), packet.getRawDataLength()); } void Connection::disconnect() { - if(isConnected() && !isDisconnecting()) { - state = DISCONNECT; - - if(sendQueueEmpty()) - bye(); + gl_rwlock_wrlock(stateLock); + if(!_isConnected() || _isDisconnecting()) { + gl_rwlock_unlock(stateLock); + return; } + + state = DISCONNECT; + + gl_rwlock_unlock(stateLock); + + updateEvents(); } void Connection::doDisconnect() { - if(!isConnected()) - return; + gl_rwlock_wrlock(stateLock); + + if(_isConnected()) { + FdManager::get()->unregisterFd(sock); - FdManager::get()->unregisterFd(sock); + shutdown(sock, SHUT_RDWR); + close(sock); - shutdown(sock, SHUT_RDWR); - close(sock); + gnutls_deinit(session); - gnutls_deinit(session); + ThreadManager::get()->pushWork(sigc::mem_fun(disconnectedSignal, &sigc::signal<void>::emit)); - state = DISCONNECTED; + state = DISCONNECTED; + } + + gl_rwlock_unlock(stateLock); } -void Connection::updateEvents() const { - short events = (receiveComplete() ? 0 : POLLIN) | (sendQueueEmpty() ? 0 : POLLOUT); +void Connection::updateEvents() { + gl_lock_lock(receiveLock); + short events = (_receiveComplete() ? 0 : POLLIN); + gl_lock_unlock(receiveLock); + + gl_lock_lock(sendLock); + events |= (_sendQueueEmpty() ? 0 : POLLOUT); + gl_lock_unlock(sendLock); + gl_rwlock_rdlock(stateLock); if(state == HANDSHAKE || state == BYE) events = ((gnutls_record_get_direction(session) == 0) ? POLLIN : POLLOUT); + else if(state == CONNECT || state == DISCONNECT) + events |= POLLOUT; FdManager::get()->setFdEvents(sock, events); + gl_rwlock_unlock(stateLock); } } |