/*
 * Connection.cpp
 *
 * Copyright (C) 2008 Matthias Schiffer
 *
 * This program is free software: you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 * See the GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along
 * with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "Connection.h"

// NOTE(review): in the copy of this file that was reviewed, everything between
// '<' and '>' had been stripped (include targets, template arguments).
//  - <cstring> (std::memcpy/std::memmove) and <boost/bind.hpp> (boost::bind) are
//    restored because the code below needs them. The file had a third
//    angle-bracket include whose target could not be recovered; restore it from
//    version control.
//  - Template arguments below were reconstructed from how the values are used
//    in this file (boost::shared_mutex for connectionLock, boost::uint8_t
//    buffers, Packet::Header, Packet). Confirm them against Connection.h.
#include <cstring>
#include <boost/bind.hpp>

namespace Mad {
namespace Net {

/**
 * Calls doDisconnect() and then waitWhileConnected() before the object goes
 * away. (Presumably the latter blocks until the connection has reached the
 * DISCONNECTED state; see Connection.h to confirm.)
 */
Connection::~Connection() {
  doDisconnect();
  waitWhileConnected();
}

/**
 * Completion handler for the handshake.
 *
 * On error the message is logged and doDisconnect() is called. On success the
 * state is set to CONNECTED, the receive/send bookkeeping is reset and
 * connectedSignal is emitted; unless dontStart is set, startReceive() is
 * called afterwards (outside the lock).
 *
 * @param error result of the handshake operation
 */
void Connection::handleHandshake(const boost::system::error_code& error) {
  if(error) {
    application->log(Core::Logger::LOG_NETWORK, Core::Format("Error: %1%") % error.message());

    // TODO Error handling
    doDisconnect();
    return;
  }

  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    // The handshake has completed, so the connection is established. The state
    // has to be CONNECTED here (it used to be set to CONNECT): both
    // enterReceiveLoop() and handleRead() return early unless
    // state == CONNECTED, and nothing else in this file enters that state.
    _setState(CONNECTED);

    receiving = false;
    sending = 0;

    received = 0;

    connectedSignal.emit();

    if(dontStart)
      return;
  }

  startReceive();
}

/**
 * Completion handler for the shutdown.
 *
 * Logs a shutdown error (verbose level) if there is one, then sets the state
 * to DISCONNECTED and emits disconnectedSignal, all while holding
 * connectionLock exclusively.
 *
 * @param error result of the shutdown operation
 */
void Connection::handleShutdown(const boost::system::error_code& error) {
  boost::lock_guard<boost::shared_mutex> lock(connectionLock);

  if(error) {
    application->log(Core::Logger::LOG_NETWORK, Core::Logger::LOG_VERBOSE, Core::Format("Shutdown error: %1%") % error.message());
  }

  _setState(DISCONNECTED);
  disconnectedSignal.emit();
}

/**
 * Requests the next packet header.
 *
 * Does nothing unless the state is CONNECTED; otherwise asks rawReceive() for
 * sizeof(Packet::Header) bytes and routes them to handleHeaderReceive().
 */
void Connection::enterReceiveLoop() {
  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    if(state != CONNECTED)
      return;
  }

  rawReceive(sizeof(Packet::Header), boost::bind(&Connection::handleHeaderReceive, thisPtr.lock(), _1));
}

/**
 * Handles a received packet header.
 *
 * Stores the header, then depending on the (network byte order) length field:
 *  - 0: emits receiveSignal with a payload-less packet and requests the next
 *    header,
 *  - larger than receiveLimit: logs a warning and disconnects,
 *  - otherwise: requests that many payload bytes for handleDataReceive().
 *
 * @param data buffer holding sizeof(Packet::Header) bytes as requested by
 *             enterReceiveLoop()
 */
void Connection::handleHeaderReceive(const boost::shared_array<boost::uint8_t> &data) {
  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    header = *reinterpret_cast<const Packet::Header*>(data.get());
  }

  boost::uint32_t length = ntohl(header.length);

  if(length == 0) {
    receiveSignal.emit(boost::shared_ptr<Packet>(new Packet(ntohs(header.requestId))));

    enterReceiveLoop();
  }
  else if(length > receiveLimit) {
    application->log(Core::Logger::LOG_NETWORK, Core::Logger::LOG_WARNING, "Packet size limit exceeded. Disconnecting.");
    doDisconnect();
  }
  else {
    // Use the already converted local value instead of reading the shared
    // header member a second time outside the lock.
    rawReceive(length, boost::bind(&Connection::handleDataReceive, thisPtr.lock(), _1));
  }
}

/**
 * Handles a received packet payload.
 *
 * Builds a Packet from the stored header and the payload, emits it through
 * receiveSignal and then requests the next header.
 *
 * @param data buffer holding the payload bytes requested by
 *             handleHeaderReceive()
 */
void Connection::handleDataReceive(const boost::shared_array<boost::uint8_t> &data) {
  {
    // header is only read here, so shared ownership is sufficient. An
    // upgrade_lock (used previously, but never upgraded) would additionally
    // exclude rawSend()/rawReceive(), which take upgrade ownership themselves.
    boost::shared_lock<boost::shared_mutex> lock(connectionLock);

    receiveSignal.emit(boost::shared_ptr<Packet>(new Packet(ntohs(header.requestId), data.get(), ntohl(header.length))));
  }

  enterReceiveLoop();
}

