// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-

// Copyright (c) 2001-2007 International Computer Science Institute
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software")
// to deal in the Software without restriction, subject to the conditions
// listed in the XORP LICENSE file. These conditions include: you must
// preserve this copyright notice, and you cannot mention the copyright
// holders in advertising related to the Software without their permission.
// The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
// notice is a summary of the XORP LICENSE file; the license in that file is
// legally binding.

#ident "$XORP: xorp/fea/mfea_dataflow.cc,v 1.7 2007/02/16 22:45:45 pavlin Exp $"

//
// MFEA (Multicast Forwarding Engine Abstraction) dataflow implementation.
//

#include "mfea_module.h"

#include "libxorp/xorp.h"
#include "libxorp/xlog.h"
#include "libxorp/debug.h"
#include "libxorp/ipvx.hh"
#include "libxorp/utils.hh"

#include "mfea_dataflow.hh"
#include "mfea_node.hh"
#include "mfea_kernel_messages.hh"


//
// Exported variables
//

//
// Local constants definitions
//

//
// Local structures/classes, typedefs and macros
//

//
// Local variables
//

//
// Local functions prototypes
//


//
// Construct a dataflow table that keeps a reference to the MFEA node
// it belongs to.
//
MfeaDft::MfeaDft(MfeaNode& mfea_node)
    : _mfea_node(mfea_node)
{
}

//
// Destructor: performs no explicit cleanup of its own.
//
MfeaDft::~MfeaDft()
{
}

//
// Return the address family, as reported by the MFEA node this table
// was constructed with.
//
int
MfeaDft::family() const
{
    return (_mfea_node.family());
}

int
MfeaDft::add_entry(const IPvX& source, const IPvX& group,
		   const TimeVal& threshold_interval,
		   uint32_t threshold_packets,
		   uint32_t threshold_bytes,
		   bool is_threshold_in_packets,
		   bool is_threshold_in_bytes,
		   bool is_geq_upcall,
		   bool is_leq_upcall,
		   string& error_msg)
{
    MfeaDfe *mfea_dfe;
    MfeaDfeLookup *mfea_dfe_lookup;
    
    mfea_dfe_lookup = find(source, group);
    
    if (mfea_dfe_lookup == NULL) {
	// Create and add a new dataflow lookup entry
	mfea_dfe_lookup = new MfeaDfeLookup(*this, source, group);
	insert(mfea_dfe_lookup);
    }
    
    // Search for a dataflow entry.
    mfea_dfe = mfea_dfe_lookup->find(threshold_interval,
				     threshold_packets,
				     threshold_bytes,
				     is_threshold_in_packets,
				     is_threshold_in_bytes,
				     is_geq_upcall,
				     is_leq_upcall);
    if (mfea_dfe != NULL) {
	// Already have this entry
	return (XORP_OK);
    }
    
    // Create a new entry
    mfea_dfe = new MfeaDfe(*mfea_dfe_lookup,
			   threshold_interval,
			   threshold_packets,
			   threshold_bytes,
			   is_threshold_in_packets,
			   is_threshold_in_bytes,
			   is_geq_upcall,
			   is_leq_upcall);
    mfea_dfe->init_sg_count();
    if (! mfea_dfe->is_valid()) {
	delete mfea_dfe;
	if (mfea_dfe_lookup->is_empty()) {
	    remove(mfea_dfe_lookup);
	    delete mfea_dfe_lookup;
	}
	error_msg = c_format("Cannot add dataflow monitor for (%s, %s): "
			     "invalid request",
			     cstring(source), cstring(group));
	XLOG_ERROR("%s", error_msg.c_str());
	return (XORP_ERROR);
    }
    
    mfea_dfe_lookup->insert(mfea_dfe);
    mfea_dfe->start_measurement();
    
    return (XORP_OK);
}

