summaryrefslogtreecommitdiffstats
path: root/src/Net/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Net/Connection.cpp')
-rw-r--r--src/Net/Connection.cpp170
1 files changed, 129 insertions, 41 deletions
diff --git a/src/Net/Connection.cpp b/src/Net/Connection.cpp
index 0984f0a..2ccfddb 100644
--- a/src/Net/Connection.cpp
+++ b/src/Net/Connection.cpp
@@ -20,66 +20,92 @@
#include "Connection.h"
#include "FdManager.h"
#include "IPAddress.h"
+#include "ThreadManager.h"
+
#include <cstring>
#include <sys/socket.h>
+#include <sigc++/bind.h>
+
namespace Mad {
namespace Net {
+
+Connection::StaticInit Connection::staticInit;
+
+
Connection::~Connection() {
- if(isConnected())
+ if(_isConnected())
doDisconnect();
if(transR.data)
delete [] transR.data;
- while(!sendQueueEmpty()) {
+ while(!_sendQueueEmpty()) {
delete [] transS.front().data;
transS.pop();
}
gnutls_certificate_free_credentials(x509_cred);
+ gl_rwlock_destroy(stateLock);
+ gl_lock_destroy(sendLock);
+ gl_lock_destroy(receiveLock);
+
if(peer)
delete peer;
}
void Connection::handshake() {
- if(isConnected())
+ gl_rwlock_wrlock(stateLock);
+ if(state != CONNECT) {
+ gl_rwlock_unlock(stateLock);
return;
+ }
state = HANDSHAKE;
+ gl_rwlock_unlock(stateLock);
doHandshake();
}
void Connection::bye() {
- if(state != DISCONNECT)
+ gl_rwlock_wrlock(stateLock);
+ if(state != DISCONNECT) {
+ gl_rwlock_unlock(stateLock);
return;
+ }
state = BYE;
+ gl_rwlock_unlock(stateLock);
doBye();
}
void Connection::doHandshake() {
- if(state != HANDSHAKE)
+ gl_rwlock_rdlock(stateLock);
+ if(state != HANDSHAKE) {
+ gl_rwlock_unlock(stateLock);
return;
+ }
int ret = gnutls_handshake(session);
if(ret < 0) {
+ gl_rwlock_unlock(stateLock);
+
if(ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN) {
updateEvents();
return;
}
-
// TODO: Error
doDisconnect();
return;
}
state = CONNECTION_HEADER;
+ gl_rwlock_unlock(stateLock);
+
connectionHeader();
}
@@ -102,13 +128,21 @@ void Connection::doBye() {
doDisconnect();
}
-bool Connection::enterReceiveLoop() {
- if(!isConnected() || isDisconnecting())
- return false;
+void Connection::enterReceiveLoop() {
+ gl_rwlock_wrlock(stateLock);
+
+ if(!_isConnected() || _isDisconnecting()) {
+ gl_rwlock_unlock(stateLock);
+ return;
+ }
+
+ if(_isConnecting())
+ ThreadManager::get()->pushWork(sigc::mem_fun(connectedSignal, &sigc::signal<void>::emit));
state = PACKET_HEADER;
+ gl_rwlock_unlock(stateLock);
- return rawReceive(sizeof(Packet::Data), sigc::mem_fun(this, &Connection::packetHeaderReceiveHandler));
+ rawReceive(sizeof(Packet::Data), sigc::mem_fun(this, &Connection::packetHeaderReceiveHandler));
}
void Connection::packetHeaderReceiveHandler(const void *data, unsigned long length) {
@@ -124,7 +158,7 @@ void Connection::packetHeaderReceiveHandler(const void *data, unsigned long leng
header = *(const Packet::Data*)data;
if(header.length == 0) {
- signal(Packet(ntohs(header.requestId)));
+ ThreadManager::get()->pushWork(sigc::bind(sigc::mem_fun(receiveSignal, &sigc::signal<void,const Packet&>::emit), Packet(ntohs(header.requestId))));
enterReceiveLoop();
}
@@ -144,7 +178,7 @@ void Connection::packetDataReceiveHandler(const void *data, unsigned long length
return;
}
- signal(Packet(ntohs(header.requestId), data, length));
+ ThreadManager::get()->pushWork(sigc::bind(sigc::mem_fun(receiveSignal, &sigc::signal<void,const Packet&>::emit), Packet(ntohs(header.requestId), data, length)));
enterReceiveLoop();
}
@@ -153,12 +187,17 @@ void Connection::doReceive() {
if(!isConnected())
return;
- if(receiveComplete())
+ gl_lock_lock(receiveLock);
+
+ if(_receiveComplete()) {
+ gl_lock_unlock(receiveLock);
return;
+ }
ssize_t ret = gnutls_record_recv(session, transR.data+transR.transmitted, transR.length-transR.transmitted);
if(ret < 0) {
+ gl_lock_unlock(receiveLock);
if(ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN)
return;
@@ -169,15 +208,20 @@ void Connection::doReceive() {
transR.transmitted += ret;
- if(receiveComplete()) {
+ if(_receiveComplete()) {
// Save data pointer, as transR.notify might start a new reception
uint8_t *data = transR.data;
transR.data = 0;
+ gl_lock_unlock(receiveLock);
+
transR.notify(data, transR.length);
delete [] data;
}
+ else {
+ gl_lock_unlock(receiveLock);
+ }
updateEvents();
}
@@ -188,14 +232,19 @@ bool Connection::rawReceive(unsigned long length,
if(!isConnected())
return false;
- if(!receiveComplete())
+ gl_lock_lock(receiveLock);
+ if(!_receiveComplete()) {
+ gl_lock_unlock(receiveLock);
return false;
+ }
transR.data = new uint8_t[length];
transR.length = length;
transR.transmitted = 0;
transR.notify = notify;
+ gl_lock_unlock(receiveLock);
+
updateEvents();
return true;
@@ -205,11 +254,14 @@ void Connection::doSend() {
if(!isConnected())
return;
- while(!sendQueueEmpty()) {
+ gl_lock_lock(sendLock);
+ while(!_sendQueueEmpty()) {
ssize_t ret = gnutls_record_send(session, transS.front().data+transS.front().transmitted,
transS.front().length-transS.front().transmitted);
if(ret < 0) {
+ gl_lock_unlock(sendLock);
+
if(ret == GNUTLS_E_INTERRUPTED || ret == GNUTLS_E_AGAIN)
return;
@@ -226,6 +278,8 @@ void Connection::doSend() {
}
}
+ gl_lock_unlock(sendLock);
+
updateEvents();
}
@@ -235,7 +289,10 @@ bool Connection::rawSend(const uint8_t *data, unsigned long length) {
Transmission trans = {length, 0, new uint8_t[length], sigc::slot<void,const void*,unsigned long>()};
std::memcpy(trans.data, data, length);
+
+ gl_lock_lock(sendLock);
transS.push(trans);
+ gl_lock_unlock(sendLock);
updateEvents();
@@ -248,14 +305,24 @@ void Connection::sendReceive(short events) {
return;
}
- if(state == HANDSHAKE) {
- doHandshake();
- return;
- }
+ switch(state) {
+ case CONNECT:
+ handshake();
+ return;
+ case HANDSHAKE:
+ doHandshake();
+ return;
+ case DISCONNECT:
+ if(!_sendQueueEmpty())
+ break;
- if(state == BYE) {
- doBye();
- return;
+ bye();
+ return;
+ case BYE:
+ doBye();
+ return;
+ default:
+ break;
}
if(events & POLLIN)
@@ -263,48 +330,69 @@ void Connection::sendReceive(short events) {
if(events & POLLOUT)
doSend();
-
- if(state == DISCONNECT && sendQueueEmpty())
- bye();
}
bool Connection::send(const Packet &packet) {
- if(!isConnected() || isConnecting() || isDisconnecting())
+ gl_rwlock_rdlock(stateLock);
+ bool err = (!_isConnected() || _isConnecting() || _isDisconnecting());
+ gl_rwlock_unlock(stateLock);
+
+ if(err)
return false;
return rawSend((const uint8_t*)packet.getRawData(), packet.getRawDataLength());
}
void Connection::disconnect() {
- if(isConnected() && !isDisconnecting()) {
- state = DISCONNECT;
-
- if(sendQueueEmpty())
- bye();
+ gl_rwlock_wrlock(stateLock);
+ if(!_isConnected() || _isDisconnecting()) {
+ gl_rwlock_unlock(stateLock);
+ return;
}
+
+ state = DISCONNECT;
+
+ gl_rwlock_unlock(stateLock);
+
+ updateEvents();
}
void Connection::doDisconnect() {
- if(!isConnected())
- return;
+ gl_rwlock_wrlock(stateLock);
+
+ if(_isConnected()) {
+ FdManager::get()->unregisterFd(sock);
- FdManager::get()->unregisterFd(sock);
+ shutdown(sock, SHUT_RDWR);
+ close(sock);
- shutdown(sock, SHUT_RDWR);
- close(sock);
+ gnutls_deinit(session);
- gnutls_deinit(session);
+ ThreadManager::get()->pushWork(sigc::mem_fun(disconnectedSignal, &sigc::signal<void>::emit));
- state = DISCONNECTED;
+ state = DISCONNECTED;
+ }
+
+ gl_rwlock_unlock(stateLock);
}
-void Connection::updateEvents() const {
- short events = (receiveComplete() ? 0 : POLLIN) | (sendQueueEmpty() ? 0 : POLLOUT);
+void Connection::updateEvents() {
+ gl_lock_lock(receiveLock);
+ short events = (_receiveComplete() ? 0 : POLLIN);
+ gl_lock_unlock(receiveLock);
+
+ gl_lock_lock(sendLock);
+ events |= (_sendQueueEmpty() ? 0 : POLLOUT);
+ gl_lock_unlock(sendLock);
+ gl_rwlock_rdlock(stateLock);
if(state == HANDSHAKE || state == BYE)
events = ((gnutls_record_get_direction(session) == 0) ? POLLIN : POLLOUT);
+ else if(state == CONNECT || state == DISCONNECT)
+ events |= POLLOUT;
FdManager::get()->setFdEvents(sock, events);
+ gl_rwlock_unlock(stateLock);
}
}