diff options
-rw-r--r-- | AUTHORS | 13 | ||||
-rw-r--r-- | COPYING | 340 | ||||
-rw-r--r-- | INSTALL | 103 | ||||
-rw-r--r-- | Makefile | 37 | ||||
-rw-r--r-- | README | 35 | ||||
-rw-r--r-- | aggregartp.c | 235 | ||||
-rw-r--r-- | desaggregartp.c | 358 | ||||
-rw-r--r-- | ingests.c | 258 | ||||
-rw-r--r-- | multicat.c | 523 | ||||
-rw-r--r-- | offsets.c | 129 | ||||
-rw-r--r-- | util.c | 389 | ||||
-rw-r--r-- | util.h | 158 |
12 files changed, 2578 insertions, 0 deletions
@@ -0,0 +1,13 @@ +# Contributors to multicat +# $Id: AUTHORS 94 2010-01-04 20:15:43Z gatty $ +# +# The format of this file was inspired by the Linux kernel CREDITS file. +# Authors are listed alphabetically. +# +# The fields are: name (N), email (E), web-address (W), CVS account login (C), +# PGP key ID and fingerprint (P), description (D), and snail-mail address (S). + +N: Christophe Massiot +E: massiot AT via DOT ecp DOT fr +C: massiot +D: Most of the code @@ -0,0 +1,340 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc. + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Library General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) year name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + <signature of Ty Coon>, 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Library General +Public License instead of this License. @@ -0,0 +1,103 @@ +$Id: INSTALL 93 2010-01-03 13:07:23Z gatty $ + +Installing Multicat +=================== + +No autotools yet... You'd have to tweak the Makefile by hand. Compile the +program with `make`. + + +The socket description format +============================= + +For conveniency all tools use the same way of describing a socket in a +program argument: + +<connect address>:<connect port>@<bind address>:<bind port> + +All parts are optional; default port (1234) or wildcard address will then +be used. + +Some examples: + +Reading all streams coming to port 5004: + @:5004 +Reading from a multicast stream on port 5004: + @239.255.0.1:5004 +The same, with source-specific multicast: + 192.168.0.1@239.255.0.1:5004 +Writing to a multicast stream on port 5004: + 239.255.0.1:5004 +The same, but binding to a specific interface: + 239.255.0.1:5004@192.168.0.2 + + +Using Multicat +============== + +Recording a multicast address to a file: + +multicat @239.255.0.1:5004 /tmp/myfile.ts + +This will also create file /tmp/myfile.aux. Playing back the file: + +multicat -p 68 /tmp/myfile.ts 239.255.0.2:5004 + +Adding an RTP header to an existing multicast stream: + +multicat -p 68 -u @239.255.0.1:5004 239.255.0.2:5004 + +The PCR PID is here supposed to be 68. If you don't specify it, the timestamps +will not be RFC-compliant (but will work in most cases). You can use the +get_pcr_pid tool from libdvbpsi to determine it. Otherwise, if you are sure +the stream contains a single program, and only one PID carries a PCR, you can +pass "-p 8192" the disable the PID check. This isn't on by default because +it can produce awful things with multi-program transport streams, and the +world would be a better place if people had to knowingly turn it on. + + +Using IngesTS +============= + +ingests -p 68 /tmp/afile.ts + +This will create file /tmp/afile.aux. 68 is supposed to be the PCR PID. +The same note as above applies to ingesTS. + +Playing the file: + +multicat -p 68 /tmp/afile.ts 239.255.0.2:5004 + + +Using OffseTS +============= + +We want to take the first 60 minutes of a TS file. We must scale it in a +27 MHz clock: +60 * 60 (seconds) * 27000000 (MHz) = 97200000000 + +Find the offset in 1316-blocks: + +offsets /tmp/myfile.aux 97200000000 + +It returns for instance "556896". Then cut the file using dd: + +dd if=/tmp/myfile.ts of=/tmp/mynewfile.ts bs=1316 count=556896 + +Alternatively, if we want to *remove* the first hour: + +dd if=/tmp/myfile.ts of=/tmp/mynewfile.ts bs=1316 skip=556896 + +It can also be done with multicat using the -s and -n options. + + +Using AggregaRTP and DesaggregaRTP +================================== + +Splitting an RTP stream to two streams with different routing policies: + +aggregartp @239.255.0.1:5004 239.1.0.1:5004@192.168.0.1 239.2.0.1:5004@172.16.0.1 + +At the other end, reassembling the two streams into one usable stream: + +desaggregartp 192.168.0.1@239.1.0.1:5004 172.16.0.1@239.2.0.1:5004 239.254.0.1:5004 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2cafdf8 --- /dev/null +++ b/Makefile @@ -0,0 +1,37 @@ +# multicat Makefile + +CFLAGS += -Wall -O3 -fomit-frame-pointer -D_FILE_OFFSET_BITS=64 -D_ISOC99_SOURCE -D_BSD_SOURCE +CFLAGS += -g +LDFLAGS += -lrt + +OBJ_MULTICAT = multicat.o util.o +OBJ_INGESTS = ingests.o util.o +OBJ_AGGREGARTP = aggregartp.o util.o +OBJ_DESAGGREGARTP = desaggregartp.o util.o +OBJ_OFFSETS = offsets.o + +all: multicat ingests aggregartp desaggregartp offsets + +$(OBJ_MULTICAT): Makefile util.h +$(OBJ_INGESTS): Makefile util.h +$(OBJ_AGGREGARTP): Makefile util.h +$(OBJ_DESAGGREGARTP): Makefile util.h +$(OBJ_OFFSETS): Makefile + +multicat: $(OBJ_MULTICAT) + $(CC) $(LDFLAGS) -o $@ $(OBJ_MULTICAT) + +ingests: $(OBJ_INGESTS) + $(CC) $(LDFLAGS) -o $@ $(OBJ_INGESTS) + +aggregartp: $(OBJ_AGGREGARTP) + $(CC) $(LDFLAGS) -o $@ $(OBJ_AGGREGARTP) + +desaggregartp: $(OBJ_DESAGGREGARTP) + $(CC) $(LDFLAGS) -o $@ $(OBJ_DESAGGREGARTP) + +offsets: $(OBJ_OFFSETS) + $(CC) -o $@ $(OBJ_OFFSETS) + +clean: + -rm -f multicat $(OBJ_MULTICAT) ingests $(OBJ_INGESTS) aggregartp $(OBJ_AGGREGARTP) desaggregartp $(OBJ_DESAGGREGARTP) offsets $(OBJ_OFFSETS) @@ -0,0 +1,35 @@ +$Id: README 26 2009-10-20 18:44:22Z massiot $ + +Welcome to Multicat! + +The multicat package contains a set of tools designed to easily and +efficiently manipulate multicast streams in general, and MPEG-2 +Transport Streams (ISO/IEC 13818-1) in particular. + +The multicat tool itself is a 1 input/1 output application. Inputs and +outputs can be network streams (unicast and multicast), files, character +devices or FIFOs. + +Multicat tries to rebuild the internal clock of the input stream; but +it wants to remain agnostic of what is transported, so in case of files +the said clock is stored to an auxiliary file (example.aux accompanies +example.ts) while recording. Other inputs are considered "live", and +the input clock is simply derived from the reception time of the packets. + +IngesTS is a companion application designed to manipulate TS files. It +reads the PCR values of the file, and builds the auxiliary file that is +necessary for multicat. + +OffseTS is another companion application to manipulate auxiliary files. +Given an offset in time from the beginning of the file, it returns the offset +of the position in number of packets. + +Finally aggregaRTP and desaggregaRTP can be used to carry a high-bitrate +signal over several contribution links. + +The multicat suite of applications is very lightweight and designed to +operate in tight environments. Memory and CPU usages are kept to a minimum, +and they feature only one thread of execution. They have no dependancy. + +-- +Meuuh 2010-01-07 diff --git a/aggregartp.c b/aggregartp.c new file mode 100644 index 0000000..070bbd9 --- /dev/null +++ b/aggregartp.c @@ -0,0 +1,235 @@ +/***************************************************************************** + * aggregartp.c: split an RTP stream for several contribution links + ***************************************************************************** + * Copyright (C) 2009 VideoLAN + * $Id: aggregartp.c 48 2007-11-30 14:08:21Z cmassiot $ + * + * Authors: Christophe Massiot <massiot@via.ecp.fr> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + *****************************************************************************/ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <errno.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <pthread.h> + +#include "util.h" + +#define MAX_OUTPUTS 4 +#define DEFAULT_MTU 1500 + +/***************************************************************************** + * Local declarations + *****************************************************************************/ +typedef struct output_t +{ + int i_fd; + unsigned int i_weight; + + unsigned int i_weighted_size, i_remainder; +} output_t; + +static int i_input_fd; +static output_t p_outputs[MAX_OUTPUTS]; +static unsigned int i_max_weight = 0; +static bool b_overwrite_timestamps = false; +static bool b_overwrite_ssrc = false; +static in_addr_t i_ssrc = 0; +static uint16_t i_rtp_cc = 0; + +static void usage(void) +{ + msg_Raw( NULL, "Usage: aggregartp [-i <RT priority>] [-t <ttl>] [-w] [-s <SSRC IP>] [-U] [-m <mtu>] @<src host> <dest host 1>[,<weight 1>] ... [<dest host N>,<weight N>]" ); + msg_Raw( NULL, " host format: [<connect addr>[:<connect port>]][@[<bind addr][:<bind port>]]" ); + msg_Raw( NULL, " -w: overwrite RTP timestamps" ); + msg_Raw( NULL, " -S: overwrite RTP SSRC" ); + msg_Raw( NULL, " -U: prepend RTP header" ); + exit(EXIT_FAILURE); +} + +/***************************************************************************** + * NextOutput: pick the output for the next packet + *****************************************************************************/ +static output_t *NextOutput(void) +{ + unsigned int i_min_size = p_outputs[0].i_weighted_size; + int i, i_output = 0; + + for ( i = 1; i < MAX_OUTPUTS && p_outputs[i].i_weight; i++ ) + { + if ( p_outputs[i].i_weighted_size < i_min_size ) + { + i_min_size = p_outputs[i].i_weighted_size; + i_output = i; + } + } + + for ( i = 0; i < MAX_OUTPUTS && p_outputs[i].i_weight; i++ ) + p_outputs[i].i_weighted_size -= i_min_size; + + return &p_outputs[i_output]; +} + +/***************************************************************************** + * Entry point + *****************************************************************************/ +int main( int i_argc, char **pp_argv ) +{ + int i, c; + int i_priority = -1; + int i_ttl = 0; + bool b_udp = false; + int i_mtu = DEFAULT_MTU; + uint8_t *p_buffer, *p_read_buffer; + + while ( (c = getopt( i_argc, pp_argv, "i:t:wo:Um:h" )) != -1 ) + { + switch ( c ) + { + case 'i': + i_priority = strtol( optarg, NULL, 0 ); + break; + + case 't': + i_ttl = strtol( optarg, NULL, 0 ); + break; + + case 'w': + b_overwrite_timestamps = true; + break; + + case 'o': + { + struct in_addr maddr; + if ( !inet_aton( optarg, &maddr ) ) + usage(); + i_ssrc = maddr.s_addr; + b_overwrite_ssrc = true; + break; + } + + case 'U': + b_udp = true; + break; + + case 'm': + i_mtu = strtol( optarg, NULL, 0 ); + break; + + case 'h': + default: + usage(); + break; + } + } + if ( optind >= i_argc - 1 ) + usage(); + + i_input_fd = OpenSocket( pp_argv[optind], 0, NULL ); + optind++; + + i = 0; + while ( optind < i_argc && i < MAX_OUTPUTS ) + { + p_outputs[i].i_fd = OpenSocket( pp_argv[optind++], i_ttl, + &p_outputs[i].i_weight ); + p_outputs[i].i_weighted_size = p_outputs[i].i_remainder = 0; + i_max_weight += p_outputs[i].i_weight; + i++; + } + if ( optind < i_argc ) + { + msg_Err( NULL, "max number of outputs: %d (recompile)", MAX_OUTPUTS ); + exit(EXIT_FAILURE); + } + msg_Dbg( NULL, "%d outputs weight %u", i, i_max_weight ); + for ( ; i < MAX_OUTPUTS; i++ ) + p_outputs[i].i_weight = 0; + + if ( b_udp ) + { + p_buffer = malloc( i_mtu + RTP_HEADER_SIZE ); + p_read_buffer = p_buffer + RTP_HEADER_SIZE; + } + else + p_buffer = p_read_buffer = malloc( i_mtu ); + + if ( i_priority > 0 ) + { + struct sched_param param; + int i_error; + + memset( ¶m, 0, sizeof(struct sched_param) ); + param.sched_priority = i_priority; + if ( (i_error = pthread_setschedparam( pthread_self(), SCHED_RR, + ¶m )) ) + { + msg_Warn( NULL, "couldn't set thread priority: %s", + strerror(i_error) ); + } + } + + for ( ; ; ) + { + ssize_t i_size = read( i_input_fd, p_read_buffer, i_mtu ); + output_t *p_output; + + if ( i_size < 0 && errno != EAGAIN && errno != EINTR ) + { + msg_Err( NULL, "unrecoverable read error, dying (%s)", + strerror(errno) ); + exit(EXIT_FAILURE); + } + if ( i_size <= 0 ) continue; + + if ( b_udp ) + { + rtp_SetHdr( p_buffer, i_rtp_cc ); + i_rtp_cc++; + i_size += RTP_HEADER_SIZE; + rtp_SetSSRC( p_buffer, (uint8_t *)&i_ssrc ); + /* this isn't RFC-compliant, but we assume that at the other end, + * the RTP header will be stripped */ + rtp_SetTimestamp( p_buffer, wall_Date() / 300 ); + } + else + { + if ( b_overwrite_ssrc ) + rtp_SetSSRC( p_buffer, (uint8_t *)&i_ssrc ); + if ( b_overwrite_timestamps ) + rtp_SetTimestamp( p_buffer, wall_Date() / 300 ); + } + + p_output = NextOutput(); + if ( write( p_output->i_fd, p_buffer, i_size ) < 0 ) + msg_Warn( NULL, "write error (%s)", strerror(errno) ); + + p_output->i_weighted_size += (i_size + p_output->i_remainder) + / p_output->i_weight; + p_output->i_remainder = (i_size + p_output->i_remainder) + % p_output->i_weight; + } + + return EXIT_SUCCESS; +} + diff --git a/desaggregartp.c b/desaggregartp.c new file mode 100644 index 0000000..2e9027d --- /dev/null +++ b/desaggregartp.c @@ -0,0 +1,358 @@ +/***************************************************************************** + * desaggregartp.c: rebuild an RTP stream from several aggregated links + ***************************************************************************** + * Copyright (C) 2009 VideoLAN + * $Id: desaggregartp.c 48 2007-11-30 14:08:21Z cmassiot $ + * + * Authors: Christophe Massiot <massiot@via.ecp.fr> + *****************************************************************************/ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <errno.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <pthread.h> +#include <poll.h> + +#include "util.h" + +#define POW2_32 UINT32_MAX +#define DEFAULT_BUFFER_LENGTH 200 /* ms */ +#define DEFAULT_MTU 1500 +#define MAX_INPUTS 4 + +/* Maximum gap allowed between two CRs. */ +#define CR_MAX_GAP 300 /* ms */ +#define CR_MAX_JITTER 100 /* ms */ +#define CR_AVERAGE 150 + +/***************************************************************************** + * Local declarations + *****************************************************************************/ +typedef struct block_t +{ + uint8_t *p_data; + unsigned int i_size; + uint64_t i_date; + struct block_t *p_next, *p_prev; +} block_t; + +static int i_output_fd; +static int pi_inputs_fd[MAX_INPUTS]; +static int i_nb_inputs = 0; +static int b_udp = 0; + +block_t *p_first = NULL; +block_t *p_last = NULL; + +typedef struct input_clock_t +{ + /* Synchronization information */ + int64_t delta_cr; + uint64_t cr_ref, wall_ref; + uint64_t last_cr; /* reference to detect unexpected stream + * discontinuities */ +} input_clock_t; + +static input_clock_t input_clock; + +static uint64_t i_last_timestamp = POW2_32; /* not 27 MHz, but RTP-native */ +static uint64_t i_buffer_length = DEFAULT_BUFFER_LENGTH * 27000; + +static void usage(void) +{ + msg_Raw( NULL, "Usage: desaggregartp [-i <RT priority>] [-b <buffer length>] [-U] [-m <mtu>] <src host 1> ... [<src host N>] <dest host>" ); + msg_Raw( NULL, " host format: [<connect addr>[:<connect port>]][@[<bind addr][:<bind port>]]" ); + msg_Raw( NULL, " -U: strip RTP header" ); + msg_Raw( NULL, " -b: buffer length in ms" ); + exit(EXIT_FAILURE); +} + +/***************************************************************************** + * clock_Init + *****************************************************************************/ +void clock_Init(void) +{ + input_clock.last_cr = 0; + input_clock.cr_ref = 0; + input_clock.wall_ref = 0; + input_clock.delta_cr = 0; +} + +/***************************************************************************** + * clock_ToWall + *****************************************************************************/ +uint64_t clock_ToWall( uint64_t i_clock ) +{ + return input_clock.wall_ref + (i_clock + input_clock.delta_cr + - input_clock.cr_ref); +} + +/***************************************************************************** + * clock_NewRef + *****************************************************************************/ +void clock_NewRef( uint64_t i_clock, uint64_t i_wall ) +{ + uint64_t i_extrapoled_clock; + int64_t i_clock_diff = input_clock.last_cr - i_clock; + + if ( i_clock_diff > (CR_MAX_GAP * 27000) + || i_clock_diff < -(CR_MAX_GAP * 27000) ) + { + msg_Warn( NULL, "clock gap, unexpected stream discontinuity %lld", + i_clock_diff ); + clock_Init(); + input_clock.cr_ref = input_clock.last_cr = i_clock; + input_clock.wall_ref = i_wall; + return; + } + + input_clock.last_cr = i_clock; + + /* Smooth clock reference variations. */ + i_extrapoled_clock = input_clock.cr_ref + + i_wall - input_clock.wall_ref; + i_clock_diff = i_extrapoled_clock - i_clock; + + if ( (i_clock_diff - input_clock.delta_cr) > (CR_MAX_JITTER * 27000) + || (i_clock_diff - input_clock.delta_cr) < -(CR_MAX_JITTER * 27000) ) + { + msg_Warn( NULL, "too much jitter %lld", + i_clock_diff - input_clock.delta_cr ); + clock_Init(); + input_clock.cr_ref = input_clock.last_cr = i_clock; + input_clock.wall_ref = i_wall; + return; + } + + /* Bresenham algorithm to smooth variations. */ + input_clock.delta_cr = (input_clock.delta_cr * (CR_AVERAGE - 1) + + i_clock_diff) / CR_AVERAGE; +} + +/***************************************************************************** + * Packet handlers + *****************************************************************************/ +static void SendPacket(void) +{ + block_t *p_block = p_first; + uint8_t *p_data, *p_end; + + p_first = p_block->p_next; + if ( p_first == NULL ) + p_last = NULL; + else + p_first->p_prev = NULL; + + if ( b_udp ) + p_data = rtp_GetPayload( p_block->p_data ); + else + p_data = p_block->p_data; + p_end = p_block->p_data + p_block->i_size; + + if ( write( i_output_fd, p_data, p_end - p_data ) < 0 ) + msg_Warn( NULL, "write error (%s)", strerror(errno) ); + free( p_block ); +} + +static void BuildTimestamp( uint32_t i_timestamp ) +{ + int64_t i_delta_timestamp; + + i_delta_timestamp = (POW2_32 * 3 / 2 + (int64_t)i_timestamp + - (i_last_timestamp % POW2_32)) + % POW2_32 - POW2_32 / 2; + i_last_timestamp += i_delta_timestamp; +} + +static void RecvPacket( block_t *p_block, uint64_t i_date ) +{ + uint64_t i_scaled_timestamp; + + if ( !rtp_CheckHdr( p_block->p_data ) ) + { + msg_Warn( NULL, "non-RTP packet received" ); + free( p_block ); + return; + } + + BuildTimestamp( rtp_GetTimestamp( p_block->p_data ) ); + + switch ( rtp_GetType( p_block->p_data ) ) + { + case 33: /* MPEG-2 TS: 90 kHz */ + i_scaled_timestamp = i_last_timestamp * 300; + break; + default: /* assume milliseconds */ + i_scaled_timestamp = i_last_timestamp * 27000; + break; + } + + clock_NewRef( i_scaled_timestamp, i_date ); + p_block->i_date = clock_ToWall( i_scaled_timestamp ) + i_buffer_length; + + /* Insert the block at the correct position */ + if ( p_last == NULL ) + { + p_first = p_last = p_block; + p_block->p_prev = p_block->p_next = NULL; + } + else + { + block_t *p_prev = p_last; + while ( p_prev != NULL && p_prev->i_date > p_block->i_date ) + p_prev = p_prev->p_prev; + if ( p_prev == NULL ) + { + p_block->p_next = p_first; + p_first->p_prev = p_block; + p_block->p_prev = NULL; + p_first = p_block; + } + else + { + p_block->p_prev = p_prev; + p_block->p_next = p_prev->p_next; + p_prev->p_next = p_block; + if ( p_block->p_next != NULL ) + p_block->p_next->p_prev = p_block; + else + p_last = p_block; + } + } +} + +/***************************************************************************** + * Entry point + *****************************************************************************/ +int main( int i_argc, char **pp_argv ) +{ + int i, c; + int i_priority = -1; + int i_ttl = 0; + int i_mtu = DEFAULT_MTU; + struct pollfd pfd[MAX_INPUTS]; + + while ( (c = getopt( i_argc, pp_argv, "i:t:b:Um:h" )) != -1 ) + { + switch ( c ) + { + case 'i': + i_priority = strtol( optarg, NULL, 0 ); + break; + + case 't': + i_ttl = strtol( optarg, NULL, 0 ); + break; + + case 'b': + i_buffer_length = strtoll( optarg, NULL, 0 ) * 1000; + break; + + case 'U': + b_udp = 1; + break; + + case 'm': + i_mtu = strtol( optarg, NULL, 0 ); + break; + + case 'h': + default: + usage(); + break; + } + } + if ( optind >= i_argc - 1 ) + usage(); + + i_nb_inputs = 0; + while ( optind < i_argc - 1 && i_nb_inputs < MAX_INPUTS ) + { + pi_inputs_fd[i_nb_inputs] = OpenSocket( pp_argv[optind++], 0, NULL ); + pfd[i_nb_inputs].fd = pi_inputs_fd[i_nb_inputs]; + pfd[i_nb_inputs].events = POLLIN; + i_nb_inputs++; + } + if ( optind < i_argc - 1 ) + { + msg_Err( NULL, "max number of inputs: %d (recompile)", MAX_INPUTS ); + exit(EXIT_FAILURE); + } + msg_Dbg( NULL, "%d inputs", i_nb_inputs ); + + i_output_fd = OpenSocket( pp_argv[optind], i_ttl, NULL ); + clock_Init(); + + if ( i_priority > 0 ) + { + struct sched_param param; + int i_error; + + memset( ¶m, 0, sizeof(struct sched_param) ); + param.sched_priority = i_priority; + if ( (i_error = pthread_setschedparam( pthread_self(), SCHED_RR, + ¶m )) ) + { + msg_Warn( NULL, "couldn't set thread priority: %s", + strerror(i_error) ); + } + } + + for ( ; ; ) + { + int i_timeout = -1; + uint64_t i_current_date; + + while ( p_first != NULL + && p_first->i_date <= (i_current_date = wall_Date()) + 26999 ) + SendPacket(); + + if ( p_first != NULL ) + i_timeout = (p_first->i_date - i_current_date) / 27000; + + if ( poll( pfd, i_nb_inputs, i_timeout ) < 0 ) + { + int saved_errno = errno; + msg_Warn( NULL, "couldn't poll(): %s", strerror(errno) ); + if ( saved_errno == EINTR ) continue; + exit(EXIT_FAILURE); + } + i_current_date = wall_Date(); + + for ( i = 0; i < i_nb_inputs; i++ ) + { + if ( pfd[i].revents & POLLIN ) + { + block_t *p_block = malloc( sizeof(block_t) + i_mtu ); + ssize_t i_size; + + p_block->p_data = (uint8_t *)p_block + sizeof(block_t); + i_size = read( pi_inputs_fd[i], p_block->p_data, i_mtu ); + if ( i_size < 0 && errno != EAGAIN && errno != EINTR ) + { + msg_Err( NULL, "unrecoverable read error, dying (%s)", + strerror(errno) ); + exit(EXIT_FAILURE); + } + if ( i_size <= 0 ) + { + free( p_block ); + continue; + } + + p_block->i_size = i_size; + RecvPacket( p_block, i_current_date ); + } + } + } + + return EXIT_SUCCESS; +} + diff --git a/ingests.c b/ingests.c new file mode 100644 index 0000000..bad50a0 --- /dev/null +++ b/ingests.c @@ -0,0 +1,258 @@ +/***************************************************************************** + * ingests.c: create the aux file for a transport stream file + ***************************************************************************** + * Copyright (C) 2009 VideoLAN + * $Id: ingests.c 52 2009-10-06 16:48:00Z cmassiot $ + * + * Authors: Christophe Massiot <massiot@via.ecp.fr> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + *****************************************************************************/ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <errno.h> + +#include "util.h" + +/***************************************************************************** + * Local declarations + *****************************************************************************/ +#define READ_ONCE 100 +#define MAX_PCR_GAP (500ULL * 27000ULL) /* that's 500 ms */ + +#define POW2_33 8589934592ULL + +static uint16_t i_pcr_pid = 0; +static size_t i_ts_in_payload = DEFAULT_PAYLOAD_SIZE / TS_SIZE; + +static int i_fd; +static FILE *p_output_aux; +static int i_ts_read = 0; + +static bool b_init = true; +static int i_ts_since_output = 0; +static uint64_t i_last_pcr = POW2_33 * 300; +static uint64_t i_last_pcr_diff = 0; +static int i_last_nb_payloads = 0; +static uint64_t i_last_stc = 0; + +static void usage(void) +{ + msg_Raw( NULL, "Usage: ingests -p <PCR PID> [-m <payload size>] <input ts>" ); + exit(EXIT_FAILURE); +} + +/***************************************************************************** + * OutputAux: date payload packets + *****************************************************************************/ +static void OutputAux( int i_nb_payloads, uint64_t i_duration ) +{ + uint8_t *p_aux; + int i; + + p_aux = malloc( i_nb_payloads * sizeof(uint64_t) ); + + for ( i = 0; i < i_nb_payloads; i++ ) + { + uint64_t i_stc = i_last_stc + i_duration * (i + 1) / i_nb_payloads; + + p_aux[8 * i + 0] = i_stc >> 56; + p_aux[8 * i + 1] = (i_stc >> 48) & 0xff; + p_aux[8 * i + 2] = (i_stc >> 40) & 0xff; + p_aux[8 * i + 3] = (i_stc >> 32) & 0xff; + p_aux[8 * i + 4] = (i_stc >> 24) & 0xff; + p_aux[8 * i + 5] = (i_stc >> 16) & 0xff; + p_aux[8 * i + 6] = (i_stc >> 8) & 0xff; + p_aux[8 * i + 7] = (i_stc >> 0) & 0xff; + } + i_last_stc += i_duration; + + if ( fwrite( p_aux, 8, i_nb_payloads, p_output_aux ) != i_nb_payloads ) + msg_Err( NULL, "couldn't write to auxiliary file" ); +} + +/***************************************************************************** + * Output: date as many payload packets as possible + *****************************************************************************/ +static void Output(void) +{ + int i_nb_payloads = (i_ts_since_output + i_ts_in_payload - 1) + / i_ts_in_payload; + + if ( i_ts_since_output <= 0 ) + return; + + if ( b_init ) + { + /* Emulate CBR */ + OutputAux( i_last_nb_payloads, + i_last_pcr_diff * i_last_nb_payloads / i_nb_payloads ); + b_init = false; + } + + OutputAux( i_nb_payloads, i_last_pcr_diff ); + i_ts_since_output -= i_nb_payloads * i_ts_in_payload; + i_last_nb_payloads = i_nb_payloads; +} + +/***************************************************************************** + * OutputFirst: manipulate structures to emulate CBR at the beginning + *****************************************************************************/ +static void OutputFirst(void) +{ + i_last_nb_payloads = (i_ts_since_output + i_ts_in_payload - 1) + / i_ts_in_payload; + i_ts_since_output -= i_last_nb_payloads; +} + +/***************************************************************************** + * OutputFirst: emulate CBR at the end + *****************************************************************************/ +static void OutputLast(void) +{ + int i_nb_payloads = (i_ts_since_output + i_ts_in_payload - 1) + / i_ts_in_payload; + OutputAux( i_nb_payloads, + i_last_pcr_diff * i_nb_payloads / i_last_nb_payloads ); +} + +/***************************************************************************** + * TSHandle: find a PCR and stamp packets + *****************************************************************************/ +static void TSHandle( uint8_t *p_ts ) +{ + uint16_t i_pid = ts_GetPID( p_ts ); + + if ( !ts_CheckSync( p_ts ) ) + { + msg_Err( NULL, "lost TS synchro, go and fix your file (pos=%llu)", + (uint64_t)i_ts_read * TS_SIZE ); + exit(EXIT_FAILURE); + } + + i_ts_since_output++; + + if ( (i_pid == i_pcr_pid || i_pcr_pid == 8192) && ts_HasPCR( p_ts ) ) + { + uint64_t i_pcr = ts_GetPCR( p_ts ) * 300 + ts_GetPCRExt( p_ts ); + + if ( i_last_pcr == POW2_33 * 300 ) /* init */ + { + i_last_pcr = i_pcr; + OutputFirst(); + return; + } + if ( (POW2_33 * 300 + i_pcr) - i_last_pcr < MAX_PCR_GAP ) + /* Clock wrapped */ + i_last_pcr_diff = POW2_33 * 300 + i_pcr - i_last_pcr; + else if ( (i_pcr <= i_last_pcr) || + (i_pcr - i_last_pcr > MAX_PCR_GAP) ) + /* Do not change the slope - consider CBR */ + msg_Warn( NULL, "PCR discontinuity (%llu->%llu, pos=%llu)", + i_last_pcr, i_pcr, (uint64_t)i_ts_read * TS_SIZE ); + else + i_last_pcr_diff = i_pcr - i_last_pcr; + + i_last_pcr = i_pcr; + Output(); + } +} + +/***************************************************************************** + * Entry point + *****************************************************************************/ +int main( int i_argc, char **pp_argv ) +{ + bool b_stream; + uint8_t *p_buffer; + unsigned int i_payload_size; + + for ( ; ; ) + { + char c; + + if ( (c = getopt(i_argc, pp_argv, "p:m:h")) == -1 ) + break; + + switch ( c ) + { + case 'p': + i_pcr_pid = strtoul(optarg, NULL, 0); + break; + + case 'm': + i_payload_size = strtoul(optarg, NULL, 0); + i_ts_in_payload = i_payload_size / TS_SIZE; + if ( i_payload_size % TS_SIZE ) + { + msg_Err( NULL, "payload size must be a multiple of 188" ); + exit(EXIT_FAILURE); + } + break; + + case 'h': + default: + usage(); + break; + } + } + if ( optind >= i_argc || !i_pcr_pid ) + usage(); + + i_fd = OpenFile( pp_argv[optind], true, false, &b_stream ); + if ( b_stream ) + usage(); + p_output_aux = OpenAuxFile( pp_argv[optind], false, false ); + + p_buffer = malloc( TS_SIZE * READ_ONCE ); + + for ( ; ; ) + { + int i; + ssize_t i_ret; + + if ( (i_ret = read( i_fd, p_buffer, TS_SIZE * READ_ONCE )) < 0 ) + { + msg_Err( NULL, "read error (%s)", strerror(errno) ); + break; + } + if ( i_ret == 0 ) + { + msg_Dbg( NULL, "end of file reached" ); + break; + } + + for ( i = 0; i < i_ret / TS_SIZE; i++ ) + { + TSHandle( p_buffer + TS_SIZE * i ); + i_ts_read++; + } + } + + if ( !i_last_pcr_diff ) + msg_Err( NULL, "no PCR found" ); + else + OutputLast(); /* Emulate CBR */ + fclose( p_output_aux ); + close( i_fd ); + + return 0; +} + diff --git a/multicat.c b/multicat.c new file mode 100644 index 0000000..50c99eb --- /dev/null +++ b/multicat.c @@ -0,0 +1,523 @@ +/***************************************************************************** + * multicat.c: netcat-equivalent for multicast + ***************************************************************************** + * Copyright (C) 2009 VideoLAN + * $Id: multicat.c 48 2007-11-30 14:08:21Z cmassiot $ + * + * Authors: Christophe Massiot <massiot@via.ecp.fr> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + *****************************************************************************/ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <errno.h> +#include <signal.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <pthread.h> + +#include "util.h" + +#define RTP_HEADER_MAX_SIZE (RTP_HEADER_SIZE + 15 * 4) +#define RTP_TS_TYPE 33 + +/***************************************************************************** + * Local declarations + *****************************************************************************/ +static int i_input_fd, i_output_fd; +FILE *p_input_aux, *p_output_aux; +static uint16_t i_pcr_pid = 0; +static bool b_overwrite_ssrc = false; +static in_addr_t i_ssrc = 0; +static bool b_input_udp = false, b_output_udp = false; +static size_t i_asked_payload_size = DEFAULT_PAYLOAD_SIZE; + +static bool b_die = false; +static uint16_t i_rtp_cc; +static uint64_t i_stc = 0; /* system time clock, used for date calculations */ +static uint64_t i_pcr = 0, i_pcr_stc = 0; /* for RTP/TS output */ +void (*pf_Skip)( size_t i_len, int i_nb_chunks ); +ssize_t (*pf_Read)( void *p_buf, size_t i_len ); +ssize_t (*pf_Write)( const void *p_buf, size_t i_len ); + +static void usage(void) +{ + msg_Raw( NULL, "Usage: multicat [-i <RT priority>] [-t <ttl>] [-p <PCR PID>] [-s <chunks>] [-n <chunks>] [-d <time>] [-a] [-S <SSRC IP>] [-u] [-U] [-m <payload size>] <input item> <output item>" ); + msg_Raw( NULL, " item format: <file path | device path | FIFO path | network host>" ); + msg_Raw( NULL, " host format: [<connect addr>[:<connect port>]][@[<bind addr][:<bind port>]]" ); + msg_Raw( NULL, " -p: overwrite or create RTP timestamps using PCR PID (MPEG-2/TS)" ); + msg_Raw( NULL, " -s: skip the first N chunks of payload" ); + msg_Raw( NULL, " -n: exit after playing N chunks of payload" ); + msg_Raw( NULL, " -d: exit after definite time (in 27 MHz units)" ); + msg_Raw( NULL, " -a: append to existing destination file (risky)" ); + msg_Raw( NULL, " -S: overwrite or create RTP SSRC" ); + msg_Raw( NULL, " -u: source has no RTP header" ); + msg_Raw( NULL, " -U: destination has no RTP header" ); + msg_Raw( NULL, " -m: size of the payload chunk, excluding optional RTP header (default 1316)" ); + exit(EXIT_FAILURE); +} + +/***************************************************************************** + * Signal Handler + *****************************************************************************/ +static void SigHandler( int i_signal ) +{ + b_die = true; +} + +/***************************************************************************** + * udp_*: UDP socket handlers + *****************************************************************************/ +static int i_udp_nb_skips = 0; + +static void udp_Skip( size_t i_len, int i_nb_chunks ) +{ + i_udp_nb_skips = i_nb_chunks; +} + +static ssize_t udp_Read( void *p_buf, size_t i_len ) +{ + ssize_t i_ret; + if ( (i_ret = recv( i_input_fd, p_buf, i_len, 0 )) < 0 ) + { + msg_Err( NULL, "recv error (%s)", strerror(errno) ); + b_die = true; + return 0; + } + + i_stc = wall_Date(); + if ( i_udp_nb_skips ) + { + i_udp_nb_skips--; + return 0; + } + return i_ret; +} + +static ssize_t udp_Write( const void *p_buf, size_t i_len ) +{ + size_t i_ret; + if ( (i_ret = send( i_output_fd, p_buf, i_len, 0 )) < 0 ) + msg_Err( NULL, "write error (%s)", strerror(errno) ); + return i_ret; +} + +/***************************************************************************** + * stream_*: FIFO and character device handlers + *****************************************************************************/ +static int i_stream_nb_skips = 0; + +static void stream_Skip( size_t i_len, int i_nb_chunks ) +{ + i_stream_nb_skips = i_nb_chunks; +} + +static ssize_t stream_Read( void *p_buf, size_t i_len ) +{ + ssize_t i_ret; + if ( (i_ret = read( i_input_fd, p_buf, i_len )) < 0 ) + { + msg_Err( NULL, "read error (%s)", strerror(errno) ); + b_die = true; + return 0; + } + + i_stc = wall_Date(); + if ( i_stream_nb_skips ) + { + i_stream_nb_skips--; + return 0; + } + return i_ret; +} + +static ssize_t stream_Write( const void *p_buf, size_t i_len ) +{ + size_t i_ret; + if ( (i_ret = write( i_output_fd, p_buf, i_len )) < 0 ) + msg_Err( NULL, "write error (%s)", strerror(errno) ); + return i_ret; +} + +/***************************************************************************** + * file_*: handler for the auxiliary file format + *****************************************************************************/ +static void file_Skip( size_t i_len, int i_nb_chunks ) +{ + lseek( i_input_fd, (off_t)i_len * (off_t)i_nb_chunks, SEEK_SET ); + fseeko( p_input_aux, 8 * (off_t)i_nb_chunks, SEEK_SET ); +} + +static ssize_t file_Read( void *p_buf, size_t i_len ) +{ + /* for correct throughput without rounding approximations */ + static uint64_t i_file_first_stc = 0, i_file_first_wall = 0; + + uint8_t p_aux[8]; + uint64_t i_wall; + ssize_t i_ret; + + if ( (i_ret = read( i_input_fd, p_buf, i_len )) < 0 ) + { + msg_Err( NULL, "read error (%s)", strerror(errno) ); + b_die = true; + return 0; + } + if ( i_ret == 0 ) + { + msg_Dbg( NULL, "end of file reached" ); + b_die = true; + return 0; + } + + if ( fread( p_aux, 8, 1, p_input_aux ) != 1 ) + { + msg_Warn( NULL, "premature end of aux file reached" ); + b_die = true; + return 0; + } + i_stc = ((uint64_t)p_aux[0] << 56) + | ((uint64_t)p_aux[1] << 48) + | ((uint64_t)p_aux[2] << 40) + | ((uint64_t)p_aux[3] << 32) + | ((uint64_t)p_aux[4] << 24) + | ((uint64_t)p_aux[5] << 16) + | ((uint64_t)p_aux[6] << 8) + | ((uint64_t)p_aux[7] << 0); + i_wall = wall_Date(); + + if ( !i_file_first_wall ) + { + i_file_first_wall = i_wall; + i_file_first_stc = i_stc; + } + + if ( (i_stc - i_file_first_stc) > (i_wall - i_file_first_wall) ) + wall_Sleep( (i_stc - i_file_first_stc) - (i_wall - i_file_first_wall) ); + return i_ret; +} + +static ssize_t file_Write( const void *p_buf, size_t i_len ) +{ + uint8_t p_aux[8]; + ssize_t i_ret; + + if ( (i_ret = write( i_output_fd, p_buf, i_len )) < 0 ) + { + msg_Err( NULL, "couldn't write to file" ); + return i_ret; + } + + p_aux[0] = i_stc >> 56; + p_aux[1] = (i_stc >> 48) & 0xff; + p_aux[2] = (i_stc >> 40) & 0xff; + p_aux[3] = (i_stc >> 32) & 0xff; + p_aux[4] = (i_stc >> 24) & 0xff; + p_aux[5] = (i_stc >> 16) & 0xff; + p_aux[6] = (i_stc >> 8) & 0xff; + p_aux[7] = (i_stc >> 0) & 0xff; + if ( fwrite( p_aux, 8, 1, p_output_aux ) != 1 ) + msg_Err( NULL, "couldn't write to auxiliary file" ); + + return i_ret; +} + +/***************************************************************************** + * GetPCR: read PCRs to align RTP timestamps with PCR scale (RFC compliance) + *****************************************************************************/ +static void GetPCR( const uint8_t *p_buffer, size_t i_read_size ) +{ + while ( i_read_size >= TS_SIZE ) + { + uint16_t i_pid = ts_GetPID( p_buffer ); + + if ( !ts_CheckSync( p_buffer ) ) + { + msg_Warn( NULL, "invalid TS packet (sync=0x%x)", p_buffer[0] ); + return; + } + if ( (i_pid == i_pcr_pid || i_pcr_pid == 8192) + && ts_HasPCR( p_buffer ) ) + { + i_pcr = ts_GetPCR( p_buffer ) * 300 + ts_GetPCRExt( p_buffer ); + i_pcr_stc = i_stc; + } + p_buffer += TS_SIZE; + i_read_size -= TS_SIZE; + } +} + +/***************************************************************************** + * Entry point + *****************************************************************************/ +int main( int i_argc, char **pp_argv ) +{ + int i_priority = -1; + int i_ttl = 0; + int i_skip_chunks = 0, i_nb_chunks = -1; + uint64_t i_duration = 0, i_last_stc = 0; + bool b_append = false; + uint8_t *p_buffer, *p_read_buffer; + size_t i_max_read_size, i_max_write_size; + int c; + + while ( (c = getopt( i_argc, pp_argv, "i:t:p:s:n:d:aS:uUm:h" )) != -1 ) + { + switch ( c ) + { + case 'i': + i_priority = strtol( optarg, NULL, 0 ); + break; + + case 't': + i_ttl = strtol( optarg, NULL, 0 ); + break; + + case 'p': + i_pcr_pid = strtol( optarg, NULL, 0 ); + break; + + case 's': + i_skip_chunks = strtol( optarg, NULL, 0 ); + break; + + case 'n': + i_nb_chunks = strtol( optarg, NULL, 0 ); + break; + + case 'd': + i_duration = strtoull( optarg, NULL, 0 ); + break; + + case 'a': + b_append = true; + break; + + case 'S': + { + struct in_addr maddr; + if ( !inet_aton( optarg, &maddr ) ) + usage(); + i_ssrc = maddr.s_addr; + b_overwrite_ssrc = true; + break; + } + + case 'u': + b_input_udp = true; + break; + + case 'U': + b_output_udp = true; + break; + + case 'm': + i_asked_payload_size = strtol( optarg, NULL, 0 ); + break; + + case 'h': + default: + usage(); + break; + } + } + if ( optind >= i_argc - 1 ) + usage(); + + if ( (i_input_fd = OpenSocket( pp_argv[optind], i_ttl, NULL )) >= 0 ) + { + pf_Read = udp_Read; + pf_Skip = udp_Skip; + } + else + { + bool b_stream; + i_input_fd = OpenFile( pp_argv[optind], true, false, &b_stream ); + if ( !b_stream ) + { + p_input_aux = OpenAuxFile( pp_argv[optind], true, false ); + pf_Read = file_Read; + pf_Skip = file_Skip; + } + else + { + pf_Read = stream_Read; + pf_Skip = stream_Skip; + } + b_input_udp = true; /* We don't need no, RTP header */ + } + optind++; + + if ( (i_output_fd = OpenSocket( pp_argv[optind], i_ttl, NULL )) + >= 0 ) + pf_Write = udp_Write; + else + { + bool b_stream; + i_output_fd = OpenFile( pp_argv[optind], false, b_append, &b_stream ); + if ( !b_stream ) + { + p_output_aux = OpenAuxFile( pp_argv[optind], false, b_append ); + pf_Write = file_Write; + } + else + pf_Write = stream_Write; + b_output_udp = true; /* We don't need no, RTP header */ + } + optind++; + + srand( time(NULL) * getpid() ); + i_max_read_size = i_asked_payload_size + (b_input_udp ? 0 : + RTP_HEADER_MAX_SIZE); + i_max_write_size = i_asked_payload_size + (b_output_udp ? 0 : + (b_input_udp ? RTP_HEADER_SIZE : + RTP_HEADER_MAX_SIZE)); + p_buffer = malloc( (i_max_read_size < i_max_write_size) ? i_max_read_size : + i_max_write_size ); + p_read_buffer = p_buffer + ((b_input_udp && !b_output_udp) ? + RTP_HEADER_SIZE : 0); + if ( b_input_udp && !b_output_udp ) + i_rtp_cc = rand() & 0xffff; + + if ( i_skip_chunks ) + pf_Skip( i_asked_payload_size, i_skip_chunks ); + + if ( i_priority > 0 ) + { + struct sched_param param; + int i_error; + + memset( ¶m, 0, sizeof(struct sched_param) ); + param.sched_priority = i_priority; + if ( (i_error = pthread_setschedparam( pthread_self(), SCHED_RR, + ¶m )) ) + { + msg_Warn( NULL, "couldn't set thread priority: %s", + strerror(i_error) ); + } + } + + signal( SIGTERM, SigHandler ); + signal( SIGHUP, SigHandler ); + signal( SIGINT, SigHandler ); + + while ( !b_die ) + { + ssize_t i_read_size = pf_Read( p_read_buffer, i_max_read_size ); + uint8_t *p_payload; + size_t i_payload_size; + uint8_t *p_write_buffer; + size_t i_write_size; + + if ( i_read_size <= 0 ) continue; + + /* Determine start and size of payload */ + if ( !b_input_udp ) + { + if ( !rtp_CheckHdr( p_read_buffer ) ) + msg_Warn( NULL, "invalid RTP packet received" ); + p_payload = rtp_GetPayload( p_read_buffer ); + i_payload_size = p_read_buffer + i_read_size - p_payload; + } + else + { + p_payload = p_read_buffer; + i_payload_size = i_read_size; + } + + /* Pad to get the asked payload size */ + while ( i_payload_size + TS_SIZE <= i_asked_payload_size ) + { + ts_Pad( &p_payload[i_payload_size] ); + i_read_size += TS_SIZE; + i_payload_size += TS_SIZE; + } + + /* Prepare header and size of output */ + if ( b_output_udp ) + { + p_write_buffer = p_payload; + i_write_size = i_payload_size; + } + else /* RTP output */ + { + if ( b_input_udp ) + { + p_write_buffer = p_buffer; + i_write_size = i_payload_size + RTP_HEADER_SIZE; + + rtp_SetHdr( p_write_buffer, i_rtp_cc ); + i_rtp_cc++; + + if ( i_pcr_pid ) + { + GetPCR( p_payload, i_payload_size ); + rtp_SetTimestamp( p_write_buffer, + (i_pcr + (i_stc - i_pcr_stc)) / 300 ); + } + else + { + /* This isn't RFC-compliant but no one really cares */ + rtp_SetTimestamp( p_write_buffer, i_stc / 300 ); + } + rtp_SetSSRC( p_write_buffer, (uint8_t *)&i_ssrc ); + } + else /* RTP output, RTP input */ + { + p_write_buffer = p_read_buffer; + i_write_size = i_read_size; + + if ( i_pcr_pid ) + { + if ( rtp_GetType( p_write_buffer ) != RTP_TS_TYPE ) + msg_Warn( NULL, "input isn't MPEG transport stream" ); + else + GetPCR( p_payload, i_payload_size ); + rtp_SetTimestamp( p_write_buffer, + (i_pcr + (i_stc - i_pcr_stc)) / 300 ); + } + if ( b_overwrite_ssrc ) + rtp_SetSSRC( p_write_buffer, (uint8_t *)&i_ssrc ); + } + } + + pf_Write( p_write_buffer, i_write_size ); + + if ( i_nb_chunks > 0 ) + i_nb_chunks--; + if ( !i_nb_chunks ) + b_die = true; + + if ( i_duration ) + { + if ( i_last_stc ) + { + if ( i_last_stc <= i_stc ) + b_die = true; + } + else + i_last_stc = i_stc + i_duration; + } + } + + return EXIT_SUCCESS; +} + diff --git a/offsets.c b/offsets.c new file mode 100644 index 0000000..364eb7d --- /dev/null +++ b/offsets.c @@ -0,0 +1,129 @@ +/***************************************************************************** + * offsets.c: find position in an aux file + ***************************************************************************** + * Copyright (C) 2009 VideoLAN + * $Id: offsets.c 10 2005-11-16 18:09:00Z cmassiot $ + * + * Authors: Christophe Massiot <massiot@via.ecp.fr> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + *****************************************************************************/ + +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <stdint.h> +#include <string.h> +#include <errno.h> +#include <sys/mman.h> + +/***************************************************************************** + * Entry point + *****************************************************************************/ +int main( int i_argc, char **pp_argv ) +{ + uint8_t *p_aux; + uint64_t i_wanted; + off_t i_offset1 = 0, i_offset2; + int i_stc_fd; + struct stat stc_stat; + uint64_t i_stc0; + + if ( i_argc != 3 ) + { + fprintf( stderr, "Usage: offsets <aux file> <27 MHz timestamp>\n" ); + exit(EXIT_FAILURE); + } + + i_wanted = strtoull( pp_argv[2], NULL, 0 ); + if ( !i_wanted ) + { + printf( "0\n" ); + exit(EXIT_SUCCESS); + } + + if ( (i_stc_fd = open( pp_argv[1], O_RDONLY )) == -1 ) + { + fprintf( stderr, "unable to open %s (%s)\n", pp_argv[1], + strerror(errno) ); + exit(EXIT_FAILURE); + } + + if ( fstat( i_stc_fd, &stc_stat ) == -1 ) + { + fprintf( stderr, "unable to stat %s (%s)\n", pp_argv[1], + strerror(errno) ); + exit(EXIT_FAILURE); + } + + p_aux = mmap( NULL, stc_stat.st_size, PROT_READ, MAP_SHARED, + i_stc_fd, 0 ); + if ( p_aux == MAP_FAILED ) + { + fprintf( stderr, "unable to mmap %s (%s)\n", pp_argv[1], + strerror(errno) ); + exit(EXIT_FAILURE); + } + + if ( p_aux[0] == 0x47 && p_aux[188] == 0x47 && p_aux[376] == 0x47 ) + { + fprintf( stderr, "this is a TS file, not an aux file\n" ); + exit(EXIT_FAILURE); + } + + i_offset2 = stc_stat.st_size / sizeof(uint64_t); + i_stc0 = ((uint64_t)p_aux[0] << 56) + | ((uint64_t)p_aux[1] << 48) + | ((uint64_t)p_aux[2] << 40) + | ((uint64_t)p_aux[3] << 32) + | ((uint64_t)p_aux[4] << 24) + | ((uint64_t)p_aux[5] << 16) + | ((uint64_t)p_aux[6] << 8) + | ((uint64_t)p_aux[7] << 0); + + for ( ; ; ) + { + off_t i_mid_offset = (i_offset1 + i_offset2) / 2; + uint8_t *p_mid_aux = p_aux + i_mid_offset * sizeof(uint64_t); + uint64_t i_mid_stc = ((uint64_t)p_mid_aux[0] << 56) + | ((uint64_t)p_mid_aux[1] << 48) + | ((uint64_t)p_mid_aux[2] << 40) + | ((uint64_t)p_mid_aux[3] << 32) + | ((uint64_t)p_mid_aux[4] << 24) + | ((uint64_t)p_mid_aux[5] << 16) + | ((uint64_t)p_mid_aux[6] << 8) + | ((uint64_t)p_mid_aux[7] << 0); + + + if ( i_offset1 == i_mid_offset ) + break; + + if ( i_mid_stc - i_stc0 >= i_wanted ) + i_offset2 = i_mid_offset; + else + i_offset1 = i_mid_offset; + } + + munmap( p_aux, stc_stat.st_size ); + close( i_stc_fd ); + + printf( "%jd\n", (intmax_t)i_offset2 ); + + exit(EXIT_SUCCESS); +} + @@ -0,0 +1,389 @@ +/***************************************************************************** + * util.c: Utils for the multicat suite + ***************************************************************************** + * Copyright (C) 2004, 2009 VideoLAN + * $Id: util.c 27 2009-10-20 19:15:04Z massiot $ + * + * Authors: Christophe Massiot <massiot@via.ecp.fr> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + *****************************************************************************/ + +#include <stdlib.h> +#include <stdio.h> +#include <stdint.h> +#include <stdbool.h> +#include <string.h> +#include <errno.h> +#include <stdarg.h> +#include <sys/time.h> +#include <time.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "util.h" + +/***************************************************************************** + * Local declarations + *****************************************************************************/ +#define MAX_MSG 1024 +#define PSZ_AUX_EXT "aux" + +int i_verbose = VERB_DBG; + +/***************************************************************************** + * msg_Info + *****************************************************************************/ +void msg_Info( void *_unused, const char *psz_format, ... ) +{ + if ( i_verbose >= VERB_INFO ) + { + va_list args; + char psz_fmt[MAX_MSG]; + va_start( args, psz_format ); + + snprintf( psz_fmt, MAX_MSG, "info: %s\n", psz_format ); + vfprintf( stderr, psz_fmt, args ); + } +} + +/***************************************************************************** + * msg_Err + *****************************************************************************/ +void msg_Err( void *_unused, const char *psz_format, ... ) +{ + va_list args; + char psz_fmt[MAX_MSG]; + va_start( args, psz_format ); + + snprintf( psz_fmt, MAX_MSG, "error: %s\n", psz_format ); + vfprintf( stderr, psz_fmt, args ); +} + +/***************************************************************************** + * msg_Warn + *****************************************************************************/ +void msg_Warn( void *_unused, const char *psz_format, ... ) +{ + if ( i_verbose >= VERB_WARN ) + { + va_list args; + char psz_fmt[MAX_MSG]; + va_start( args, psz_format ); + + snprintf( psz_fmt, MAX_MSG, "warning: %s\n", psz_format ); + vfprintf( stderr, psz_fmt, args ); + } +} + +/***************************************************************************** + * msg_Dbg + *****************************************************************************/ +void msg_Dbg( void *_unused, const char *psz_format, ... ) +{ + if ( i_verbose >= VERB_DBG ) + { + va_list args; + char psz_fmt[MAX_MSG]; + va_start( args, psz_format ); + + snprintf( psz_fmt, MAX_MSG, "debug: %s\n", psz_format ); + vfprintf( stderr, psz_fmt, args ); + } +} + +/***************************************************************************** + * msg_Raw + *****************************************************************************/ +void msg_Raw( void *_unused, const char *psz_format, ... ) +{ + va_list args; + char psz_fmt[MAX_MSG]; + va_start( args, psz_format ); + + snprintf( psz_fmt, MAX_MSG, "%s\n", psz_format ); + vfprintf( stderr, psz_fmt, args ); +} + +/***************************************************************************** + * wall_Date: returns a 27 MHz timestamp + *****************************************************************************/ +uint64_t wall_Date( void ) +{ +#if defined (HAVE_CLOCK_NANOSLEEP) + struct timespec ts; + + /* Try to use POSIX monotonic clock if available */ + if( clock_gettime( CLOCK_MONOTONIC, &ts ) == EINVAL ) + /* Run-time fallback to real-time clock (always available) */ + (void)clock_gettime( CLOCK_REALTIME, &ts ); + + return ((uint64_t)ts.tv_sec * (uint64_t)27000000) + + (uint64_t)(ts.tv_nsec * 27 / 1000); +#else + struct timeval tv_date; + + /* gettimeofday() could return an error, and should be tested. However, the + * only possible error, according to 'man', is EFAULT, which can not happen + * here, since tv is a local variable. */ + gettimeofday( &tv_date, NULL ); + return( (uint64_t) tv_date.tv_sec * 27000000 + (uint64_t) tv_date.tv_usec * 27 ); +#endif +} + +/***************************************************************************** + * wall_Sleep + *****************************************************************************/ +void wall_Sleep( uint64_t i_delay ) +{ + struct timespec ts; + ts.tv_sec = i_delay / 27000000; + ts.tv_nsec = (i_delay % 27000000) * 1000 / 27; + +#if defined( HAVE_CLOCK_NANOSLEEP ) + int val; + while ( ( val = clock_nanosleep( CLOCK_MONOTONIC, 0, &ts, &ts ) ) == EINTR ); + if( val == EINVAL ) + { + ts.tv_sec = i_delay / 27000000; + ts.tv_nsec = (i_delay % 27000000) * 1000 / 27; + while ( clock_nanosleep( CLOCK_REALTIME, 0, &ts, &ts ) == EINTR ); + } +#else + while ( nanosleep( &ts, &ts ) && errno == EINTR ); +#endif +} + +/***************************************************************************** + * PrintSocket: print socket characteristics for debug purposes + *****************************************************************************/ +static void PrintSocket( const char *psz_text, struct sockaddr_in *p_bind, + struct sockaddr_in *p_connect ) +{ + msg_Dbg( NULL, "%s bind:%s:%u", psz_text, + inet_ntoa( p_bind->sin_addr ), ntohs( p_bind->sin_port ) ); + msg_Dbg( NULL, "%s connect:%s:%u", psz_text, + inet_ntoa( p_connect->sin_addr ), ntohs( p_connect->sin_port ) ); +} + +/***************************************************************************** + * ParseHost: parse a host:port string + *****************************************************************************/ +static int ParseHost( struct sockaddr_in *p_sock, char *psz_host ) +{ + char *psz_token = strrchr( psz_host, ':' ); + if ( psz_token ) + { + char *psz_parser; + *psz_token++ = '\0'; + p_sock->sin_port = htons( strtol( psz_token, &psz_parser, 0 ) ); + if ( *psz_parser ) return -1; + } + else + p_sock->sin_port = htons( DEFAULT_PORT ); + + if ( !*psz_host ) + p_sock->sin_addr.s_addr = INADDR_ANY; + else if ( !inet_aton( psz_host, &p_sock->sin_addr ) ) + return -1; + + return 0; +} + +/***************************************************************************** + * OpenSocket: parse argv and open sockets + *****************************************************************************/ +int OpenSocket( const char *_psz_arg, int i_ttl, unsigned int *pi_weight ) +{ + char *psz_token; + struct sockaddr_in bind_addr, connect_addr; + int i_fd, i; + char *psz_arg = strdup(_psz_arg); + + bind_addr.sin_family = connect_addr.sin_family = AF_INET; + bind_addr.sin_addr.s_addr = connect_addr.sin_addr.s_addr = INADDR_ANY; + bind_addr.sin_port = connect_addr.sin_port = 0; + + psz_token = strrchr( psz_arg, ',' ); + if ( psz_token ) + { + *psz_token++ = '\0'; + if ( pi_weight ) + *pi_weight = strtoul( psz_token, NULL, 0 ); + } + else if ( pi_weight ) + *pi_weight = 1; + + psz_token = strrchr( psz_arg, '@' ); + if ( psz_token ) + { + *psz_token++ = '\0'; + if ( ParseHost( &bind_addr, psz_token ) < 0 ) + { + free(psz_arg); + return -1; + } + } + + if ( psz_arg[0] && ParseHost( &connect_addr, psz_arg ) < 0 ) + { + free(psz_arg); + return -1; + } + free( psz_arg ); + + if ( (i_fd = socket( AF_INET, SOCK_DGRAM, 0 )) < 0 ) + { + msg_Err( NULL, "unable to open socket (%s)", strerror(errno) ); + exit(EXIT_FAILURE); + } + + i = 1; + if ( setsockopt( i_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&i, + sizeof(i) ) == -1 ) + { + msg_Err( NULL, "unable to set socket (%s)", strerror(errno) ); + exit(EXIT_FAILURE); + } + + /* Increase the receive buffer size to 1/2MB (8Mb/s during 1/2s) to avoid + * packet loss caused by scheduling problems */ + i = 0x80000; + setsockopt( i_fd, SOL_SOCKET, SO_RCVBUF, (void *) &i, sizeof( i ) ); + + if ( bind( i_fd, (struct sockaddr *)&bind_addr, sizeof(bind_addr) ) < 0 ) + { + msg_Err( NULL, "couldn't bind" ); + PrintSocket( "socket definition:", &bind_addr, &connect_addr ); + exit(EXIT_FAILURE); + } + + /* Join the multicast group if the socket is a multicast address */ + if ( IN_MULTICAST( ntohl(bind_addr.sin_addr.s_addr)) ) + { + struct ip_mreq imr; + + imr.imr_multiaddr.s_addr = bind_addr.sin_addr.s_addr; + imr.imr_interface.s_addr = INADDR_ANY; /* FIXME could be an option */ + + /* Join Multicast group without source filter */ + if ( setsockopt( i_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + (char *)&imr, sizeof(struct ip_mreq) ) == -1 ) + { + msg_Err( NULL, "couldn't join multicast group" ); + PrintSocket( "socket definition:", &bind_addr, &connect_addr ); + exit(EXIT_FAILURE); + } + } + + if ( connect_addr.sin_addr.s_addr ) + { + if ( connect( i_fd, (struct sockaddr *)&connect_addr, + sizeof(connect_addr) ) < 0 ) + { + msg_Err( NULL, "cannot connect socket (%s)", + strerror(errno) ); + PrintSocket( "socket definition:", &bind_addr, &connect_addr ); + exit(EXIT_FAILURE); + } + + if ( IN_MULTICAST( ntohl(connect_addr.sin_addr.s_addr) ) && i_ttl ) + { + if ( setsockopt( i_fd, IPPROTO_IP, IP_MULTICAST_TTL, + (void *)&i_ttl, sizeof(i_ttl) ) == -1 ) + { + msg_Err( NULL, "couldn't set TTL" ); + PrintSocket( "socket definition:", &bind_addr, &connect_addr ); + exit(EXIT_FAILURE); + } + } + } + + return i_fd; +} + +/***************************************************************************** + * OpenFile: parse argv and open file descriptors + *****************************************************************************/ +int OpenFile( const char *psz_arg, bool b_read, bool b_append, bool *pb_stream ) +{ + struct stat sb; + int i_fd; + int i_mode = b_read ? O_RDONLY : O_WRONLY; + + if ( stat( psz_arg, &sb ) < 0 ) + { + if ( b_read ) + { + msg_Err( NULL, "file %s doesn't exist (%s)", psz_arg, + strerror(errno) ); + exit(EXIT_FAILURE); + } + *pb_stream = false; + i_mode |= O_CREAT; + } + else if ( S_ISCHR(sb.st_mode) || S_ISFIFO(sb.st_mode) ) + { + *pb_stream = true; + } + else + { + *pb_stream = false; + if ( !b_read ) + { + if ( b_append ) + i_mode |= O_APPEND; + else + i_mode |= O_TRUNC; + } + } + + if ( (i_fd = open( psz_arg, i_mode, 0644 )) < 0 ) + { + msg_Err( NULL, "couldn't open file %s (%s)", psz_arg, strerror(errno) ); + exit(EXIT_FAILURE); + } + + return i_fd; +} + +/***************************************************************************** + * OpenAuxFile + *****************************************************************************/ +FILE *OpenAuxFile( const char *psz_arg, bool b_read, bool b_append ) +{ + char psz_aux[strlen(psz_arg) + strlen(PSZ_AUX_EXT) + 2]; + char *psz_token; + FILE *p_aux; + + strcpy( psz_aux, psz_arg ); + psz_token = strrchr( psz_aux, '.' ); + if ( psz_token ) *psz_token = '\0'; + strcat( psz_aux, "." PSZ_AUX_EXT ); + + if ( (p_aux = fopen( psz_aux, + b_read ? "rb" : (b_append ? "ab" : "wb") )) < 0 ) + { + msg_Err( NULL, "couldn't open file %s (%s)", psz_aux, + strerror(errno) ); + exit(EXIT_FAILURE); + } + + return p_aux; +} @@ -0,0 +1,158 @@ +/***************************************************************************** + * util.h: Utils for the multicat suite + ***************************************************************************** + * Copyright (C) 2009 VideoLAN + * $Id: multicat.h 65 2009-11-15 22:57:53Z massiot $ + * + * Authors: Christophe Massiot <massiot@via.ecp.fr> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + *****************************************************************************/ + +#define HAVE_CLOCK_NANOSLEEP + +#define DEFAULT_PORT 1234 +#define DEFAULT_PAYLOAD_SIZE 1316 +#define TS_SIZE 188 +#define RTP_HEADER_SIZE 12 + +#define VERB_DBG 3 +#define VERB_INFO 2 +#define VERB_WARN 1 + +/***************************************************************************** + * Prototypes + *****************************************************************************/ +void msg_Info( void *_unused, const char *psz_format, ... ); +void msg_Err( void *_unused, const char *psz_format, ... ); +void msg_Warn( void *_unused, const char *psz_format, ... ); +void msg_Dbg( void *_unused, const char *psz_format, ... ); +void msg_Raw( void *_unused, const char *psz_format, ... ); +uint64_t wall_Date( void ); +void wall_Sleep( uint64_t i_delay ); +int OpenSocket( const char *_psz_arg, int i_ttl, unsigned int *pi_weight ); +int OpenFile( const char *psz_arg, bool b_read, bool b_append, + bool *pb_stream ); +FILE *OpenAuxFile( const char *psz_arg, bool b_read, bool b_append ); + +/***************************************************************************** + * Miscellaneous RTP handlers + *****************************************************************************/ +/* + * Reminder : RTP header + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |V=2|P|X| CC |M| PT | sequence number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | timestamp | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | synchronization source (SSRC) identifier | + +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ + | contributing source (CSRC) identifiers | + | .... | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + +static inline bool rtp_CheckHdr( const uint8_t *p_hdr ) +{ + return (p_hdr[0] & 0xc0) == 0x80; +} + +static inline uint8_t rtp_GetType( const uint8_t *p_hdr ) +{ + return p_hdr[1] & 0x7f; +} + +static inline uint32_t rtp_GetTimestamp( uint8_t *p_hdr ) +{ + return (p_hdr[4] << 24) | (p_hdr[5] << 16) | (p_hdr[6] << 8) | p_hdr[7]; +} + +static inline uint8_t *rtp_GetPayload( uint8_t *p_hdr ) +{ + unsigned int i_size = RTP_HEADER_SIZE; + i_size += 4 * (p_hdr[0] & 0xf); + if ( p_hdr[0] & 0x10 ) /* header extension */ + i_size += 4 + (p_hdr[i_size + 2] << 8) + p_hdr[i_size + 3]; + return p_hdr + i_size; +} + +static inline void rtp_SetTimestamp( uint8_t *p_hdr, uint32_t i_timestamp ) +{ + p_hdr[4] = (i_timestamp >> 24) & 0xff; + p_hdr[5] = (i_timestamp >> 16) & 0xff; + p_hdr[6] = (i_timestamp >> 8) & 0xff; + p_hdr[7] = i_timestamp & 0xff; +} + +static inline void rtp_SetSSRC( uint8_t *p_hdr, const uint8_t pi_ssrc[4] ) +{ + p_hdr[8] = pi_ssrc[0]; + p_hdr[9] = pi_ssrc[1]; + p_hdr[10] = pi_ssrc[2]; + p_hdr[11] = pi_ssrc[3]; +} + +static inline void rtp_SetHdr( uint8_t *p_hdr, uint16_t i_rtp_cc ) +{ + p_hdr[0] = 0x80; + p_hdr[1] = 33; /* assume MPEG-2 ts */ + p_hdr[2] = i_rtp_cc >> 8; + p_hdr[3] = i_rtp_cc & 0xff; +} + +/***************************************************************************** + * Miscellaneous TS handlers + *****************************************************************************/ +static inline bool ts_CheckSync( const uint8_t *p_ts ) +{ + return p_ts[0] == 0x47; +} + +static inline uint16_t ts_GetPID( const uint8_t *p_ts ) +{ + return (((uint16_t)p_ts[1] & 0x1f) << 8) | p_ts[2]; +} + +static inline int ts_HasPCR( const uint8_t *p_ts ) +{ + return ( p_ts[3] & 0x20 ) && /* adaptation field present */ + ( p_ts[4] >= 7 ) && /* adaptation field size */ + ( p_ts[5] & 0x10 ); /* has PCR */ +} + +static inline uint64_t ts_GetPCR( const uint8_t *p_ts ) +{ + return ( (uint64_t)p_ts[6] << 25 ) | + ( (uint64_t)p_ts[7] << 17 ) | + ( (uint64_t)p_ts[8] << 9 ) | + ( (uint64_t)p_ts[9] << 1 ) | + ( (uint64_t)p_ts[10] >> 7 ); +} + +static inline uint64_t ts_GetPCRExt( const uint8_t *p_ts ) +{ + return (((uint64_t)p_ts[10] & 1) << 8) | (uint64_t)p_ts[11]; +} + +static inline void ts_Pad( uint8_t *p_ts ) +{ + p_ts[0] = 0x47; + p_ts[1] = 0x1f; + p_ts[2] = 0xff; + p_ts[3] = 0x10; + memset( p_ts + 4, 0xff, TS_SIZE - 4 ); +}; |