int
MfeaDft::delete_entry(const IPvX& source, const IPvX& group,
		      const TimeVal& threshold_interval,
		      uint32_t threshold_packets,
		      uint32_t threshold_bytes,
		      bool is_threshold_in_packets,
		      bool is_threshold_in_bytes,
		      bool is_geq_upcall,
		      bool is_leq_upcall,
		      string& error_msg)
{
    MfeaDfe *mfea_dfe;
    MfeaDfeLookup *mfea_dfe_lookup;
    
    mfea_dfe_lookup = find(source, group);
    
    if (mfea_dfe_lookup == NULL) {
	error_msg = c_format("Cannot delete dataflow monitor for (%s, %s): "
			     "no such entry",
			     cstring(source), cstring(group));
	XLOG_ERROR("%s", error_msg.c_str());
	return (XORP_ERROR);	// Not found
    }
    
    // Search for a dataflow entry.
    mfea_dfe = mfea_dfe_lookup->find(threshold_interval,
				     threshold_packets,
				     threshold_bytes,
				     is_threshold_in_packets,
				     is_threshold_in_bytes,
				     is_geq_upcall,
				     is_leq_upcall);
    if (mfea_dfe == NULL) {
	error_msg = c_format("Cannot delete dataflow monitor for (%s, %s): "
			     "monitor not found",
			     cstring(source), cstring(group));
	XLOG_ERROR("%s", error_msg.c_str());
	return (XORP_ERROR);	// Not found
    }
    
    if (delete_entry(mfea_dfe) < 0) {
	error_msg = c_format("Cannot delete dataflow monitor for (%s, %s): "
			     "internal error",
			     cstring(source), cstring(group));
	XLOG_ERROR("%s", error_msg.c_str());
	return (XORP_ERROR);
    }
    
    return (XORP_OK);
}

int
MfeaDft::delete_entry(const IPvX& source, const IPvX& group)
{
    MfeaDfeLookup *mfea_dfe_lookup;
    
    mfea_dfe_lookup = find(source, group);
    if (mfea_dfe_lookup == NULL)
	return (XORP_ERROR);	// Nothing found
    
    remove(mfea_dfe_lookup);
    delete mfea_dfe_lookup;
    
    return (XORP_OK);
}

//
// Delete a single dataflow entry.
//
// The entry is unlinked from its lookup entry and destroyed. If the lookup
// entry then reports being empty, it is removed from the table and
// destroyed as well.
//
// @param mfea_dfe the dataflow entry to delete.
// @return XORP_OK on success, or XORP_ERROR if mfea_dfe is NULL.
//
int
MfeaDft::delete_entry(MfeaDfe *mfea_dfe)
{
    if (mfea_dfe == NULL)
	return (XORP_ERROR);	// Nothing to delete

    // XXX: the lookup entry must be fetched before the entry is destroyed
    MfeaDfeLookup *mfea_dfe_lookup = &mfea_dfe->mfea_dfe_lookup();

    mfea_dfe_lookup->remove(mfea_dfe);
    delete mfea_dfe;

    if (mfea_dfe_lookup->is_empty()) {
	remove(mfea_dfe_lookup);
	delete mfea_dfe_lookup;
    }

    return (XORP_OK);
}

//
// Construct the lookup entry for (source, group) that belongs to the
// given dataflow table.
//
MfeaDfeLookup::MfeaDfeLookup(MfeaDft& mfea_dft,
			     const IPvX& source, const IPvX& group)
    : Mre<MfeaDfeLookup>(source, group),
      _mfea_dft(mfea_dft)
{
}

//
// Destructor: hands the list of dataflow entries to delete_pointers_list().
// XXX: presumably that helper deletes every entry still on the list.
//
MfeaDfeLookup::~MfeaDfeLookup()
{
    delete_pointers_list(_mfea_dfe_list);
}

//
// Return the address family, as reported by the dataflow table this
// lookup entry belongs to.
//
int
MfeaDfeLookup::family() const
{
    return (_mfea_dft.family());
}

