From bd1985eb89916da6815e4f3f6afa5f4f7a2070ff Mon Sep 17 00:00:00 2001 From: massiot Date: Thu, 7 Jan 2010 23:30:42 +0000 Subject: * Initial import. git-svn-id: svn://svn.videolan.org/multicat/trunk@1 271ca591-50e6-457e-a4d7-9fbfa1de8f49 --- aggregartp.c | 235 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 aggregartp.c (limited to 'aggregartp.c') diff --git a/aggregartp.c b/aggregartp.c new file mode 100644 index 0000000..070bbd9 --- /dev/null +++ b/aggregartp.c @@ -0,0 +1,235 @@ +/***************************************************************************** + * aggregartp.c: split an RTP stream for several contribution links + ***************************************************************************** + * Copyright (C) 2009 VideoLAN + * $Id: aggregartp.c 48 2007-11-30 14:08:21Z cmassiot $ + * + * Authors: Christophe Massiot + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "util.h" + +#define MAX_OUTPUTS 4 +#define DEFAULT_MTU 1500 + +/***************************************************************************** + * Local declarations + *****************************************************************************/ +typedef struct output_t +{ + int i_fd; + unsigned int i_weight; + + unsigned int i_weighted_size, i_remainder; +} output_t; + +static int i_input_fd; +static output_t p_outputs[MAX_OUTPUTS]; +static unsigned int i_max_weight = 0; +static bool b_overwrite_timestamps = false; +static bool b_overwrite_ssrc = false; +static in_addr_t i_ssrc = 0; +static uint16_t i_rtp_cc = 0; + +static void usage(void) +{ + msg_Raw( NULL, "Usage: aggregartp [-i ] [-t ] [-w] [-s ] [-U] [-m ] @ [,] ... [,]" ); + msg_Raw( NULL, " host format: [[:]][@[]]" ); + msg_Raw( NULL, " -w: overwrite RTP timestamps" ); + msg_Raw( NULL, " -S: overwrite RTP SSRC" ); + msg_Raw( NULL, " -U: prepend RTP header" ); + exit(EXIT_FAILURE); +} + +/***************************************************************************** + * NextOutput: pick the output for the next packet + *****************************************************************************/ +static output_t *NextOutput(void) +{ + unsigned int i_min_size = p_outputs[0].i_weighted_size; + int i, i_output = 0; + + for ( i = 1; i < MAX_OUTPUTS && p_outputs[i].i_weight; i++ ) + { + if ( p_outputs[i].i_weighted_size < i_min_size ) + { + i_min_size = p_outputs[i].i_weighted_size; + i_output = i; + } + } + + for ( i = 0; i < MAX_OUTPUTS && p_outputs[i].i_weight; i++ ) + p_outputs[i].i_weighted_size -= i_min_size; + + return &p_outputs[i_output]; +} + +/***************************************************************************** + * Entry point + *****************************************************************************/ +int main( int i_argc, char **pp_argv ) +{ + int i, c; + int i_priority = -1; + int i_ttl = 0; + bool b_udp = false; + int i_mtu = DEFAULT_MTU; + uint8_t *p_buffer, *p_read_buffer; + + while ( (c = getopt( i_argc, pp_argv, "i:t:wo:Um:h" )) != -1 ) + { + switch ( c ) + { + case 'i': + i_priority = strtol( optarg, NULL, 0 ); + break; + + case 't': + i_ttl = strtol( optarg, NULL, 0 ); + break; + + case 'w': + b_overwrite_timestamps = true; + break; + + case 'o': + { + struct in_addr maddr; + if ( !inet_aton( optarg, &maddr ) ) + usage(); + i_ssrc = maddr.s_addr; + b_overwrite_ssrc = true; + break; + } + + case 'U': + b_udp = true; + break; + + case 'm': + i_mtu = strtol( optarg, NULL, 0 ); + break; + + case 'h': + default: + usage(); + break; + } + } + if ( optind >= i_argc - 1 ) + usage(); + + i_input_fd = OpenSocket( pp_argv[optind], 0, NULL ); + optind++; + + i = 0; + while ( optind < i_argc && i < MAX_OUTPUTS ) + { + p_outputs[i].i_fd = OpenSocket( pp_argv[optind++], i_ttl, + &p_outputs[i].i_weight ); + p_outputs[i].i_weighted_size = p_outputs[i].i_remainder = 0; + i_max_weight += p_outputs[i].i_weight; + i++; + } + if ( optind < i_argc ) + { + msg_Err( NULL, "max number of outputs: %d (recompile)", MAX_OUTPUTS ); + exit(EXIT_FAILURE); + } + msg_Dbg( NULL, "%d outputs weight %u", i, i_max_weight ); + for ( ; i < MAX_OUTPUTS; i++ ) + p_outputs[i].i_weight = 0; + + if ( b_udp ) + { + p_buffer = malloc( i_mtu + RTP_HEADER_SIZE ); + p_read_buffer = p_buffer + RTP_HEADER_SIZE; + } + else + p_buffer = p_read_buffer = malloc( i_mtu ); + + if ( i_priority > 0 ) + { + struct sched_param param; + int i_error; + + memset( ¶m, 0, sizeof(struct sched_param) ); + param.sched_priority = i_priority; + if ( (i_error = pthread_setschedparam( pthread_self(), SCHED_RR, + ¶m )) ) + { + msg_Warn( NULL, "couldn't set thread priority: %s", + strerror(i_error) ); + } + } + + for ( ; ; ) + { + ssize_t i_size = read( i_input_fd, p_read_buffer, i_mtu ); + output_t *p_output; + + if ( i_size < 0 && errno != EAGAIN && errno != EINTR ) + { + msg_Err( NULL, "unrecoverable read error, dying (%s)", + strerror(errno) ); + exit(EXIT_FAILURE); + } + if ( i_size <= 0 ) continue; + + if ( b_udp ) + { + rtp_SetHdr( p_buffer, i_rtp_cc ); + i_rtp_cc++; + i_size += RTP_HEADER_SIZE; + rtp_SetSSRC( p_buffer, (uint8_t *)&i_ssrc ); + /* this isn't RFC-compliant, but we assume that at the other end, + * the RTP header will be stripped */ + rtp_SetTimestamp( p_buffer, wall_Date() / 300 ); + } + else + { + if ( b_overwrite_ssrc ) + rtp_SetSSRC( p_buffer, (uint8_t *)&i_ssrc ); + if ( b_overwrite_timestamps ) + rtp_SetTimestamp( p_buffer, wall_Date() / 300 ); + } + + p_output = NextOutput(); + if ( write( p_output->i_fd, p_buffer, i_size ) < 0 ) + msg_Warn( NULL, "write error (%s)", strerror(errno) ); + + p_output->i_weighted_size += (i_size + p_output->i_remainder) + / p_output->i_weight; + p_output->i_remainder = (i_size + p_output->i_remainder) + % p_output->i_weight; + } + + return EXIT_SUCCESS; +} + -- cgit v1.2.3