/*
 * Connection.cpp
 *
 * Copyright (C) 2008 Matthias Schiffer
 *
 * This program is free software: you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "Connection.h"
#include "ThreadManager.h"

// NOTE(review): the three angle-bracket include names below were missing from
// the text under review; they are reconstructed from what this file uses
// (Common::Logger, std::memmove, boost::bind). Confirm against the build.
#include <Common/Logger.h>

#include <cstring>
#include <boost/bind.hpp>

// NOTE(review): template arguments were also missing from the text under
// review. The lock types are assumed to be parameterised on
// boost::shared_mutex, byte buffers are assumed to be std::vector<uint8_t>,
// the signals are assumed to be boost::signal0<void> and
// boost::signal1<void, const Packet&>, and the header cast is assumed to
// target const Packet::Data*. Confirm all of these against Connection.h.

namespace Mad {
namespace Net {

boost::asio::io_service Connection::ioService;


// Starts a shutdown if the connection is still up when the object is destroyed.
Connection::~Connection() {
  if(_isConnected())
    doDisconnect();
}

// Completion handler for the handshake.
//
// On error: logs the message and calls doDisconnect().
// On success: marks the connection CONNECTED, resets the receive/send
// bookkeeping, hands the connectedSignal invocation to
// ThreadManager::pushWork() and starts receiving.
void Connection::handleHandshake(const boost::system::error_code& error) {
  if(error) {
    Common::Logger::logf("Error: %s", error.message().c_str());

    // TODO Error handling
    doDisconnect();
    return;
  }

  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    state = CONNECTED;

    receiving = false;
    sending = 0;

    received = 0;
  }

  // The cast spells out the exact operator() overload boost::bind should take.
  ThreadManager::get()->pushWork(boost::bind((void (boost::signal0<void>::*)())&boost::signal0<void>::operator(), &connectedSignal));

  enterReceiveLoop();
}

// Completion handler for async_shutdown(): marks the connection DISCONNECTED
// and hands the disconnectedSignal invocation to ThreadManager::pushWork().
// A shutdown error is currently ignored.
void Connection::handleShutdown(const boost::system::error_code& error) {
  boost::lock_guard<boost::shared_mutex> lock(connectionLock);

  if(error) {
    // TODO Error
  }

  state = DISCONNECTED;

  ThreadManager::get()->pushWork(boost::bind((void (boost::signal0<void>::*)())&boost::signal0<void>::operator(), &disconnectedSignal));
}

// Requests the next packet header (sizeof(Packet::Data) bytes) unless the
// connection is down or already disconnecting.
void Connection::enterReceiveLoop() {
  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    if(!_isConnected() || _isDisconnecting())
      return;
  }

  rawReceive(sizeof(Packet::Data), boost::bind(&Connection::handleHeaderReceive, this, _1));
}

// Called with the raw bytes of a packet header.
//
// Stores the header. A header with length 0 is delivered immediately as a
// payload-less Packet and the receive loop continues; otherwise the payload
// (ntohs(header.length) bytes) is requested next.
void Connection::handleHeaderReceive(const std::vector<uint8_t> &data) {
  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    header = *reinterpret_cast<const Packet::Data*>(data.data());
  }

  // Zero is zero in either byte order, so no ntohs() is needed for this test.
  if(header.length == 0) {
    ThreadManager::get()->pushWork(boost::bind((void (boost::signal1<void, const Packet&>::*)(const Packet&))&boost::signal1<void, const Packet&>::operator(), &receiveSignal, Packet(ntohs(header.requestId))));

    enterReceiveLoop();
  }
  else {
    rawReceive(ntohs(header.length), boost::bind(&Connection::handleDataReceive, this, _1));
  }
}

// Called with the payload bytes that belong to the stored header: builds the
// Packet, hands the receiveSignal invocation to ThreadManager::pushWork() and
// continues the receive loop.
void Connection::handleDataReceive(const std::vector<uint8_t> &data) {
  {
    boost::upgrade_lock<boost::shared_mutex> lock(connectionLock);

    Packet packet(ntohs(header.requestId), data.data(), ntohs(header.length));

    ThreadManager::get()->pushWork(boost::bind((void (boost::signal1<void, const Packet&>::*)(const Packet&))&boost::signal1<void, const Packet&>::operator(), &receiveSignal, packet));
  }

  enterReceiveLoop();
}

// Completion handler for a read started by rawReceive().
//
// error             - result of the read operation
// bytes_transferred - bytes added to receiveBuffer by this operation
// length            - number of bytes the caller of rawReceive() asked for
// notify            - callback that receives exactly `length` bytes
//
// Disconnects on error or if fewer than `length` bytes are available in
// total. Otherwise copies the first `length` bytes out of receiveBuffer,
// moves any surplus bytes to the front of the buffer and calls notify().
void Connection::handleRead(const boost::system::error_code& error, std::size_t bytes_transferred, std::size_t length, const boost::function1<void, const std::vector<uint8_t>& > &notify) {
  if(error || (bytes_transferred+received) < length) {
    Common::Logger::logf(Common::Logger::VERBOSE, "Read error: %s", error.message().c_str());

    // TODO Error
    doDisconnect();
    return;
  }

  std::vector<uint8_t> buffer;

  {
    boost::shared_lock<boost::shared_mutex> lock(connectionLock);

    if(state != CONNECTED || !receiving)
      return;

    buffer.insert(buffer.end(), receiveBuffer.data(), receiveBuffer.data()+length);
  }

  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    receiving = false;

    // Whatever was read beyond `length` stays buffered for the next request.
    received = received + bytes_transferred - length;

    if(received)
      std::memmove(receiveBuffer.data(), receiveBuffer.data()+length, received);
  }

  notify(buffer);
}

// Requests `length` bytes and passes them to `notify` once available.
//
// Does nothing if the connection is down or a receive is already pending.
// If receiveBuffer already holds at least `length` bytes, the request is
// completed synchronously through handleRead(); otherwise an asynchronous
// read for the missing bytes is started.
void Connection::rawReceive(std::size_t length, const boost::function1<void, const std::vector<uint8_t>& > &notify) {
  boost::upgrade_lock<boost::shared_mutex> lock(connectionLock);

  if(!_isConnected())
    return;

  {
    boost::upgrade_to_unique_lock<boost::shared_mutex> upgradeLock(lock);

    if(receiving)
      return;

    receiving = true;

    if(length > received) {
      // `received` bytes of this request are already buffered, so only the
      // remainder has to arrive. Asking for `length` new bytes would wait for
      // more data than handleRead() requires and could stall the connection.
      // The subtraction cannot underflow because length > received here.
      boost::asio::async_read(socket, boost::asio::buffer(receiveBuffer.data()+received, receiveBuffer.size()-received), boost::asio::transfer_at_least(length - received),
          boost::bind(&Connection::handleRead, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, length, notify));

      return;
    }
  }

  // handleRead() and doDisconnect() take connectionLock themselves.
  lock.unlock();

  handleRead(boost::system::error_code(), 0, length, notify);
}

// Completion handler for a write started by rawSend().
//
// Decrements the count of pending writes. If a disconnect was requested
// (state == DISCONNECT) and this was the last pending write, the shutdown is
// started now. A write error is logged and also leads to doDisconnect().
void Connection::handleWrite(const boost::system::error_code& error, std::size_t) {
  {
    boost::unique_lock<boost::shared_mutex> lock(connectionLock);

    sending--;

    if(state == DISCONNECT && !sending) {
      // doDisconnect() locks connectionLock itself.
      lock.unlock();
      doDisconnect();
      return;
    }
  }

  if(error) {
    Common::Logger::logf(Common::Logger::VERBOSE, "Write error: %s", error.message().c_str());

    // TODO Error
    doDisconnect();
  }
}

// Starts an asynchronous write of `length` bytes at `data` and counts it as
// pending. Does nothing if the connection is down.
void Connection::rawSend(const uint8_t *data, std::size_t length) {
  boost::upgrade_lock<boost::shared_mutex> lock(connectionLock);

  if(!_isConnected())
    return;

  {
    boost::upgrade_to_unique_lock<boost::shared_mutex> upgradeLock(lock);

    sending++;
    boost::asio::async_write(socket, Buffer(data, length), boost::bind(&Connection::handleWrite, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
  }
}

// Sends a packet.
//
// Returns false without sending if the connection is down, still connecting
// or already disconnecting; returns true once the write has been handed to
// rawSend().
bool Connection::send(const Packet &packet) {
  {
    boost::shared_lock<boost::shared_mutex> lock(connectionLock);

    if(!_isConnected() || _isConnecting() || _isDisconnecting())
      return false;
  }

  rawSend((const uint8_t*)packet.getRawData(), packet.getRawDataLength());

  return true;
}

// Requests a disconnect.
//
// Sets the state to DISCONNECT. If writes are still pending, the shutdown is
// left to handleWrite() once the last one completes; otherwise it is started
// right away.
void Connection::disconnect() {
  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    if(!_isConnected() || _isDisconnecting())
      return;

    state = DISCONNECT;

    if(sending)
      return;
  }

  doDisconnect();
}

// Starts the asynchronous shutdown of the socket if the connection is up;
// handleShutdown() is called on completion.
void Connection::doDisconnect() {
  boost::lock_guard<boost::shared_mutex> lock(connectionLock);

  if(_isConnected())
    socket.async_shutdown(boost::bind(&Connection::handleShutdown, this, boost::asio::placeholders::error));
}

}
}