/* -*-	Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*- */

/*
 * rap.cc
 * Copyright (C) 1997 by the University of Southern California
 * $Id: rap.cc,v 1.14 2005/09/18 23:33:34 tomh Exp $
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
 *
 *
 * The copyright of this module includes the following
 * linking-with-specific-other-licenses addition:
 *
 * In addition, as a special exception, the copyright holders of
 * this module give you permission to combine (via static or
 * dynamic linking) this module with free software programs or
 * libraries that are released under the GNU LGPL and with code
 * included in the standard release of ns-2 under the Apache 2.0
 * license or under otherwise-compatible licenses with advertising
 * requirements (or modified versions of such code, with unchanged
 * license).  You may copy and distribute such a system following the
 * terms of the GNU GPL for this module and the licenses of the
 * other code concerned, provided that you include the source code of
 * that other code when and as the GNU GPL requires distribution of
 * source code.
 *
 * Note that people who make modified versions of this module
 * are not obligated to grant this special exception for their
 * modified versions; it is their choice whether to do so.  The GNU
 * General Public License gives permission to release a modified
 * version without this exception; this exception also makes it
 * possible to release a modified version which carries forward this
 * exception.
 *
 */

//
// rap.cc 
//      Code for the 'RAP Source' Agent Class
//
// Author: 
//   Mohit Talwar (mohit@catarina.usc.edu)
//
// $Header: /nfs/jade/vint/CVSROOT/ns-2/rap/rap.cc,v 1.14 2005/09/18 23:33:34 tomh Exp $

#include "rap.h"

int hdr_rap::offset_; // static offset of RAP header

static class RapHeaderClass : public PacketHeaderClass 
{
public:
	RapHeaderClass() : PacketHeaderClass("PacketHeader/RAP", 
					     sizeof(hdr_rap)) {
		bind_offset(&hdr_rap::offset_);
	}
} class_raphdr;


static class RapClass : public TclClass 
{
public:
	RapClass() : TclClass("Agent/RAP") {}
	TclObject* create(int, const char*const*) 
		{ return (new RapAgent()); }
} class_rap;


void IpgTimer::expire(Event *)
{ 
	a_->timeout(RAP_IPG_TIMEOUT); 
}

void RttTimer::expire(Event *)
{ 
	a_->timeout(RAP_RTT_TIMEOUT); 
}

//----------------------------------------------------------------------
// EqualSeqno
//      Compare TransHistory Entries on the seqno field.
//
//      "i1", "i2" are the TransHistory entries to be compared.
//----------------------------------------------------------------------
 
int EqualSeqno(void *i1, void *i2)
{ 
	return (((TransHistoryEntry *) i1)->seqno == 
		((TransHistoryEntry *) i2)->seqno);
}

//----------------------------------------------------------------------
// RapAgent::RapAgent
//	Initialize the RAP agent.
//      Bind variables which have to be accessed in both Tcl and C++.
//      Initializes time values.
//----------------------------------------------------------------------

RapAgent::RapAgent() : Agent(PT_RAP_DATA), ipgTimer_(this), rttTimer_(this),
 	seqno_(0), sessionLossCount_(0), curseq_(0), ipg_(2.0), srtt_(2.0), 
	timeout_(2.0), lastRecv_(0), lastMiss_(0), prevRecv_(0), dctr_(0), 
	flags_(0), fixIpg_(0)
{
	bind("packetSize_", &size_);	// Default 512
	bind("seqno_", &seqno_);	// Default 0
	bind("sessionLossCount_", &sessionLossCount_); // Default 0

	bind("ipg_", &ipg_);		// Default 2 seconds
	bind("beta_", &beta_);	// Default 0.5
	bind("alpha_", &alpha_);	// Default 1.0

	bind("srtt_", &srtt_);	// Default 2 seconds
	bind("variance_", &variance_);// Default 0
	bind("delta_", &delta_);	// Default 0.5
	bind("mu_", &mu_);		// Default 1.2
	bind("phi_", &phi_);		// Default 4

	bind("timeout_", &timeout_);	// Default 2 seconds

	bind("overhead_", &overhead_); // Default 0

	bind("useFineGrain_", &useFineGrain_); // Default FALSE
	bind("kfrtt_", &kfrtt_);	// Default 0.9
	bind("kxrtt_", &kxrtt_);	// Default 0.01

	bind("debugEnable_", &debugEnable_);	// Default FALSE

	bind("rap_base_hdr_size_", &rap_base_hdr_size_);

	bind("dpthresh_", &dpthresh_);

	frtt_ = xrtt_ = srtt_;
}