/**
 * Completion handler for a read started by rawReceive().
 *
 * If the read failed or fewer than `length` bytes are buffered, the connection
 * is dropped (a cancelled operation returns silently). Otherwise the first
 * `length` bytes of receiveBuffer are copied into a fresh buffer, any surplus
 * bytes are moved to the front of receiveBuffer, `receiving` is cleared and
 * `notify` is called with the copy (outside the lock).
 *
 * @param error             result of the read operation
 * @param bytes_transferred number of bytes appended to receiveBuffer by this
 *                          read (0 when called directly from rawReceive())
 * @param length            number of bytes the caller asked for
 * @param notify            callback receiving the `length` requested bytes
 */
void Connection::handleRead(const boost::system::error_code& error, std::size_t bytes_transferred, std::size_t length, const boost::function1<void, const boost::shared_array<boost::uint8_t>& > &notify) {
  if(error || (bytes_transferred+received) < length) {
    if(error == boost::system::errc::operation_canceled)
      return;

    application->log(Core::Logger::LOG_NETWORK, Core::Format("Read error: %1%") % error.message());

    // TODO Error
    doDisconnect();

    return;
  }

  boost::shared_array<boost::uint8_t> buffer(new boost::uint8_t[length]);

  {
    boost::shared_lock<boost::shared_mutex> lock(connectionLock);

    if(state != CONNECTED || !receiving)
      return;

    std::memcpy(buffer.get(), receiveBuffer->data(), length);
  }

  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    receiving = false;

    // Bytes that were read beyond the requested length stay buffered for the
    // next request; move them to the start of receiveBuffer.
    received = received + bytes_transferred - length;

    if(received)
      std::memmove(receiveBuffer->data(), receiveBuffer->data()+length, received);
  }

  notify(buffer);
}

/**
 * Requests `length` bytes from the peer and delivers them to `notify`.
 *
 * Returns without doing anything if the connection is not connected or a
 * receive is already in progress. If fewer than `length` bytes are buffered an
 * asynchronous read for at least the missing bytes is started; otherwise
 * handleRead() is invoked directly with the buffered data.
 *
 * @param length number of bytes wanted
 * @param notify callback receiving the bytes (via handleRead())
 */
void Connection::rawReceive(std::size_t length, const boost::function1<void, const boost::shared_array<boost::uint8_t>& > &notify) {
  boost::upgrade_lock<boost::shared_mutex> lock(connectionLock);

  if(!_isConnected())
    return;

  {
    boost::upgrade_to_unique_lock<boost::shared_mutex> upgradeLock(lock);

    if(receiving)
      return;

    receiving = true;

    if(length > received) {
      boost::asio::async_read(socket, boost::asio::buffer(receiveBuffer->data()+received, receiveBuffer->size()-received), boost::asio::transfer_at_least(length-received),
          boost::bind(&Connection::handleRead, thisPtr.lock(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, length, notify));

      return;
    }
  }

  // Enough data is buffered already. handleRead() takes connectionLock itself,
  // so release it before the direct call.
  lock.unlock();

  handleRead(boost::system::error_code(), 0, length, notify);
}

/**
 * Completion handler for a write started by rawSend().
 *
 * Logs a write error (verbose level), decrements the count of pending writes
 * and, if a disconnect was requested (state DISCONNECT) and this was the last
 * pending write, performs it. A write error otherwise also leads to
 * doDisconnect().
 *
 * @param error result of the write operation
 */
void Connection::handleWrite(const boost::system::error_code& error, std::size_t) {
  if(error)
    application->log(Core::Logger::LOG_NETWORK, Core::Logger::LOG_VERBOSE, Core::Format("Write error: %1%") % error.message());

  {
    boost::unique_lock<boost::shared_mutex> lock(connectionLock);

    sending--;

    if(state == DISCONNECT && !sending) {
      // Release the lock before the call, as disconnect() does.
      lock.unlock();
      doDisconnect();
      return;
    }
  }

  if(error) {
    // TODO Error
    doDisconnect();
  }
}

/**
 * Starts an asynchronous write of `length` bytes from `data`.
 *
 * Does nothing if the connection is not connected. Otherwise the count of
 * pending writes is incremented and the write is started; handleWrite() runs
 * on completion.
 *
 * @param data   bytes to send
 * @param length number of bytes to send
 */
void Connection::rawSend(const boost::uint8_t *data, std::size_t length) {
  boost::upgrade_lock<boost::shared_mutex> lock(connectionLock);

  if(!_isConnected())
    return;

  {
    boost::upgrade_to_unique_lock<boost::shared_mutex> upgradeLock(lock);

    sending++;
    boost::asio::async_write(socket, Buffer(data, length), boost::bind(&Connection::handleWrite, thisPtr.lock(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
  }
}

/**
 * Sends a packet.
 *
 * @param packet packet whose raw data is handed to rawSend()
 * @return false if the connection is not connected, still connecting or
 *         already disconnecting; true once the packet was passed to rawSend()
 */
bool Connection::send(const Packet &packet) {
  {
    boost::shared_lock<boost::shared_mutex> lock(connectionLock);

    if(!_isConnected() || _isConnecting() || _isDisconnecting())
      return false;
  }

  rawSend((const boost::uint8_t*)packet.getRawData(), packet.getRawDataLength());

  return true;
}

/**
 * Requests a disconnect.
 *
 * Does nothing if the connection is not connected or already disconnecting.
 * Otherwise the state becomes DISCONNECT; if writes are still pending the
 * actual disconnect is left to handleWrite(), else doDisconnect() is called
 * right away (outside the lock).
 */
void Connection::disconnect() {
  {
    boost::lock_guard<boost::shared_mutex> lock(connectionLock);

    if(!_isConnected() || _isDisconnecting())
      return;

    _setState(DISCONNECT);

    if(sending)
      return;
  }

  doDisconnect();
}

}
}