//
// Find the dataflow entry with the given parameters.
//
// The entries are compared with MfeaDfe::is_same(), and the first match
// (in list order) is returned.
//
// @return the matching dataflow entry, or NULL if there is no match.
//
MfeaDfe *
MfeaDfeLookup::find(const TimeVal& threshold_interval,
		    uint32_t threshold_packets,
		    uint32_t threshold_bytes,
		    bool is_threshold_in_packets,
		    bool is_threshold_in_bytes,
		    bool is_geq_upcall,
		    bool is_leq_upcall)
{
    list<MfeaDfe *>::const_iterator pos = _mfea_dfe_list.begin();
    const list<MfeaDfe *>::const_iterator last = _mfea_dfe_list.end();

    while (pos != last) {
	MfeaDfe *candidate = *pos;
	bool is_match = candidate->is_same(threshold_interval,
					   threshold_packets,
					   threshold_bytes,
					   is_threshold_in_packets,
					   is_threshold_in_bytes,
					   is_geq_upcall,
					   is_leq_upcall);
	if (is_match)
	    return (candidate);
	++pos;
    }

    return (NULL);
}

//
// Append a dataflow entry to the end of this lookup entry's list.
//
void
MfeaDfeLookup::insert(MfeaDfe *mfea_dfe)
{
    _mfea_dfe_list.push_back(mfea_dfe);
}

//
// Remove a dataflow entry from this lookup entry's list.
// The entry itself is not deleted here.
//
void
MfeaDfeLookup::remove(MfeaDfe *mfea_dfe)
{
    // XXX: the list is expected to be very short, so the simpler
    // list remove() (which scans the whole list) is used instead of
    // searching for the element first.
    _mfea_dfe_list.remove(mfea_dfe);
}

//
// Construct a dataflow entry that belongs to the given lookup entry.
//
// The threshold parameters are stored as given. The measurement interval
// is the threshold interval divided by MFEA_DATAFLOW_TEST_FREQUENCY, and
// all start time slots are set to TimeVal::ZERO().
//
MfeaDfe::MfeaDfe(MfeaDfeLookup& mfea_dfe_lookup,
		 const TimeVal& threshold_interval,
		 uint32_t threshold_packets,
		 uint32_t threshold_bytes,
		 bool is_threshold_in_packets,
		 bool is_threshold_in_bytes,
		 bool is_geq_upcall,
		 bool is_leq_upcall)
    : _mfea_dfe_lookup(mfea_dfe_lookup),
      _threshold_interval(threshold_interval),
      _threshold_packets(threshold_packets),
      _threshold_bytes(threshold_bytes),
      _is_threshold_in_packets(is_threshold_in_packets),
      _is_threshold_in_bytes(is_threshold_in_bytes),
      _is_geq_upcall(is_geq_upcall),
      _is_leq_upcall(is_leq_upcall)
{
    const size_t start_time_slots = sizeof(_start_time) / sizeof(_start_time[0]);

    // No measurement has been recorded yet
    _is_bootstrap_completed = false;
    _delta_sg_count_index = 0;

    // Several measurements are taken within each threshold interval
    _measurement_interval = _threshold_interval / MFEA_DATAFLOW_TEST_FREQUENCY;

    for (size_t slot = 0; slot < start_time_slots; ++slot)
	_start_time[slot] = TimeVal::ZERO();
}

//
// Destructor: performs no explicit cleanup of its own.
//
MfeaDfe::~MfeaDfe()
{
}

//
// Return the dataflow table, as reported by the lookup entry this
// dataflow entry belongs to.
//
MfeaDft&
MfeaDfe::mfea_dft() const
{
    return (_mfea_dfe_lookup.mfea_dft());
}

//
// Return the event loop of the MFEA node that is reached through
// the dataflow table.
//
EventLoop&
MfeaDfe::eventloop() const
{
    return (mfea_dft().mfea_node().eventloop());
}

//
// Return the address family, as reported by the lookup entry this
// dataflow entry belongs to.
//
int
MfeaDfe::family() const
{
    return (_mfea_dfe_lookup.family());
}

//
// Test whether this dataflow entry is valid.
//
// The entry is valid only if all of the following hold:
//  - at least one of the packets/bytes thresholds is enabled;
//  - exactly one of the geq/leq upcall types is selected;
//  - the threshold interval is not shorter than the minimum interval;
//  - the last read (S,G) counter reports itself as valid.
//
// @return true if the entry is valid, otherwise false.
//
bool
MfeaDfe::is_valid() const
{
    // TODO: the minimum interval is hard-coded
    int min_sec = 3;		// XXX: the minimum threshold interval value
    int min_usec = 0;

    if (! (_is_threshold_in_packets || _is_threshold_in_bytes))
	return (false);		// No threshold is enabled

    if (_is_geq_upcall == _is_leq_upcall)
	return (false);		// Both or neither of the upcall types

    if (! (_threshold_interval >= TimeVal(min_sec, min_usec)))
	return (false);		// The interval is too short

    return (_last_sg_count.is_valid());
}