// Cancel all our timers before we quit
RapAgent::~RapAgent()
{
//  	fprintf(stderr, "%g: rap agent %s(%d) stops.\n", 
//  		Scheduler::instance().clock(), name(), addr());
//  	Tcl::instance().eval("[Simulator instance] flush-trace");
	stop();
}

//----------------------------------------------------------------------
// RapAgent::UpdateTimeValues
//      Update the values for srtt_, variance_ and timeout_ based on
//      the "sampleRtt". Use Jacobson/Karl's algorithm.
//      
//	"sampleRtt" is the sample  round trip time obtained from the 
//      current ack packet.
//----------------------------------------------------------------------

void RapAgent::UpdateTimeValues(double sampleRtt)
{ 
	double diff;
	static int initial = TRUE;

	if (initial) {
		frtt_ = xrtt_ = srtt_ = sampleRtt; // First sample, no history
		variance_ = 0;
		initial = FALSE;
	}

	diff = sampleRtt - srtt_;
	srtt_ += delta_ * diff;
      
	diff = (diff < 0) ? diff * -1 : diff; // Take mod
	variance_ += delta_ * (diff - variance_);

	timeout_ = mu_ * srtt_ + phi_ * variance_;

	if (useFineGrain_) {
		frtt_ = (1 - kfrtt_) * frtt_ + kfrtt_ * sampleRtt;
		xrtt_ = (1 - kxrtt_) * xrtt_ + kxrtt_ * sampleRtt;
	}

	double debugSrtt = srtt_;	// $%#& stoopid compiler
	Debug(debugEnable_, logfile_, 
	      "- srtt updated to %f\n", debugSrtt);
}

void RapAgent::start()
{
	if (debugEnable_)
		logfile_ = DebugEnable(this->addr() >> 
				       Address::instance().NodeShift_[1]);
	else
		// Should initialize it regardless of whether it'll be used.
		logfile_ = NULL;
	Debug(debugEnable_, logfile_, "%.3f: RAP start.\n", 
	      Scheduler::instance().clock());

	flags_ = flags_ & ~RF_STOP;
	startTime_ = Scheduler::instance().clock();
	RttTimeout();		// Decreases initial IPG
	IpgTimeout();
}

// Used by a sink to listen to incoming packets
void RapAgent::listen()
{
	if (debugEnable_)
		logfile_ = DebugEnable(this->addr() >> 
				       Address::instance().NodeShift_[1]);
}

void RapAgent::stop()
{
	Debug(debugEnable_, logfile_, 
	      "%.3f: RAP stop.\n", Scheduler::instance().clock());
			
	// Cancel the timer only when there is one
	if (ipgTimer_.status() == TIMER_PENDING)
		ipgTimer_.cancel();  
	if (rttTimer_.status() == TIMER_PENDING)
		rttTimer_.cancel();

	stopTime_ = Scheduler::instance().clock();
	int debugSeqno = seqno_;
	Debug(debugEnable_, logfile_, 
	      "- numPackets %d, totalTime %f\n", 
	      debugSeqno, stopTime_ - startTime_);
	flags_ |= RF_STOP;
}

//----------------------------------------------------------------------
// RapAgent::command
//      Called when a Tcl command for the RAP agent is executed.
//      Two commands are supported
//         $rapsource start
//         $rapsource stop
//----------------------------------------------------------------------
int RapAgent::command(int argc, const char*const* argv)
{
	if (argc == 2) {
		if (strcmp(argv[1], "start") == 0) {
			start();
			// return TCL_OK, so the calling function knows that 
			// the command has been processed
			return (TCL_OK);
		} else if (strcmp(argv[1], "stop") == 0) {
			stop();
			return (TCL_OK);
		} else if (strcmp(argv[1], "listen") == 0) {
			listen();
			return (TCL_OK);
		}
	} else if (argc == 3) {
                if (strcmp(argv[1], "advanceby") == 0) {
                        advanceby(atoi(argv[2]));
                        return (TCL_OK);
                }
	}


	// If the command hasn't been processed by RapAgent()::command,
	// call the command() function for the base class
	return (Agent::command(argc, argv));
}

//----------------------------------------------------------------------
// RapAgent::SendPacket
//      Called in IpgTimeout().
//      Create a packet, increase seqno_, send the packet out.
//----------------------------------------------------------------------

