/********** This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. (See .) This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA **********/ // Copyright (c) 1996-2007, Live Networks, Inc. All rights reserved // A common framework, used for the "openRTSP" and "playSIP" applications // Implementation #include "playCommon.hh" #include "BasicUsageEnvironment.hh" #include "GroupsockHelper.hh" #ifdef SUPPORT_REAL_RTSP #include "../RealRTSP/include/RealRTSP.hh" #endif #if defined(__WIN32__) || defined(_WIN32) #define snprintf _snprintf #else #include #define USE_SIGNALS 1 #endif // Forward function definitions: void setupStreams(); void startPlayingStreams(); void tearDownStreams(); void closeMediaSinks(); void subsessionAfterPlaying(void* clientData); void subsessionByeHandler(void* clientData); void sessionAfterPlaying(void* clientData = NULL); void sessionTimerHandler(void* clientData); void shutdown(int exitCode = 1); void signalHandlerShutdown(int sig); void checkForPacketArrival(void* clientData); void checkInterPacketGaps(void* clientData); void beginQOSMeasurement(); char const* progName; UsageEnvironment* env; Medium* ourClient = NULL; MediaSession* session = NULL; TaskToken sessionTimerTask = NULL; TaskToken arrivalCheckTimerTask = NULL; TaskToken interPacketGapCheckTimerTask = NULL; TaskToken qosMeasurementTimerTask = NULL; Boolean createReceivers = True; Boolean outputQuickTimeFile = False; Boolean generateMP4Format = False; QuickTimeFileSink* qtOut = NULL; Boolean outputAVIFile = False; AVIFileSink* aviOut = NULL; Boolean audioOnly = False; Boolean videoOnly = False; char const* singleMedium = NULL; int verbosityLevel = 1; // by default, print verbose output double duration = 0; double durationSlop = -1.0; // extra seconds to play at the end double initialSeekTime = 0.0f; double scale = 1.0f; unsigned interPacketGapMaxTime = 0; unsigned totNumPacketsReceived = ~0; // used if checking inter-packet gaps Boolean playContinuously = False; int simpleRTPoffsetArg = -1; Boolean sendOptionsRequest = True; Boolean sendOptionsRequestOnly = False; Boolean oneFilePerFrame = False; Boolean notifyOnPacketArrival = False; Boolean streamUsingTCP = False; portNumBits tunnelOverHTTPPortNum = 0; char* username = NULL; char* password = NULL; char* proxyServerName = NULL; unsigned short proxyServerPortNum = 0; unsigned char desiredAudioRTPPayloadFormat = 0; char* mimeSubtype = NULL; unsigned short movieWidth = 240; // default Boolean movieWidthOptionSet = False; unsigned short movieHeight = 180; // default Boolean movieHeightOptionSet = False; unsigned movieFPS = 15; // default Boolean movieFPSOptionSet = False; char* fileNamePrefix = ""; unsigned fileSinkBufferSize = 20000; unsigned socketInputBufferSize = 0; Boolean packetLossCompensate = False; Boolean syncStreams = False; Boolean generateHintTracks = False; unsigned qosMeasurementIntervalMS = 0; // 0 means: Don't output QOS data unsigned statusCode = 0; struct timeval startTime; void usage() { *env << "Usage: " << progName << " [-p ] [-r|-q|-4|-i] [-a|-v] [-V] [-d ] [-D [-c] [-S ] [-n] [-O]" << (controlConnectionUsesTCP ? " [-t|-T ]" : "") << " [-u " << (allowProxyServers ? " [ []]" : "") << "]" << (supportCodecSelection ? " [-A |-M ]" : "") << " [-s ] [-z ]" << " [-w -h ] [-f ] [-y] [-H] [-Q []] [-F ] [-b ] [-B ] [-I ] [-m] (or " << progName << " -o [-V] )\n"; //##### Add "-R " ##### shutdown(); } int main(int argc, char** argv) { // Begin by setting up our usage environment: TaskScheduler* scheduler = BasicTaskScheduler::createNew(); env = BasicUsageEnvironment::createNew(*scheduler); progName = argv[0]; gettimeofday(&startTime, NULL); #ifdef USE_SIGNALS // Allow ourselves to be shut down gracefully by a SIGHUP or a SIGUSR1: signal(SIGHUP, signalHandlerShutdown); signal(SIGUSR1, signalHandlerShutdown); #endif unsigned short desiredPortNum = 0; // unfortunately we can't use getopt() here, as Windoze doesn't have it while (argc > 2) { char* const opt = argv[1]; if (opt[0] != '-') usage(); switch (opt[1]) { case 'p': { // specify start port number int portArg; if (sscanf(argv[2], "%d", &portArg) != 1) { usage(); } if (portArg <= 0 || portArg >= 65536 || portArg&1) { *env << "bad port number: " << portArg << " (must be even, and in the range (0,65536))\n"; usage(); } desiredPortNum = (unsigned short)portArg; ++argv; --argc; break; } case 'r': { // do not receive data (instead, just 'play' the stream(s)) createReceivers = False; break; } case 'q': { // output a QuickTime file (to stdout) outputQuickTimeFile = True; break; } case '4': { // output a 'mp4'-format file (to stdout) outputQuickTimeFile = True; generateMP4Format = True; break; } case 'i': { // output an AVI file (to stdout) outputAVIFile = True; break; } case 'I': { // specify input interface... NetAddressList addresses(argv[2]); if (addresses.numAddresses() == 0) { *env << "Failed to find network address for \"" << argv[2] << "\""; break; } ReceivingInterfaceAddr = *(unsigned*)(addresses.firstAddress()->data()); ++argv; --argc; break; } case 'a': { // receive/record an audio stream only audioOnly = True; singleMedium = "audio"; break; } case 'v': { // receive/record a video stream only videoOnly = True; singleMedium = "video"; break; } case 'V': { // disable verbose output verbosityLevel = 0; break; } case 'd': { // specify duration, or how much to delay after end time float arg; if (sscanf(argv[2], "%g", &arg) != 1) { usage(); } if (argv[2][0] == '-') { // not "arg<0", in case argv[2] was "-0" // a 'negative' argument was specified; use this for "durationSlop": duration = 0; // use whatever's in the SDP durationSlop = -arg; } else { duration = arg; durationSlop = 0; } ++argv; --argc; break; } case 'D': { // specify maximum number of seconds to wait for packets: if (sscanf(argv[2], "%u", &interPacketGapMaxTime) != 1) { usage(); } ++argv; --argc; break; } case 'c': { // play continuously playContinuously = True; break; } case 'S': { // specify an offset to use with "SimpleRTPSource"s if (sscanf(argv[2], "%d", &simpleRTPoffsetArg) != 1) { usage(); } if (simpleRTPoffsetArg < 0) { *env << "offset argument to \"-S\" must be >= 0\n"; usage(); } ++argv; --argc; break; } case 'O': { // Don't send an "OPTIONS" request before "DESCRIBE" sendOptionsRequest = False; break; } case 'o': { // Send only the "OPTIONS" request to the server sendOptionsRequestOnly = True; break; } case 'm': { // output multiple files - one for each frame oneFilePerFrame = True; break; } case 'n': { // notify the user when the first data packet arrives notifyOnPacketArrival = True; break; } case 't': { // stream RTP and RTCP over the TCP 'control' connection if (controlConnectionUsesTCP) { streamUsingTCP = True; } else { usage(); } break; } case 'T': { // stream RTP and RTCP over a HTTP connection if (controlConnectionUsesTCP) { if (argc > 3 && argv[2][0] != '-') { // The next argument is the HTTP server port number: if (sscanf(argv[2], "%hu", &tunnelOverHTTPPortNum) == 1 && tunnelOverHTTPPortNum > 0) { ++argv; --argc; break; } } } // If we get here, the option was specified incorrectly: usage(); break; } case 'u': { // specify a username and password username = argv[2]; password = argv[3]; argv+=2; argc-=2; if (allowProxyServers && argc > 3 && argv[2][0] != '-') { // The next argument is the name of a proxy server: proxyServerName = argv[2]; ++argv; --argc; if (argc > 3 && argv[2][0] != '-') { // The next argument is the proxy server port number: if (sscanf(argv[2], "%hu", &proxyServerPortNum) != 1) { usage(); } ++argv; --argc; } } break; } case 'A': { // specify a desired audio RTP payload format unsigned formatArg; if (sscanf(argv[2], "%u", &formatArg) != 1 || formatArg >= 96) { usage(); } desiredAudioRTPPayloadFormat = (unsigned char)formatArg; ++argv; --argc; break; } case 'M': { // specify a MIME subtype for a dynamic RTP payload type mimeSubtype = argv[2]; if (desiredAudioRTPPayloadFormat==0) desiredAudioRTPPayloadFormat =96; ++argv; --argc; break; } case 'w': { // specify a width (pixels) for an output QuickTime or AVI movie if (sscanf(argv[2], "%hu", &movieWidth) != 1) { usage(); } movieWidthOptionSet = True; ++argv; --argc; break; } case 'h': { // specify a height (pixels) for an output QuickTime or AVI movie if (sscanf(argv[2], "%hu", &movieHeight) != 1) { usage(); } movieHeightOptionSet = True; ++argv; --argc; break; } case 'f': { // specify a frame rate (per second) for an output QT or AVI movie if (sscanf(argv[2], "%u", &movieFPS) != 1) { usage(); } movieFPSOptionSet = True; ++argv; --argc; break; } case 'F': { // specify a prefix for the audio and video output files fileNamePrefix = argv[2]; ++argv; --argc; break; } case 'b': { // specify the size of buffers for "FileSink"s if (sscanf(argv[2], "%u", &fileSinkBufferSize) != 1) { usage(); } ++argv; --argc; break; } case 'B': { // specify the size of input socket buffers if (sscanf(argv[2], "%u", &socketInputBufferSize) != 1) { usage(); } ++argv; --argc; break; } // Note: The following option is deprecated, and may someday be removed: case 'l': { // try to compensate for packet loss by repeating frames packetLossCompensate = True; break; } case 'y': { // synchronize audio and video streams syncStreams = True; break; } case 'H': { // generate hint tracks (as well as the regular data tracks) generateHintTracks = True; break; } case 'Q': { // output QOS measurements qosMeasurementIntervalMS = 1000; // default: 1 second if (argc > 3 && argv[2][0] != '-') { // The next argument is the measurement interval, // in multiples of 100 ms if (sscanf(argv[2], "%u", &qosMeasurementIntervalMS) != 1) { usage(); } qosMeasurementIntervalMS *= 100; ++argv; --argc; } break; } case 's': { // specify initial seek time (trick play) float arg; if (sscanf(argv[2], "%g", &arg) != 1 || arg < 0) { usage(); } initialSeekTime = arg; ++argv; --argc; break; } case 'z': { // scale (trick play) float arg; if (sscanf(argv[2], "%g", &arg) != 1 || arg == 0.0f) { usage(); } scale = arg; ++argv; --argc; break; } default: { usage(); break; } } ++argv; --argc; } if (argc != 2) usage(); if (outputQuickTimeFile && outputAVIFile) { *env << "The -i and -q (or -4) flags cannot both be used!\n"; usage(); } Boolean outputCompositeFile = outputQuickTimeFile || outputAVIFile; if (!createReceivers && outputCompositeFile) { *env << "The -r and -q (or -4 or -i) flags cannot both be used!\n"; usage(); } if (outputCompositeFile && !movieWidthOptionSet) { *env << "Warning: The -q, -4 or -i option was used, but not -w. Assuming a video width of " << movieWidth << " pixels\n"; } if (outputCompositeFile && !movieHeightOptionSet) { *env << "Warning: The -q, -4 or -i option was used, but not -h. Assuming a video height of " << movieHeight << " pixels\n"; } if (outputCompositeFile && !movieFPSOptionSet) { *env << "Warning: The -q, -4 or -i option was used, but not -f. Assuming a video frame rate of " << movieFPS << " frames-per-second\n"; } if (audioOnly && videoOnly) { *env << "The -a and -v flags cannot both be used!\n"; usage(); } if (sendOptionsRequestOnly && !sendOptionsRequest) { *env << "The -o and -O flags cannot both be used!\n"; usage(); } if (tunnelOverHTTPPortNum > 0) { if (streamUsingTCP) { *env << "The -t and -T flags cannot both be used!\n"; usage(); } else { streamUsingTCP = True; } } if (!createReceivers && notifyOnPacketArrival) { *env << "Warning: Because we're not receiving stream data, the -n flag has no effect\n"; } if (durationSlop < 0) { // This parameter wasn't set, so use a default value. // If we're measuring QOS stats, then don't add any slop, to avoid // having 'empty' measurement intervals at the end. durationSlop = qosMeasurementIntervalMS > 0 ? 0.0 : 5.0; } char* url = argv[1]; // Create our client object: ourClient = createClient(*env, verbosityLevel, progName); if (ourClient == NULL) { *env << "Failed to create " << clientProtocolName << " client: " << env->getResultMsg() << "\n"; shutdown(); } if (sendOptionsRequest) { // Begin by sending an "OPTIONS" command: char* optionsResponse = getOptionsResponse(ourClient, url, username, password); if (sendOptionsRequestOnly) { if (optionsResponse == NULL) { *env << clientProtocolName << " \"OPTIONS\" request failed: " << env->getResultMsg() << "\n"; } else { *env << clientProtocolName << " \"OPTIONS\" request returned: " << optionsResponse << "\n"; } shutdown(); } delete[] optionsResponse; } // Open the URL, to get a SDP description: char* sdpDescription = getSDPDescriptionFromURL(ourClient, url, username, password, proxyServerName, proxyServerPortNum, desiredPortNum); if (sdpDescription == NULL) { *env << "Failed to get a SDP description from URL \"" << url << "\": " << env->getResultMsg() << "\n"; shutdown(); } *env << "Opened URL \"" << url << "\", returning a SDP description:\n" << sdpDescription << "\n"; // Create a media session object from this SDP description: session = MediaSession::createNew(*env, sdpDescription); delete[] sdpDescription; if (session == NULL) { *env << "Failed to create a MediaSession object from the SDP description: " << env->getResultMsg() << "\n"; shutdown(); } else if (!session->hasSubsessions()) { *env << "This session has no media subsessions (i.e., \"m=\" lines)\n"; shutdown(); } // Then, setup the "RTPSource"s for the session: MediaSubsessionIterator iter(*session); MediaSubsession *subsession; Boolean madeProgress = False; char const* singleMediumToTest = singleMedium; while ((subsession = iter.next()) != NULL) { // If we've asked to receive only a single medium, then check this now: if (singleMediumToTest != NULL) { if (strcmp(subsession->mediumName(), singleMediumToTest) != 0) { *env << "Ignoring \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession, because we've asked to receive a single " << singleMedium << " session only\n"; continue; } else { // Receive this subsession only singleMediumToTest = "xxxxx"; // this hack ensures that we get only 1 subsession of this type } } if (desiredPortNum != 0) { subsession->setClientPortNum(desiredPortNum); desiredPortNum += 2; } if (createReceivers) { if (!subsession->initiate(simpleRTPoffsetArg)) { *env << "Unable to create receiver for \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession: " << env->getResultMsg() << "\n"; } else { *env << "Created receiver for \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (client ports " << subsession->clientPortNum() << "-" << subsession->clientPortNum()+1 << ")\n"; madeProgress = True; if (subsession->rtpSource() != NULL) { // Because we're saving the incoming data, rather than playing // it in real time, allow an especially large time threshold // (1 second) for reordering misordered incoming packets: unsigned const thresh = 1000000; // 1 second subsession->rtpSource()->setPacketReorderingThresholdTime(thresh); if (socketInputBufferSize > 0) { // Set the RTP source's input buffer size as specified: int socketNum = subsession->rtpSource()->RTPgs()->socketNum(); unsigned curBufferSize = getReceiveBufferSize(*env, socketNum); unsigned newBufferSize = setReceiveBufferTo(*env, socketNum, socketInputBufferSize); *env << "Changed socket receive buffer size for the \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession from " << curBufferSize << " to " << newBufferSize << " bytes\n"; } } } } else { if (subsession->clientPortNum() == 0) { *env << "No client port was specified for the \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession. (Try adding the \"-p \" option.)\n"; } else { madeProgress = True; } } } if (!madeProgress) shutdown(); // Perform additional 'setup' on each subsession, before playing them: setupStreams(); // Create output files: if (createReceivers) { if (outputQuickTimeFile) { // Create a "QuickTimeFileSink", to write to 'stdout': qtOut = QuickTimeFileSink::createNew(*env, *session, "stdout", fileSinkBufferSize, movieWidth, movieHeight, movieFPS, packetLossCompensate, syncStreams, generateHintTracks, generateMP4Format); if (qtOut == NULL) { *env << "Failed to create QuickTime file sink for stdout: " << env->getResultMsg(); shutdown(); } qtOut->startPlaying(sessionAfterPlaying, NULL); } else if (outputAVIFile) { // Create an "AVIFileSink", to write to 'stdout': aviOut = AVIFileSink::createNew(*env, *session, "stdout", fileSinkBufferSize, movieWidth, movieHeight, movieFPS, packetLossCompensate); if (aviOut == NULL) { *env << "Failed to create AVI file sink for stdout: " << env->getResultMsg(); shutdown(); } aviOut->startPlaying(sessionAfterPlaying, NULL); #ifdef SUPPORT_REAL_RTSP } else if (session->isRealNetworksRDT) { // For RealNetworks' sessions, we create a single output file, // named "output.rm". char outFileName[1000]; if (singleMedium == NULL) { snprintf(outFileName, sizeof outFileName, "%soutput.rm", fileNamePrefix); } else { // output to 'stdout' as normal, even though we actually output all media sprintf(outFileName, "stdout"); } FileSink* fileSink = FileSink::createNew(*env, outFileName, fileSinkBufferSize, oneFilePerFrame); // The output file needs to begin with a special 'RMFF' header, // in order for it to be usable. Write this header first: unsigned headerSize; unsigned char* headerData = RealGenerateRMFFHeader(session, headerSize); struct timeval timeNow; gettimeofday(&timeNow, NULL); fileSink->addData(headerData, headerSize, timeNow); delete[] headerData; // Start playing the output file from the first subsession. // (Hack: Because all subsessions' data is actually multiplexed on the // single RTSP TCP connection, playing from one subsession is sufficient.) iter.reset(); madeProgress = False; while ((subsession = iter.next()) != NULL) { if (subsession->readSource() == NULL) continue; // was not initiated fileSink->startPlaying(*(subsession->readSource()), subsessionAfterPlaying, subsession); madeProgress = True; break; // play from one subsession only } if (!madeProgress) shutdown(); #endif } else { // Create and start "FileSink"s for each subsession: madeProgress = False; iter.reset(); while ((subsession = iter.next()) != NULL) { if (subsession->readSource() == NULL) continue; // was not initiated // Create an output file for each desired stream: char outFileName[1000]; if (singleMedium == NULL) { // Output file name is // "--" static unsigned streamCounter = 0; snprintf(outFileName, sizeof outFileName, "%s%s-%s-%d", fileNamePrefix, subsession->mediumName(), subsession->codecName(), ++streamCounter); } else { sprintf(outFileName, "stdout"); } FileSink* fileSink; if (strcmp(subsession->mediumName(), "audio") == 0 && (strcmp(subsession->codecName(), "AMR") == 0 || strcmp(subsession->codecName(), "AMR-WB") == 0)) { // For AMR audio streams, we use a special sink that inserts AMR frame hdrs: fileSink = AMRAudioFileSink::createNew(*env, outFileName, fileSinkBufferSize, oneFilePerFrame); } else if (strcmp(subsession->mediumName(), "video") == 0 && (strcmp(subsession->codecName(), "H264") == 0)) { // For H.264 video stream, we use a special sink that insert start_codes: fileSink = H264VideoFileSink::createNew(*env, outFileName, fileSinkBufferSize, oneFilePerFrame); } else { // Normal case: fileSink = FileSink::createNew(*env, outFileName, fileSinkBufferSize, oneFilePerFrame); } subsession->sink = fileSink; if (subsession->sink == NULL) { *env << "Failed to create FileSink for \"" << outFileName << "\": " << env->getResultMsg() << "\n"; } else { if (singleMedium == NULL) { *env << "Created output file: \"" << outFileName << "\"\n"; } else { *env << "Outputting data from the \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession to 'stdout'\n"; } if (strcmp(subsession->mediumName(), "video") == 0 && strcmp(subsession->codecName(), "MP4V-ES") == 0 && subsession->fmtp_config() != NULL) { // For MPEG-4 video RTP streams, the 'config' information // from the SDP description contains useful VOL etc. headers. // Insert this data at the front of the output file: unsigned configLen; unsigned char* configData = parseGeneralConfigStr(subsession->fmtp_config(), configLen); struct timeval timeNow; gettimeofday(&timeNow, NULL); fileSink->addData(configData, configLen, timeNow); delete[] configData; } subsession->sink->startPlaying(*(subsession->readSource()), subsessionAfterPlaying, subsession); // Also set a handler to be called if a RTCP "BYE" arrives // for this subsession: if (subsession->rtcpInstance() != NULL) { subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, subsession); } madeProgress = True; } } if (!madeProgress) shutdown(); } } // Finally, start playing each subsession, to start the data flow: startPlayingStreams(); env->taskScheduler().doEventLoop(); // does not return return 0; // only to prevent compiler warning } void setupStreams() { MediaSubsessionIterator iter(*session); MediaSubsession *subsession; Boolean madeProgress = False; while ((subsession = iter.next()) != NULL) { if (subsession->clientPortNum() == 0) continue; // port # was not set if (!clientSetupSubsession(ourClient, subsession, streamUsingTCP)) { *env << "Failed to setup \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession: " << env->getResultMsg() << "\n"; } else { *env << "Setup \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (client ports " << subsession->clientPortNum() << "-" << subsession->clientPortNum()+1 << ")\n"; madeProgress = True; } } if (!madeProgress) shutdown(); } void startPlayingStreams() { if (duration == 0) { if (scale > 0) duration = session->playEndTime() - initialSeekTime; // use SDP end time else if (scale < 0) duration = initialSeekTime; } if (duration < 0) duration = 0.0; if (!clientStartPlayingSession(ourClient, session)) { *env << "Failed to start playing session: " << env->getResultMsg() << "\n"; shutdown(); } else { *env << "Started playing session\n"; } if (qosMeasurementIntervalMS > 0) { // Begin periodic QOS measurements: beginQOSMeasurement(); } // Figure out how long to delay (if at all) before shutting down, or // repeating the playing Boolean timerIsBeingUsed = False; double secondsToDelay = duration; if (duration > 0) { double const maxDelayTime = (double)( ((unsigned)0x7FFFFFFF)/1000000.0 ); if (duration > maxDelayTime) { *env << "Warning: specified end time " << duration << " exceeds maximum " << maxDelayTime << "; will not do a delayed shutdown\n"; } else { timerIsBeingUsed = True; double absScale = scale > 0 ? scale : -scale; // ASSERT: scale != 0 secondsToDelay = duration/absScale + durationSlop; int uSecsToDelay = (int)(secondsToDelay*1000000.0); sessionTimerTask = env->taskScheduler().scheduleDelayedTask( uSecsToDelay, (TaskFunc*)sessionTimerHandler, (void*)NULL); } } char const* actionString = createReceivers? "Receiving streamed data":"Data is being streamed"; if (timerIsBeingUsed) { *env << actionString << " (for up to " << secondsToDelay << " seconds)...\n"; } else { #ifdef USE_SIGNALS pid_t ourPid = getpid(); *env << actionString << " (signal with \"kill -HUP " << (int)ourPid << "\" or \"kill -USR1 " << (int)ourPid << "\" to terminate)...\n"; #else *env << actionString << "...\n"; #endif } // Watch for incoming packets (if desired): checkForPacketArrival(NULL); checkInterPacketGaps(NULL); } void tearDownStreams() { if (session == NULL) return; clientTearDownSession(ourClient, session); } void closeMediaSinks() { Medium::close(qtOut); Medium::close(aviOut); if (session == NULL) return; MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { Medium::close(subsession->sink); subsession->sink = NULL; } } void subsessionAfterPlaying(void* clientData) { // Begin by closing this media subsession's stream: MediaSubsession* subsession = (MediaSubsession*)clientData; Medium::close(subsession->sink); subsession->sink = NULL; // Next, check whether *all* subsessions' streams have now been closed: MediaSession& session = subsession->parentSession(); MediaSubsessionIterator iter(session); while ((subsession = iter.next()) != NULL) { if (subsession->sink != NULL) return; // this subsession is still active } // All subsessions' streams have now been closed sessionAfterPlaying(); } void subsessionByeHandler(void* clientData) { struct timeval timeNow; gettimeofday(&timeNow, NULL); unsigned secsDiff = timeNow.tv_sec - startTime.tv_sec; MediaSubsession* subsession = (MediaSubsession*)clientData; *env << "Received RTCP \"BYE\" on \"" << subsession->mediumName() << "/" << subsession->codecName() << "\" subsession (after " << secsDiff << " seconds)\n"; // Act now as if the subsession had closed: subsessionAfterPlaying(subsession); } void sessionAfterPlaying(void* /*clientData*/) { if (!playContinuously) { shutdown(0); } else { // We've been asked to play the stream(s) over again: startPlayingStreams(); } } void sessionTimerHandler(void* /*clientData*/) { sessionTimerTask = NULL; sessionAfterPlaying(); } class qosMeasurementRecord { public: qosMeasurementRecord(struct timeval const& startTime, RTPSource* src) : fSource(src), fNext(NULL), kbits_per_second_min(1e20), kbits_per_second_max(0), kBytesTotal(0.0), packet_loss_fraction_min(1.0), packet_loss_fraction_max(0.0), totNumPacketsReceived(0), totNumPacketsExpected(0) { measurementEndTime = measurementStartTime = startTime; #ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) { // hack for RealMedia sessions (RDT, not RTP) RealRDTSource* rdt = (RealRDTSource*)src; kBytesTotal = rdt->totNumKBytesReceived(); totNumPacketsReceived = rdt->totNumPacketsReceived(); totNumPacketsExpected = totNumPacketsReceived; // because we use TCP return; } #endif RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB()); // Assume that there's only one SSRC source (usually the case): RTPReceptionStats* stats = statsIter.next(True); if (stats != NULL) { kBytesTotal = stats->totNumKBytesReceived(); totNumPacketsReceived = stats->totNumPacketsReceived(); totNumPacketsExpected = stats->totNumPacketsExpected(); } } virtual ~qosMeasurementRecord() { delete fNext; } void periodicQOSMeasurement(struct timeval const& timeNow); public: RTPSource* fSource; qosMeasurementRecord* fNext; public: struct timeval measurementStartTime, measurementEndTime; double kbits_per_second_min, kbits_per_second_max; double kBytesTotal; double packet_loss_fraction_min, packet_loss_fraction_max; unsigned totNumPacketsReceived, totNumPacketsExpected; }; static qosMeasurementRecord* qosRecordHead = NULL; static void periodicQOSMeasurement(void* clientData); // forward static unsigned nextQOSMeasurementUSecs; static void scheduleNextQOSMeasurement() { nextQOSMeasurementUSecs += qosMeasurementIntervalMS*1000; struct timeval timeNow; gettimeofday(&timeNow, NULL); unsigned timeNowUSecs = timeNow.tv_sec*1000000 + timeNow.tv_usec; unsigned usecsToDelay = nextQOSMeasurementUSecs < timeNowUSecs ? 0 : nextQOSMeasurementUSecs - timeNowUSecs; qosMeasurementTimerTask = env->taskScheduler().scheduleDelayedTask( usecsToDelay, (TaskFunc*)periodicQOSMeasurement, (void*)NULL); } static void periodicQOSMeasurement(void* /*clientData*/) { struct timeval timeNow; gettimeofday(&timeNow, NULL); for (qosMeasurementRecord* qosRecord = qosRecordHead; qosRecord != NULL; qosRecord = qosRecord->fNext) { qosRecord->periodicQOSMeasurement(timeNow); } // Do this again later: scheduleNextQOSMeasurement(); } void qosMeasurementRecord ::periodicQOSMeasurement(struct timeval const& timeNow) { unsigned secsDiff = timeNow.tv_sec - measurementEndTime.tv_sec; int usecsDiff = timeNow.tv_usec - measurementEndTime.tv_usec; double timeDiff = secsDiff + usecsDiff/1000000.0; measurementEndTime = timeNow; #ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) { // hack for RealMedia sessions (RDT, not RTP) RealRDTSource* rdt = (RealRDTSource*)fSource; double kBytesTotalNow = rdt->totNumKBytesReceived(); double kBytesDeltaNow = kBytesTotalNow - kBytesTotal; kBytesTotal = kBytesTotalNow; double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff; if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow; if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow; totNumPacketsReceived = rdt->totNumPacketsReceived(); totNumPacketsExpected = totNumPacketsReceived; // because we use TCP packet_loss_fraction_min = packet_loss_fraction_max = 0.0; // ditto return; } #endif RTPReceptionStatsDB::Iterator statsIter(fSource->receptionStatsDB()); // Assume that there's only one SSRC source (usually the case): RTPReceptionStats* stats = statsIter.next(True); if (stats != NULL) { double kBytesTotalNow = stats->totNumKBytesReceived(); double kBytesDeltaNow = kBytesTotalNow - kBytesTotal; kBytesTotal = kBytesTotalNow; double kbpsNow = timeDiff == 0.0 ? 0.0 : 8*kBytesDeltaNow/timeDiff; if (kbpsNow < 0.0) kbpsNow = 0.0; // in case of roundoff error if (kbpsNow < kbits_per_second_min) kbits_per_second_min = kbpsNow; if (kbpsNow > kbits_per_second_max) kbits_per_second_max = kbpsNow; unsigned totReceivedNow = stats->totNumPacketsReceived(); unsigned totExpectedNow = stats->totNumPacketsExpected(); unsigned deltaReceivedNow = totReceivedNow - totNumPacketsReceived; unsigned deltaExpectedNow = totExpectedNow - totNumPacketsExpected; totNumPacketsReceived = totReceivedNow; totNumPacketsExpected = totExpectedNow; double lossFractionNow = deltaExpectedNow == 0 ? 0.0 : 1.0 - deltaReceivedNow/(double)deltaExpectedNow; //if (lossFractionNow < 0.0) lossFractionNow = 0.0; //reordering can cause if (lossFractionNow < packet_loss_fraction_min) { packet_loss_fraction_min = lossFractionNow; } if (lossFractionNow > packet_loss_fraction_max) { packet_loss_fraction_max = lossFractionNow; } } } void beginQOSMeasurement() { // Set up a measurement record for each active subsession: struct timeval startTime; gettimeofday(&startTime, NULL); nextQOSMeasurementUSecs = startTime.tv_sec*1000000 + startTime.tv_usec; qosMeasurementRecord* qosRecordTail = NULL; MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { RTPSource* src = subsession->rtpSource(); #ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) src = (RTPSource*)(subsession->readSource()); // hack #endif if (src == NULL) continue; qosMeasurementRecord* qosRecord = new qosMeasurementRecord(startTime, src); if (qosRecordHead == NULL) qosRecordHead = qosRecord; if (qosRecordTail != NULL) qosRecordTail->fNext = qosRecord; qosRecordTail = qosRecord; } // Then schedule the first of the periodic measurements: scheduleNextQOSMeasurement(); } void printQOSData(int exitCode) { if (exitCode != 0 && statusCode == 0) statusCode = 2; *env << "begin_QOS_statistics\n"; *env << "server_availability\t" << (statusCode == 1 ? 0 : 100) << "\n"; *env << "stream_availability\t" << (statusCode == 0 ? 100 : 0) << "\n"; // Print out stats for each active subsession: qosMeasurementRecord* curQOSRecord = qosRecordHead; if (session != NULL) { MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { RTPSource* src = subsession->rtpSource(); #ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) src = (RTPSource*)(subsession->readSource()); // hack #endif if (src == NULL) continue; *env << "subsession\t" << subsession->mediumName() << "/" << subsession->codecName() << "\n"; unsigned numPacketsReceived = 0, numPacketsExpected = 0; if (curQOSRecord != NULL) { numPacketsReceived = curQOSRecord->totNumPacketsReceived; numPacketsExpected = curQOSRecord->totNumPacketsExpected; } *env << "num_packets_received\t" << numPacketsReceived << "\n"; *env << "num_packets_lost\t" << numPacketsExpected - numPacketsReceived << "\n"; if (curQOSRecord != NULL) { unsigned secsDiff = curQOSRecord->measurementEndTime.tv_sec - curQOSRecord->measurementStartTime.tv_sec; int usecsDiff = curQOSRecord->measurementEndTime.tv_usec - curQOSRecord->measurementStartTime.tv_usec; double measurementTime = secsDiff + usecsDiff/1000000.0; *env << "elapsed_measurement_time\t" << measurementTime << "\n"; *env << "kBytes_received_total\t" << curQOSRecord->kBytesTotal << "\n"; *env << "measurement_sampling_interval_ms\t" << qosMeasurementIntervalMS << "\n"; if (curQOSRecord->kbits_per_second_max == 0) { // special case: we didn't receive any data: *env << "kbits_per_second_min\tunavailable\n" "kbits_per_second_ave\tunavailable\n" "kbits_per_second_max\tunavailable\n"; } else { *env << "kbits_per_second_min\t" << curQOSRecord->kbits_per_second_min << "\n"; *env << "kbits_per_second_ave\t" << (measurementTime == 0.0 ? 0.0 : 8*curQOSRecord->kBytesTotal/measurementTime) << "\n"; *env << "kbits_per_second_max\t" << curQOSRecord->kbits_per_second_max << "\n"; } *env << "packet_loss_percentage_min\t" << 100*curQOSRecord->packet_loss_fraction_min << "\n"; double packetLossFraction = numPacketsExpected == 0 ? 1.0 : 1.0 - numPacketsReceived/(double)numPacketsExpected; if (packetLossFraction < 0.0) packetLossFraction = 0.0; *env << "packet_loss_percentage_ave\t" << 100*packetLossFraction << "\n"; *env << "packet_loss_percentage_max\t" << (packetLossFraction == 1.0 ? 100.0 : 100*curQOSRecord->packet_loss_fraction_max) << "\n"; #ifdef SUPPORT_REAL_RTSP if (session->isRealNetworksRDT) { RealRDTSource* rdt = (RealRDTSource*)src; *env << "inter_packet_gap_ms_min\t" << rdt->minInterPacketGapUS()/1000.0 << "\n"; struct timeval totalGaps = rdt->totalInterPacketGaps(); double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0; unsigned totNumPacketsReceived = rdt->totNumPacketsReceived(); *env << "inter_packet_gap_ms_ave\t" << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n"; *env << "inter_packet_gap_ms_max\t" << rdt->maxInterPacketGapUS()/1000.0 << "\n"; } else { #endif RTPReceptionStatsDB::Iterator statsIter(src->receptionStatsDB()); // Assume that there's only one SSRC source (usually the case): RTPReceptionStats* stats = statsIter.next(True); if (stats != NULL) { *env << "inter_packet_gap_ms_min\t" << stats->minInterPacketGapUS()/1000.0 << "\n"; struct timeval totalGaps = stats->totalInterPacketGaps(); double totalGapsMS = totalGaps.tv_sec*1000.0 + totalGaps.tv_usec/1000.0; unsigned totNumPacketsReceived = stats->totNumPacketsReceived(); *env << "inter_packet_gap_ms_ave\t" << (totNumPacketsReceived == 0 ? 0.0 : totalGapsMS/totNumPacketsReceived) << "\n"; *env << "inter_packet_gap_ms_max\t" << stats->maxInterPacketGapUS()/1000.0 << "\n"; } #ifdef SUPPORT_REAL_RTSP } #endif curQOSRecord = curQOSRecord->fNext; } } } *env << "end_QOS_statistics\n"; delete qosRecordHead; } void shutdown(int exitCode) { if (env != NULL) { env->taskScheduler().unscheduleDelayedTask(sessionTimerTask); env->taskScheduler().unscheduleDelayedTask(arrivalCheckTimerTask); env->taskScheduler().unscheduleDelayedTask(interPacketGapCheckTimerTask); env->taskScheduler().unscheduleDelayedTask(qosMeasurementTimerTask); } if (qosMeasurementIntervalMS > 0) { printQOSData(exitCode); } // Close our output files: closeMediaSinks(); // Teardown, then shutdown, any outstanding RTP/RTCP subsessions tearDownStreams(); Medium::close(session); // Finally, shut down our client: Medium::close(ourClient); // Adios... exit(exitCode); } void signalHandlerShutdown(int /*sig*/) { *env << "Got shutdown signal\n"; shutdown(0); } void checkForPacketArrival(void* /*clientData*/) { if (!notifyOnPacketArrival) return; // we're not checking // Check each subsession, to see whether it has received data packets: unsigned numSubsessionsChecked = 0; unsigned numSubsessionsWithReceivedData = 0; unsigned numSubsessionsThatHaveBeenSynced = 0; MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { RTPSource* src = subsession->rtpSource(); if (src == NULL) continue; ++numSubsessionsChecked; if (src->receptionStatsDB().numActiveSourcesSinceLastReset() > 0) { // At least one data packet has arrived ++numSubsessionsWithReceivedData; } if (src->hasBeenSynchronizedUsingRTCP()) { ++numSubsessionsThatHaveBeenSynced; } } unsigned numSubsessionsToCheck = numSubsessionsChecked; // Special case for "QuickTimeFileSink"s and "AVIFileSink"s: // They might not use all of the input sources: if (qtOut != NULL) { numSubsessionsToCheck = qtOut->numActiveSubsessions(); } else if (aviOut != NULL) { numSubsessionsToCheck = aviOut->numActiveSubsessions(); } Boolean notifyTheUser; if (!syncStreams) { notifyTheUser = numSubsessionsWithReceivedData > 0; // easy case } else { notifyTheUser = numSubsessionsWithReceivedData >= numSubsessionsToCheck && numSubsessionsThatHaveBeenSynced == numSubsessionsChecked; // Note: A subsession with no active sources is considered to be synced } if (notifyTheUser) { struct timeval timeNow; gettimeofday(&timeNow, NULL); char timestampStr[100]; sprintf(timestampStr, "%ld%03ld", timeNow.tv_sec, timeNow.tv_usec/1000); *env << (syncStreams ? "Synchronized d" : "D") << "ata packets have begun arriving [" << timestampStr << "]\007\n"; return; } // No luck, so reschedule this check again, after a delay: int uSecsToDelay = 100000; // 100 ms arrivalCheckTimerTask = env->taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)checkForPacketArrival, NULL); } void checkInterPacketGaps(void* /*clientData*/) { if (interPacketGapMaxTime == 0) return; // we're not checking // Check each subsession, counting up how many packets have been received: unsigned newTotNumPacketsReceived = 0; MediaSubsessionIterator iter(*session); MediaSubsession* subsession; while ((subsession = iter.next()) != NULL) { RTPSource* src = subsession->rtpSource(); if (src == NULL) continue; newTotNumPacketsReceived += src->receptionStatsDB().totNumPacketsReceived(); } if (newTotNumPacketsReceived == totNumPacketsReceived) { // No additional packets have been received since the last time we // checked, so end this stream: *env << "Closing session, because we stopped receiving packets.\n"; interPacketGapCheckTimerTask = NULL; sessionAfterPlaying(); } else { totNumPacketsReceived = newTotNumPacketsReceived; // Check again, after the specified delay: interPacketGapCheckTimerTask = env->taskScheduler().scheduleDelayedTask(interPacketGapMaxTime*1000000, (TaskFunc*)checkInterPacketGaps, NULL); } }