//
// Test whether this dataflow entry has the given parameters.
//
// The threshold interval and all four flags must be equal. A threshold
// value (packets or bytes) is compared only if the corresponding
// "is_threshold_in_*_test" flag is true.
//
// @return true if the parameters match this entry, otherwise false.
//
bool
MfeaDfe::is_same(const TimeVal& threshold_interval_test,
		 uint32_t threshold_packets_test,
		 uint32_t threshold_bytes_test,
		 bool is_threshold_in_packets_test,
		 bool is_threshold_in_bytes_test,
		 bool is_geq_upcall_test,
		 bool is_leq_upcall_test) const
{
    // The flags
    if (is_threshold_in_packets_test != _is_threshold_in_packets)
	return (false);
    if (is_threshold_in_bytes_test != _is_threshold_in_bytes)
	return (false);
    if (is_geq_upcall_test != _is_geq_upcall)
	return (false);
    if (is_leq_upcall_test != _is_leq_upcall)
	return (false);

    // The threshold values (only those that are enabled)
    if (is_threshold_in_packets_test
	&& (threshold_packets_test != _threshold_packets)) {
	return (false);
    }
    if (is_threshold_in_bytes_test
	&& (threshold_bytes_test != _threshold_bytes)) {
	return (false);
    }

    // The interval
    return (threshold_interval_test == _threshold_interval);
}

//
// Read the current (S,G) counter from the MFEA node into _last_sg_count.
// XXX: the return value of get_sg_count() is discarded here.
//
void
MfeaDfe::init_sg_count()
{
    mfea_dft().mfea_node().get_sg_count(source_addr(),
					group_addr(),
					_last_sg_count);
}

//
// Read the count from the kernel, and test if it is above/below the threshold.
// XXX: if both packets and bytes are enabled, then return true if the test
// is positive for either.
//
bool
MfeaDfe::test_sg_count()
{
    SgCount saved_last_sg_count = _last_sg_count;
    uint32_t diff_value, threshold_value;
    bool ret_value = false;
    
    //
    // Perform the measurement
    //
    if (mfea_dft().mfea_node().get_sg_count(source_addr(), group_addr(),
					    _last_sg_count)
	< 0) {
	// Error
	return (false);		// TODO: what do we do when error occured?
    }
    
    //
    // Compute the delta since the last measurement
    //
    if ((_is_threshold_in_packets
	 && (saved_last_sg_count.pktcnt() > _last_sg_count.pktcnt()))
	|| (_is_threshold_in_bytes
	    && (saved_last_sg_count.bytecnt() > _last_sg_count.bytecnt()))) {
	// XXX: very likely the counter has round-up. We can try to be
	// smart and compute the difference between the old value and
	// the maximum possible value, and then just add that difference
	// to the new value.
	// However, the maximum possible value depends on the original size
	// of the counter. In case FreeBSD-4.5 it is u_long for IPv4,
	// but u_quad_t for IPv6.
	// Hence, we just ignore this measurement... Sigh...
	_delta_sg_count[_delta_sg_count_index].reset();
	ret_value = false;
	goto ret_label;
    }
    
    _delta_sg_count[_delta_sg_count_index] = _last_sg_count;
    _delta_sg_count[_delta_sg_count_index] -= saved_last_sg_count;
    
    //
    // Increment the counter to point to the next entry (to be used next time)
    //
    _delta_sg_count_index++;
    if (_delta_sg_count_index >= MFEA_DATAFLOW_TEST_FREQUENCY) {
	_delta_sg_count_index %= MFEA_DATAFLOW_TEST_FREQUENCY;
	_is_bootstrap_completed = true;
    }
    
    //
    // Compute the difference for the last threshold interval
    //
    _measured_sg_count.reset();
    if (_is_bootstrap_completed) {
	for (size_t i = 0; i < MFEA_DATAFLOW_TEST_FREQUENCY; i++) {
	    _measured_sg_count += _delta_sg_count[i];
	}
    } else {
	for (size_t i = 0; i < _delta_sg_count_index; i++) {
	    _measured_sg_count += _delta_sg_count[i];
	}
    }
    
    if (_is_threshold_in_packets) {
	threshold_value = _threshold_packets;
	diff_value = _measured_sg_count.pktcnt();
	
	if (_is_geq_upcall) {
	    if (diff_value >= threshold_value) {
		ret_value = true;
		goto ret_label;
	    }
	}
	if (_is_leq_upcall && _is_bootstrap_completed) {
	    if (diff_value <= threshold_value) {
		ret_value = true;
		goto ret_label;
	    }
	}
    }
    
    if (_is_threshold_in_bytes) {
	threshold_value = _threshold_bytes;
	diff_value = _measured_sg_count.bytecnt();
	
	if (_is_geq_upcall) {
	    if (diff_value >= threshold_value) {
		ret_value = true;
		goto ret_label;
	    }
	}
	if (_is_leq_upcall && _is_bootstrap_completed) {
	    if (diff_value <= threshold_value) {
		ret_value = true;
		goto ret_label;
	    }
	}
    }

 ret_label:
    return (ret_value);
}