void RapAgent::SendPacket(int nbytes, AppData *data)
{
	TransHistoryEntry *pktInfo;
	Packet *pkt;

	type_ = PT_RAP_DATA;
	if (data)
		pkt = allocpkt(data->size()); 
	else 
		pkt = allocpkt();

	// Fill in RAP headers
	hdr_rap* hdr = hdr_rap::access(pkt);
	hdr->seqno() = ++seqno_;	// Start counting from 1;
	hdr->lastRecv = hdr->lastMiss = hdr->prevRecv = 0; // Ignore @ sender
	hdr->flags() = RH_DATA;
	if (data) {
		hdr->size() = data->size();
		pkt->setdata(data);
	} else {
		hdr->size() = size_;
	}
	// XXX Simply set packet size to the given ADU's nominal size. 
	// Make sure that the size is reasonable!!
	hdr_cmn *ch = hdr_cmn::access(pkt);
	ch->size() = nbytes; 

	send(pkt, 0);
	pktInfo = new TransHistoryEntry(seqno_);
	transmissionHistory_.SetInsert((void *) pktInfo, EqualSeqno);
	int debugSeqno = seqno_;
	Debug(debugEnable_, logfile_, 
	      "- packet %d sent\n", debugSeqno);
}

//----------------------------------------------------------------------
// RapAgent::recv
//      Called when a packet is received.
//      Should be of type PT_RAP_ACK.
//----------------------------------------------------------------------

void RapAgent::recv(Packet* pkt, Handler*)
{
	Debug(debugEnable_, logfile_, 
	      "%.3f: RAP packet received.\n", Scheduler::instance().clock());

	hdr_rap* hdr = hdr_rap::access(pkt); // Access RAP header

	switch (hdr->flags()) { 
	case RH_DATA:
		UpdateLastHole(hdr->seqno());
		SendAck(hdr->seqno());
		if ((pkt->datalen() > 0) && app_) 
			// We do have user data, process it
			app_->process_data(pkt->datalen(), pkt->userdata());
		break;
	case RH_ACK:
		RecvAck(hdr);
		break;
	default:
		fprintf(stderr, 
			"RAP agent %s received a packet with unknown flags %x",
			name(), hdr->flags());
		break;
	} 
	Packet::free(pkt);		// Discard the packet
}

//----------------------------------------------------------------------
// RapAgent::RecvAck
//      Called when an Ack is received.
//      
//      "header" is the RAP header of the Ack.
//----------------------------------------------------------------------

void RapAgent::RecvAck(hdr_rap *ackHeader)
{
	double sampleRtt;
	TransHistoryEntry *old, key(ackHeader->seqno_);

	assert(ackHeader->seqno_ > 0);

	Debug(debugEnable_, logfile_, 
	      "- ack %d\n", ackHeader->seqno_);

	old = (TransHistoryEntry *) 
		transmissionHistory_.SetRemove((void *) &key, EqualSeqno);

	if (old != NULL) {
		Debug(debugEnable_, logfile_, 
		      "- found in transmission history\n");
		assert((old->status == RAP_SENT) || (old->status == RAP_INACTIVE));

		// Get sample rtt		
		sampleRtt = key.departureTime - old->departureTime; 

		UpdateTimeValues(sampleRtt);
		
		delete old;
	}
  
	if (!anyack()) {
		flags_ |= RF_ANYACK;
		ipg_ = srtt_;
	}
  
	if (LossDetection(RAP_ACK_BASED, ackHeader))
		LossHandler();

	// XXX We only stop by sequence number when we are in 
	// "counting sequence number" mode.   -- haoboy
	if (counting_pkt() && (ackHeader->seqno_ >= curseq_)) 
		finish();
}

//----------------------------------------------------------------------
// RapAgent::timeout
//      Called when a timer fires.
//
//      "type" is the type of Timeout event
//----------------------------------------------------------------------

void RapAgent::timeout(int type)
{
	if (type == RAP_IPG_TIMEOUT)
		IpgTimeout();
	else if (type == RAP_RTT_TIMEOUT)
		RttTimeout();
	else
		assert(FALSE);
}

//----------------------------------------------------------------------
// RapAgent::IpgTimeout
//      Called when the ipgTimer_ fires.
//----------------------------------------------------------------------

