/* * Copyright (c) 1996 The Regents of the University of California. * All rights reserved. * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without written agreement * is hereby granted, provided that the above copyright notice and the * following two paragraphs appear in all copies of this software. * * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING * OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE * UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. * * * $Id: monitor.cc,v 1.24 1997/01/13 23:31:40 aswan Exp $ */ #include #include #include #include #include #include "Tcl.h" //#include "crypt.h" #include "config.h" #include "member.h" #include "rtp.h" #include "monitor.h" #ifndef HAVE_SNPRINTF extern "C" { int snprintf(char* buf, int s, const char* fmt, ...); } #endif // forget about source if no control message for 6 report intervals #define CTRL_IDLE 6 #define HASH(a) ((int)((((a) >> 20) ^ ((a) >> 10) ^ (a)) & (HASH_SIZE-1))) static class MonitorMatcher : public Matcher { public: MonitorMatcher() : Matcher("monitor") {} TclObject* match(const char*) { return (new Monitor()); } } session_matcher; UpdateTimer::UpdateTimer(Monitor& m) : m_(m), inactive_(1) { Tcl& tcl = Tcl::instance(); const char* bwstr = tcl.attr("sessbw"); bw_ = (bwstr == 0) ? 64. : atof(bwstr); // Convert session bandwidth from kb/s to bytes/sec (*128) // and then to control b/w (* 5%) bw_ *= 128. * .05; } #define TIMER_INTERVAL 2000 void UpdateTimer::timeout() { // update median m_.median(); if (--inactive_ == 0) { int newinterval = m_.CheckActive(bw_); inactive_ = newinterval / TIMER_INTERVAL; } msched(TIMER_INTERVAL); } Monitor::Monitor() : net_(0), nrunt_(0), badversion_(0), badpt_(0), nmembers_(0),members_(0), senders_(0), last_(0), median_(0), avgpktsize_(0), bits_(0), sorttype_(SORT_MAX), stat_(0), thresh_(0), ut_(*this) { pktbuf_ = new u_char[2 * RTP_MTU]; memset((char*)hashtab_, 0, sizeof(hashtab_)); // start the timer firing every 2 seconds ut_.msched(TIMER_INTERVAL); } Monitor::~Monitor() { delete pktbuf_; } int Monitor::command(int argc, const char*const* argv) { Tcl& tcl = Tcl::instance(); static char *work; static int worksize = 0; if (worksize == 0) { worksize = 1024; work = new char[worksize]; } int size = worksize; if (argc == 2) { if (strcmp(argv[1], "clean") == 0) { for (Member* m = members_; m != NULL; m = m->next_) { if (! m->active()) { /* * remove this member */ Member** p = &hashtab_[HASH(m->srcid())]; while (*p != m) p = &(*p)->hlink_; *p = (*p)->hlink_; if (m->sender()) { p = &senders_; while (*p != 0 && *p != m) p = &(*p)->snext_; if (*p != 0) *p = (*p)->snext_; } p = &members_; while (*p != m) p = &(*p)->next_; if (*p == last_) { for (Member* t = members_; t != 0; t = t->next_) if (t->next_ == 0) last_ = t; } *p = (*p)->next_; if (median_ == m) median_ = 0; /* XXX do something more intelligent */ delete m; --nmembers_; } if (m->locked()) { for (Member* s = senders_; s != 0; s = s->snext_) { int n = s->pos(); if ((stat_ == 0 && m->loss(n) < thresh_) || (stat_ == 1 && m->filtloss(n) < thresh_) || (stat_ == 2 && m->jitter(n) < thresh_)) m->locked(false); } } } } if (strcmp(argv[1], "inactive") == 0) { Member* m; char *p = work; *p = 0; for (m=members_; m!=NULL; m=m->next_) { if (! m->active()) { strncpy(p, m->name(), size); int len = strlen(p); size -= len+1; p += len; *p++ = ' '; *p = 0; if (size < 2) { int newsize = worksize << 1; char* newwork = new char[worksize]; memcpy(newwork, work, worksize); worksize = newsize; p = newwork + (p - work); delete[] work; work = newwork; } } } tcl.result(work); return(TCL_OK); } if (strcmp(argv[1], "listeners") == 0) { char *p = work; *p = 0; for (Member* m=members_; m!=NULL; m=m->next_) { if (!m->locked()) continue; strncpy(p, m->name(), size); int len = strlen(p); size -= len+1; p += len; *p++ = ' '; *p = 0; if (size < 2) { int newsize = worksize << 1; char* newwork = new char[worksize]; memcpy(newwork, work, worksize); worksize = newsize; p = newwork + (p - work); delete[] work; work = newwork; } } tcl.result(work); return (TCL_OK); } if (strcmp(argv[1], "senders") == 0) { char *p = work; *p = 0; for (Member* s = senders_; s != 0; s = s->snext_) { strncpy(p, s->name(), size); int len = strlen(p); size -= len+1; p += len; *p++ = ' '; *p = 0; if (size < 2) { int newsize = worksize << 1; char* newwork = new char[worksize]; memcpy(newwork, work, worksize); worksize = newsize; p = newwork + (p - work); delete[] work; work = newwork; } } tcl.result(work); return (TCL_OK); } if (strcmp(argv[1], "sort") == 0) { sort(); tcl.result(""); return (TCL_OK); } if (strcmp(argv[1], "statnames") == 0) { tcl.result("{loss \"Packet Loss\"} {filtlost \"Filtered Packet Loss\"} {jitter \"Delay Jitter\"}"); return (TCL_OK); } if (strcmp(argv[1], "stats") == 0) { char* p = work; int len = snprintf(p, size, "{\"Bad Version\" %d} {\"Unexpected Payload Type\" %d} {\"Runt Packets\" %d} {\"Average Packet Size\" \"%d bytes\"}", badversion_, badpt_, nrunt_, avgpktsize_); tcl.result(work); return(TCL_OK); } } else if (argc == 3) { if (strcmp(argv[1], "ignore") == 0) { Member* m = (Member*)TclObject::lookup(argv[2]); if (!m->sender()) return(TCL_ERROR); m->ignore(true); Member** p = &senders_; while (*p != 0 && *p != m) p = &(*p)->snext_; if (*p != 0) *p = (*p)->snext_; return(TCL_OK); } if (strcmp(argv[1], "net") == 0) { net((Network*)TclObject::lookup(argv[2])); return (TCL_OK); } if (strcmp(argv[1], "sort") == 0) { if (strcmp(argv[2], "max") == 0) sort(SORT_MAX); else if (strcmp(argv[2], "avg") == 0) sort(SORT_AVG); else if (strcmp(argv[2], "address") == 0) sort(SORT_ADDR); else { TclObject* obj = TclObject::lookup(argv[2]); if (obj != 0) { Member* m = (Member*)obj; sort(m->pos()); } } tcl.result(""); return (TCL_OK); } if (strcmp(argv[1], "stat") == 0) { int v = atoi(argv[2]); if (v < 0 || v > 2) return(TCL_ERROR); stat_ = v; Tcl_Interp* interp = Tcl::instance().interp(); char tmpname[32]; for (Member* s = members_; s != 0; s = s->next_) { int n = s->pos(); for (Member* l = members_; l != 0; l = l->next_) { sprintf(tmpname, "%s:%s", l->name(), s->name() ); char val[32]; switch (stat_) { case 0: sprintf(val, "%d %%", (int)l->loss(n)); break; case 1: sprintf(val, "%d %%", (int)l->filtloss(n)); break; case 2: u_int32_t j = l->jitter(n); j = (j&0x0ffff) * 1000 > 16; sprintf(val, "%u ms", j); break; } Tcl_SetVar2(interp, "stats", tmpname, val, TCL_GLOBAL_ONLY); } } tcl.result(""); return(TCL_OK); } if (strcmp(argv[1], "thresh") == 0) { int v = atoi(argv[2]); // XXX this is ugly; fix it later if (stat_ == 2) thresh_ = (v << 16) / 1000; else thresh_ = v; Tcl& tcl = Tcl::instance(); for (Member* m = members_; m != 0; m = m->next_) { if (!m->locked()) { for (Member* s = senders_; s != 0; s = s->snext_) { int n = s->pos(); if ((stat_ == 0 && m->loss(n) >= thresh_) || (stat_ == 1 && m->filtloss(n) >= thresh_) || (stat_ == 2 && m->jitter(n) >= thresh_)) { m->locked(true); tcl.evalf("new-listener %s", m->name()); } } } } return (TCL_OK); } } return (TCL_OK); } Member* Monitor::lookup(u_int32_t ssrc, u_int32_t addr) { int h = HASH(ssrc); Member* m; for (m = hashtab_[h]; m != NULL; m = m->hlink_) { if (m->srcid() == ssrc) return m; } // if called from parse_rr_records, we don't want to // create a new entry so just ignore it for now if (addr == 0) return 0; m = new Member(ssrc, addr); m->active(1); if (members_ == 0) { m->next_ = 0; members_ = last_ = m; } else { m->next_ = 0; last_->next_ = m; last_ = m; } m->hlink_ = hashtab_[h]; hashtab_[h] = m; ++nmembers_; Tcl_Interp* interp = Tcl::instance().interp(); char tmpname[32], objname[32]; sprintf(tmpname, "#%u", ssrc); strcpy(objname, m->name()); Tcl_SetVar2(interp, "names", objname, tmpname, TCL_GLOBAL_ONLY); return m; } int Monitor::getpos() { for (int i=0; i<32; i++) if (!((bits_>>i) & 1)) { bits_ |= 1<recv(pktbuf_, 2 * RTP_MTU, src); if (cc <= 0) return; rtcphdr* rh = (rtcphdr*) pktbuf_; if (cc < (int)sizeof(*rh)) { ++nrunt_; return; } /* * try to filter out junk: first thing in packet must be * sr, rr or bye & version number must be correct. */ switch(ntohs(rh->rh_flags) & 0xc0ff) { case RTP_VERSION << 14 | RTCP_PT_SR: case RTP_VERSION << 14 | RTCP_PT_RR: case RTP_VERSION << 14 | RTCP_PT_BYE: break; default: u_short flags = ntohs(rh->rh_flags); if ((flags>>14)&3 != RTP_VERSION) ++badversion_; else ++badpt_; return; } // update average packet size avgpktsize_ += (cc - avgpktsize_) >> 4; u_int32_t ssrc = rh->rh_ssrc; Member *m = lookup(ssrc, src); m->active(1); /* * Outer loop parses multiple RTCP records of a "compound packet". * There is no framing between records. Boundaries are implicit * and the overall length comes from UDP. */ u_char* epack = (u_char*)rh + cc; while ((u_char*)rh < epack) { u_int len = (ntohs(rh->rh_len) << 2) + 4; u_char* ep = (u_char*)rh + len; if (ep > epack) { m->badlen(1); return; } u_int flags = ntohs(rh->rh_flags); if (flags >> 14 != RTP_VERSION) { m->badver(1); return; } switch (flags & 0xff) { case RTCP_PT_SR: parse_sr(rh, flags, ep, m, src); break; case RTCP_PT_RR: parse_rr(rh, flags, ep, m, src); break; case RTCP_PT_SDES: parse_sdes(rh, flags, ep, m, ssrc, src); break; case RTCP_PT_BYE: parse_bye(rh, flags, ep, m); break; default: m->badopt(1); break; } rh = (rtcphdr*)ep; } } void Monitor::parse_sr(rtcphdr* rh, int flags, u_char* ep, Member* m, u_int32_t addr) { rtcp_sr* sr = (rtcp_sr*)(rh + 1); u_int32_t ssrc = rh->rh_ssrc; if (m->srcid() != ssrc) { m = lookup(ssrc, addr); } time_t tm = time(0); m->ts(tm); #ifdef RTCP_DEBUG Tcl& tcl = Tcl::instance(); tcl.evalf("rtcp_debug \"%u\" \"sr: npkts: %d nbytes: %d\"", ssrc, ntohl(sr->sr_np), ntohl(sr->sr_nb)); #endif int cnt = flags >> 8 & 0x1f; parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, m, addr); } void Monitor::parse_rr(rtcphdr* rh, int flags, u_char* ep, Member* m, u_int32_t addr) { u_int32_t ssrc = rh->rh_ssrc; if (m->srcid() != ssrc) { m = lookup(ssrc, addr); } time_t tm = time(0); m->ts(tm); int cnt = flags >> 8 & 0x1f; parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, m, addr); } void Monitor::parse_rr_records(u_int32_t ssrc, rtcp_rr* rr, int cnt, u_char* ep, Member *m, u_int32_t addr) { while ((u_char*)rr < ep) { u_int32_t srcid = rr->rr_srcid; Member* s = lookup(srcid, 0); #ifdef RTCP_DEBUG { Tcl& tcl = Tcl::instance(); const char *p = NULL; if (s != 0) p = s->sdes(RTCP_SDES_CNAME); tcl.evalf("rtcp_debug \"%u\" \"rr: srcid: %u (%s) fracloss: %u%% cumloss: %u ehsr: %u dv: %u lsr: %u dlsr: %u\"", m->srcid(), ntohl(rr->rr_srcid), p ? p : "?", ((ntohl(rr->rr_loss) >> 24) & 0xff) * 100 / 256, (ntohl(rr->rr_loss) & 0xffffff), ntohl(rr->rr_ehsr), ntohl(rr->rr_dv), ntohl(rr->rr_lsr), ntohl(rr->rr_dlsr)); } #endif if (s == 0) { rr++; continue; } if (! s->sender()) { /* * senders list must be kept in order -- add to the * end of the list */ s->snext_ = 0; if (senders_ == 0) senders_ = s; else { Member* sn = senders_; while (sn->snext_ != 0) sn = sn->snext_; sn->snext_ = s; } s->pos(getpos()); s->sender(true); /* * don't update stats yet; wait until we hear at least * twice that this srcid is a sender */ rr++; continue; } int n = s->pos(); m->loss(n, ntohl(rr->rr_loss)&0x0ffffff, ntohl(rr->rr_ehsr)); m->filtloss(n, ntohl(rr->rr_loss) >> 24); u_int32_t jit = ntohl(rr->rr_dv); if ((jit > 0x1000) && ((jit & 0x00f0ffff) == 0)) { jit = (jit & 0xff000000) >> 24 | (jit & 0x000f0000) >> 16; } m->jitter(n, jit); char idx[64]; sprintf(idx, "%s:%s", m->name(), s->name()); char statbuf[32]; bool isnew = false; //XXX this is ugly; fix it later u_char l; switch (stat_) { case 0: // packet loss l = m->loss(n); if (!m->locked() && l >= thresh_) isnew = true; sprintf(statbuf, "%d %%", (int)l ); break; case 1: // filtered packet loss l = m->filtloss(n); if (!m->locked() && l >= thresh_) isnew = true; sprintf(statbuf, "%d %%", (int)m->filtloss(n) ); break; case 2: // jitter u_int32_t j = m->jitter(n); if (!m->locked() && j >= thresh_) isnew = true; j = (j&0x0ffff) * 1000 >> 16; sprintf(statbuf, "%u ms", j); break; } Tcl& tcl = Tcl::instance(); if (isnew) { m->locked(1); tcl.evalf("new-listener %s", m->name()); } Tcl_Interp* interp = tcl.interp(); Tcl_SetVar2(interp, "stats", idx, statbuf, TCL_GLOBAL_ONLY); rr++; } } void Monitor::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Member* m, u_int32_t ssrc, u_int32_t addr) { int cnt = flags >> 8 & 0x1f; u_int32_t* p = (u_int32_t*)&rh->rh_ssrc; while (--cnt >= 0) { int n = sdesbody(p, ep, m, addr); if (n == 0) break; p += n; } if (cnt >= 0) m->badsdes(1); } int Monitor::sdesbody(u_int32_t* p, u_char* ep, Member* m, u_int32_t addr) { u_int32_t srcid = *p; if (m->srcid() != srcid) m = lookup(srcid, addr); if (m == 0) return (0); u_char* cp = (u_char*)(p + 1); while (cp < ep) { u_int type = cp[0]; if (type == 0) { /* end of chunk */ return (((cp - (u_char*)p) >> 2) + 1); } u_int len = cp[1]; u_char* eopt = cp + len + 2; if (eopt > ep) return (0); if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) { char buf[256]; memcpy(buf, (char*)&cp[2], len); buf[len] = 0; const char* oldval= m->sdes(RTCP_SDES_NAME); char newval[256]; newval[0] = '\0'; if (type == RTCP_SDES_NAME && buf != 0 && (oldval == 0 || strcmp(oldval, buf))) strcpy(newval, buf); oldval = m->sdes(RTCP_SDES_CNAME); if (type == RTCP_SDES_CNAME && buf != 0 && (oldval == 0 || strcmp(oldval, buf))) strcpy(newval, buf); m->sdes(type, buf); if (newval[0] != 0) { char objname[32]; strcpy(objname, m->name()); Tcl_Interp* interp = Tcl::instance().interp(); Tcl_SetVar2(interp, "names", objname, newval, TCL_GLOBAL_ONLY); } } cp = eopt; } return (0); } void Monitor::parse_bye(rtcphdr* rh, int flags, u_char* ep, Member* m) { int cnt = flags >> 8 & 0x1f; u_int32_t* p = (u_int32_t*)&rh->rh_ssrc; while (--cnt >= 0) { if (p >= (u_int32_t*)ep) { //m->badbye(1); return; } if (m->srcid() != rh->rh_ssrc) m = lookup(*p, 0); if (m != 0) m->active(0); ++p; } } void Monitor::sort() { Member* m; int i, b; u_int32_t v; for (m=members_; m!=NULL; m=m->next_) { v = 0; switch(sorttype_) { case SORT_MAX: for (i=0, b=bits_; b!=0; i++, b>>=1) { if (! b&1) continue; if (stat_ == 0 && m->loss(i) > v) v = m->loss(i); if (stat_ == 1 && m->filtloss(i) > v) v = m->filtloss(i); if (stat_ == 2 && m->jitter(i) > v) v = m->jitter(i); } m->val(v); break; case SORT_AVG: for (i=0, b=bits_; b!=0; i++, b>>=1) { if (! b&1) continue; switch (stat_) { case 0: v += m->loss(i); break; case 1: v += m->filtloss(i); break; case 2: v += m->jitter(i); break; } } m->val(v); break; case SORT_ADDR: m->val(~m->addr()); break; default: if (! (bits_>>sorttype_)&1) return; // XXX error switch (stat_) { case 0: m->val(m->loss(sorttype_)); break; case 1: m->val(m->filtloss(sorttype_)); break; case 2: m->val(m->jitter(sorttype_)); break; } } } members_ = dosort(members_); // reset the end pointer for (Member* t = members_; t != 0; t = t->next_) if (t->next_ == 0) last_ = t; } Member* Monitor::dosort(Member* l) { if (l == NULL || l->next_ == NULL) return (l); Member* a = l; Member* b = l->next_; while (b != NULL && b->next_ != NULL) { a = a->next_; b = b->next_->next_; } b = a->next_; a->next_ = NULL; b = dosort(b); a = dosort(l); Member* first; if (a->val() >= b->val()) { first = a; a = a->next_; } else { first = b; b = b->next_; } Member* m = first; while (a != NULL && b != NULL) { if (a->val() >= b->val()) { m->next_ = a; m = a; a = a->next_; } else { m->next_ = b; m = b; b = b->next_; } } if (a != NULL) m->next_ = a; if (b != NULL) m->next_ = b; return (first); } int Monitor::CheckActive(double maxbw) { time_t now = time(0); double idletime = (double(nmembers_) * avgpktsize_) / maxbw; if (idletime < 5.0) idletime = 5.0; u_int maxidle = u_int(idletime) * CTRL_IDLE; Member *m, *next; for (m = members_; m != 0; m = next) { next = m->next_; u_int32_t t = (u_int32_t) m->ts(); if (u_int(now - t) > maxidle) m->active(0); } // Return interval until next re-sort return (int(idletime * 1000.)); } void Monitor::median() { int n, b; Member *less, *more; int nless, nmore; if (nmembers_ == 0) return; Tcl_Interp* interp = Tcl::instance().interp(); for (Member* s = senders_ ; s!= 0; s = s->snext_) { n = s->pos(); less = more = 0; nless = nmore = 0; if (nmembers_ > 2) { u_int32_t pivot; if (median_ == 0) pivot = 5; // XXX else pivot = median_->getstat(stat_, n); u_int32_t max = 0; // do the first separation before calling mselect // since the member list is linked by next, not mnext for (Member* m = members_; m != 0; m = m->next_) { u_int32_t stat = m->getstat(stat_, n); if (stat > max) max = stat; if (stat < pivot) { m->mnext_ = less; less = m; nless++; } else { m->mnext_ = more; more = m; nmore++; } } int i = nmembers_ / 2; if (nless > i) median_ = mselect(less, i, 0, pivot, n); else median_ = mselect(more, i-nless, pivot, max, n); } else { median_ = members_; } char val[32]; switch (stat_) { case 0: case 1: sprintf(val, "%d %%", (int)median_->getstat(stat_, n)); break; case 2: u_int32_t j = median_->jitter(n); j = (j&0x0ffff) * 1000 > 16; sprintf(val, "%u ms", j); break; } Tcl_SetVar2(interp, "median", (char*)s->name(), val, TCL_GLOBAL_ONLY); } } Member* Monitor::mselect(Member* m, int i, int low, int high, int n) { Member *less = 0, *more = 0; int nless = 0, nmore = 0, done = 1; if (m->mnext_ == 0 || high == low) return m; if (high == low+1) return m; // XXX u_int32_t v = m->getstat(stat_, n); u_int32_t pivot = (low + high) / 2; Member *t, *next; for (t = m; t != 0; t = next) { next = t->mnext_; u_int32_t s = t->getstat(stat_, n); if (s != v) done = 0; if (s < pivot) { t->mnext_ = less; less = t; nless++; } else { t->mnext_ = more; more = t; nmore++; } } if (done) return m; return (nless>i) ? mselect(less, i, low, pivot, n) : mselect(more, i-nless, pivot, high, n); }