void
MfeaDfe::start_measurement()
{
    _measurement_timer =
	eventloop().new_oneoff_after(_measurement_interval,
				     callback(this,
					      &MfeaDfe::measurement_timer_timeout));
    
    TimeVal now;
    
    mfea_dft().mfea_node().eventloop().current_time(now);
    _start_time[_delta_sg_count_index] = now;
}

//
// Deliver a dataflow signal to the MFEA node, carrying the thresholds,
// the measured packets/bytes, and the flags of this entry.
//
void
MfeaDfe::dataflow_signal_send()
{
    // XXX: for simplicity, we assume that the measured interval is the
    // same as the threshold interval, hence _threshold_interval is
    // passed twice.
    mfea_dft().mfea_node().signal_dataflow_message_recv(
	source_addr(), group_addr(),
	_threshold_interval, _threshold_interval,
	_threshold_packets, _threshold_bytes,
	_measured_sg_count.pktcnt(), _measured_sg_count.bytecnt(),
	_is_threshold_in_packets, _is_threshold_in_bytes,
	_is_geq_upcall, _is_leq_upcall);
}

//
// Return the recorded start time: the first slot until the bootstrap is
// completed, and the current slot afterwards.
//
const TimeVal&
MfeaDfe::start_time() const
{
    size_t slot = 0;

    if (_is_bootstrap_completed)
	slot = _delta_sg_count_index;

    return (_start_time[slot]);
}

uint32_t
MfeaDfe::measured_packets() const
{
    SgCount result;
    
    if (_is_bootstrap_completed) {
	for (size_t i = 0; i < MFEA_DATAFLOW_TEST_FREQUENCY; i++) {
	    result += _delta_sg_count[i];
	}
    } else {
	for (size_t i = 0; i < _delta_sg_count_index; i++) {
	    result += _delta_sg_count[i];
	}
    }
    
    return (result.pktcnt());
}

uint32_t
MfeaDfe::measured_bytes() const
{
    SgCount result;
    
    if (_is_bootstrap_completed) {
	for (size_t i = 0; i < MFEA_DATAFLOW_TEST_FREQUENCY; i++) {
	    result += _delta_sg_count[i];
	}
    } else {
	for (size_t i = 0; i < _delta_sg_count_index; i++) {
	    result += _delta_sg_count[i];
	}
    }
    
    return (result.bytecnt());
}

//
// Timer callback: take a measurement, deliver a dataflow signal if the
// threshold test is positive, and then start the next measurement.
//
void
MfeaDfe::measurement_timer_timeout()
{
    bool is_signal_due = test_sg_count();

    if (is_signal_due)
	dataflow_signal_send();		// Time to deliver a signal

    // Restart the measurements
    start_measurement();
}


// syntax highlighted by Code2HTML, v. 0.9.1