void RapAgent::IpgTimeout()
{
	double waitPeriod;		// Time before next transmission

	Debug(debugEnable_, logfile_, 
	      "%.3f: IPG Timeout.\n", Scheduler::instance().clock());

	if (LossDetection(RAP_TIMER_BASED))
		LossHandler();
	else if (!counting_pkt()) {
		if (app_) {
			int nbytes;
			AppData* data = app_->get_data(nbytes);
			// Missing data in application. What should we do??
			// For now, simply schedule the next SendPacket(). 
			// If the application has nothing to send, it'll stop 
			// the rap agent later on. 
			if (data != NULL) {
				SendPacket(nbytes, data);
				dctr_++;
			}
		} else {
			// If RAP doesn't have application, just go ahead and 
			// send packet
			SendPacket(size_);
			dctr_++;
		}
	} else if (seqno_ < curseq_) {
			SendPacket(size_);
			dctr_++;
	}

	// XXX If we only bound IPG in DecreaseIpg(), the thresholding will 
	// happen immediately because DecreaseIpg() isn't called immediately. 
	// So we do it here. 
	if (fixIpg_ != 0)
		ipg_ = fixIpg_;

	if (useFineGrain_)
		waitPeriod = frtt_ / xrtt_ * ipg_;
	else 
		waitPeriod = ipg_;
	// By this point, we may have been stopped by applications above
	// Thus, do not reschedule a timer if we are stopped. 
	if (!is_stopped())
		ipgTimer_.resched(waitPeriod + Random::uniform(overhead_));
}
  
//----------------------------------------------------------------------
// RapAgent::RttTimeout
//      Called when the rttTimer_ fires.
//      Decrease IPG. Restart rttTimer_.
//----------------------------------------------------------------------

void RapAgent::RttTimeout()
{
	Debug(debugEnable_, logfile_, 
	      "%.3f: RTT Timeout.\n", Scheduler::instance().clock());

	// During the past srtt_, we are supposed to send out srtt_/ipg_
	// packets. If we sent less than that, we may not increase rate
	if (100*dctr_*(ipg_/srtt_) >= dpthresh_)
		DecreaseIpg();		// Additive increase in rate
	else 
		Debug(debugEnable_, logfile_, 
		      "- %f Cannot increase rate due to insufficient data.\n",
		      Scheduler::instance().clock());
	dctr_ = 0;

	double debugIpg = ipg_ + overhead_ / 2;
	Debug(debugEnable_, logfile_, 
	      "- ipg decreased at %.3f to %f\n", 
	      Scheduler::instance().clock(), debugIpg);

	rttTimer_.resched(srtt_);
}

//----------------------------------------------------------------------
// RapAgent::LossDetection
//      Called in ipgTimeout (RAP_TIMER_BASED) 
//          or in RecvAck (RAP_ACK_BASED).
//
// Returns:
//      TRUE if loss detected, FALSE otherwise.
//
//      "ackHeader" is the RAP header of the received Ack (PT_RAP_ACK).
//----------------------------------------------------------------------

static double currentTime;
static hdr_rap *ackHdr;
static RapAgent *rapAgent; 
static int numLosses;

int EqualStatus(void *i1, void *i2)
{ 
	return (((TransHistoryEntry *) i1)->status == 
		((TransHistoryEntry *) i2)->status);
}

void DestroyTransHistoryEntry(long item)
{
	TransHistoryEntry *entry = (TransHistoryEntry *) item;

	Debug(rapAgent->GetDebugFlag(), rapAgent->GetLogfile(),
	      "- purged seq num %d\n", entry->seqno);

	delete entry;
}

void TimerLostPacket(long item)
{
	TransHistoryEntry *entry = (TransHistoryEntry *) item;

	if ((entry->departureTime + rapAgent->GetTimeout()) <= currentTime) {
		// ~ Packets lost in RAP session
		rapAgent->IncrementLossCount(); 

		// Ignore cluster losses
		if (entry->status != RAP_INACTIVE) {
			assert(entry->status == RAP_SENT);

			numLosses++;
			Debug(rapAgent->GetDebugFlag(), rapAgent->GetLogfile(),
			      "- timerlost seq num %d , last sent %d\n", 
			      entry->seqno, rapAgent->GetSeqno());
		}
		entry->status = RAP_PURGED; 
	}
}

void AckLostPacket(long item)
{
	TransHistoryEntry *entry = (TransHistoryEntry *) item;

	int seqno, lastRecv, lastMiss, prevRecv;

	seqno = entry->seqno;
	lastRecv = ackHdr->lastRecv;
	lastMiss = ackHdr->lastMiss;
	prevRecv = ackHdr->prevRecv;

	if (seqno <= lastRecv) {
		if ((seqno > lastMiss) || (seqno == prevRecv))
			entry->status = RAP_PURGED; // Was Received, now purge
		else if ((lastRecv - seqno) >= 3) {
			// ~ Packets lost in RAP session
			rapAgent->IncrementLossCount(); 
			
			if (entry->status != RAP_INACTIVE) {
				assert(entry->status == RAP_SENT);

				numLosses++;
				Debug(rapAgent->GetDebugFlag(), 
				      rapAgent->GetLogfile(),
				      "- acklost seqno %d , last sent %d\n", 
				      seqno, rapAgent->GetSeqno());
			}
			// Was Lost, purge from history
			entry->status = RAP_PURGED; 
		}
	}
}

int RapAgent::LossDetection(RapLossType type, hdr_rap *ackHeader)
{
	TransHistoryEntry key(0, RAP_PURGED);

	currentTime = key.departureTime;
	ackHdr = ackHeader;
	rapAgent = this;
	numLosses = 0;

	switch(type) {
	case RAP_TIMER_BASED:
		transmissionHistory_.Mapcar(TimerLostPacket);
		break;
		
	case RAP_ACK_BASED:
		transmissionHistory_.Mapcar(AckLostPacket);
		break;

	default:
		assert(FALSE);
	}

	Debug(debugEnable_, logfile_, 
	      "- %d losses detected\n", numLosses); 

	Debug(debugEnable_, logfile_, 
	      "- history size %d\n", transmissionHistory_.Size());
  
	transmissionHistory_.Purge((void *) &key, 
				   EqualStatus, // Purge PURGED packets
				   DestroyTransHistoryEntry);

	Debug(debugEnable_, logfile_, 
	      "- history size %d\n", transmissionHistory_.Size());

	if (numLosses)
		return TRUE;
	else
		return FALSE;
}

//----------------------------------------------------------------------
// RapAgent::LossHandler
//      Called when loss detected.
//      Increase IPG. Mark packets INACTIVE. Reschedule rttTimer_.
//----------------------------------------------------------------------

void MarkInactive(long item)
{
	TransHistoryEntry *entry = (TransHistoryEntry *) item;

	entry->status = RAP_INACTIVE;
}

void RapAgent::LossHandler()
{
	IncreaseIpg();		// Multiplicative decrease in rate

	double debugIpg = ipg_ + overhead_ / 2;
	Debug(debugEnable_, logfile_, 
	      "- ipg increased at %.3f to %f\n", 
	      Scheduler::instance().clock(), debugIpg);

	transmissionHistory_.Mapcar(MarkInactive);
	Debug(debugEnable_, logfile_, 
	      "- window full packets marked inactive\n");

	rttTimer_.resched(srtt_);
}

//----------------------------------------------------------------------
// RapAgent::SendAck
//      Create an ack packet, set fields, send the packet out.
//
//      "seqNum" is the sequence number of the packet being acked.
//----------------------------------------------------------------------

void RapAgent::SendAck(int seqNum)
{
	type_ = PT_RAP_ACK;
	Packet* pkt = allocpkt();	// Create a new packet
	hdr_rap* hdr = hdr_rap::access(pkt);   // Access header

	hdr->seqno() = seqNum;
	hdr->flags() = RH_ACK;

	hdr->lastRecv = lastRecv_;
	hdr->lastMiss = lastMiss_;
	hdr->prevRecv = prevRecv_;

	hdr_cmn *ch = hdr_cmn::access(pkt);
	ch->size() = rap_base_hdr_size_;

	send(pkt, 0);
	Debug(debugEnable_, logfile_, 
	      "- ack sent %u [%u %u %u]\n", 
	      seqNum, lastRecv_, lastMiss_, prevRecv_);
}

//----------------------------------------------------------------------
// RapSinkAgent::UpdateLastHole
//      Update the last hole in sequence number space at the receiver.
//      
//	"seqNum" is the sequence number of the data packet received.
//----------------------------------------------------------------------

void RapAgent::UpdateLastHole(int seqNum)
{
	assert(seqNum > 0);

	if (seqNum > (lastRecv_ + 1)) {
		prevRecv_ = lastRecv_;
		lastRecv_ = seqNum;
		lastMiss_ = seqNum - 1;
		return;
	}

	if (seqNum == (lastRecv_ + 1)) {
		lastRecv_ = seqNum;
		return;
	}
	
	if ((lastMiss_ < seqNum) && (seqNum <= lastRecv_)) // Duplicate
		return;

	if (seqNum == lastMiss_) {
		if ((prevRecv_ + 1) == seqNum) // Hole filled
			prevRecv_ = lastMiss_ = 0;
		else
			lastMiss_--;
		
		return;
	}
	
	if ((prevRecv_ < seqNum) && (seqNum < lastMiss_)) {
		prevRecv_ = seqNum;
		return;
	}

	assert(seqNum <= prevRecv_);	// Pretty late...
}


// take pkt count
void RapAgent::advanceby(int delta)
{
	flags_ |= RF_COUNTPKT;
        curseq_ = delta;
	start();
}

void RapAgent::finish()
{
	stop();
	Tcl::instance().evalf("%s done", this->name());
}



syntax highlighted by Code2HTML, v. 0.9.1