// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*- // Copyright (c) 2001-2007 International Computer Science Institute // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software") // to deal in the Software without restriction, subject to the conditions // listed in the XORP LICENSE file. These conditions include: you must // preserve this copyright notice, and you cannot mention the copyright // holders in advertising related to the Software without their permission. // The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This // notice is a summary of the XORP LICENSE file; the license in that file is // legally binding. #ident "$XORP: xorp/fea/mfea_dataflow.cc,v 1.7 2007/02/16 22:45:45 pavlin Exp $" // // MFEA (Multicast Forwarding Engine Abstraction) dataflow implementation. // #include "mfea_module.h" #include "libxorp/xorp.h" #include "libxorp/xlog.h" #include "libxorp/debug.h" #include "libxorp/ipvx.hh" #include "libxorp/utils.hh" #include "mfea_dataflow.hh" #include "mfea_node.hh" #include "mfea_kernel_messages.hh" // // Exported variables // // // Local constants definitions // // // Local structures/classes, typedefs and macros // // // Local variables // // // Local functions prototypes // MfeaDft::MfeaDft(MfeaNode& mfea_node) : _mfea_node(mfea_node) { } MfeaDft::~MfeaDft() { } int MfeaDft::family() const { return (_mfea_node.family()); } int MfeaDft::add_entry(const IPvX& source, const IPvX& group, const TimeVal& threshold_interval, uint32_t threshold_packets, uint32_t threshold_bytes, bool is_threshold_in_packets, bool is_threshold_in_bytes, bool is_geq_upcall, bool is_leq_upcall, string& error_msg) { MfeaDfe *mfea_dfe; MfeaDfeLookup *mfea_dfe_lookup; mfea_dfe_lookup = find(source, group); if (mfea_dfe_lookup == NULL) { // Create and add a new dataflow lookup entry mfea_dfe_lookup = new MfeaDfeLookup(*this, source, group); insert(mfea_dfe_lookup); } // Search for a dataflow entry. mfea_dfe = mfea_dfe_lookup->find(threshold_interval, threshold_packets, threshold_bytes, is_threshold_in_packets, is_threshold_in_bytes, is_geq_upcall, is_leq_upcall); if (mfea_dfe != NULL) { // Already have this entry return (XORP_OK); } // Create a new entry mfea_dfe = new MfeaDfe(*mfea_dfe_lookup, threshold_interval, threshold_packets, threshold_bytes, is_threshold_in_packets, is_threshold_in_bytes, is_geq_upcall, is_leq_upcall); mfea_dfe->init_sg_count(); if (! mfea_dfe->is_valid()) { delete mfea_dfe; if (mfea_dfe_lookup->is_empty()) { remove(mfea_dfe_lookup); delete mfea_dfe_lookup; } error_msg = c_format("Cannot add dataflow monitor for (%s, %s): " "invalid request", cstring(source), cstring(group)); XLOG_ERROR("%s", error_msg.c_str()); return (XORP_ERROR); } mfea_dfe_lookup->insert(mfea_dfe); mfea_dfe->start_measurement(); return (XORP_OK); } int MfeaDft::delete_entry(const IPvX& source, const IPvX& group, const TimeVal& threshold_interval, uint32_t threshold_packets, uint32_t threshold_bytes, bool is_threshold_in_packets, bool is_threshold_in_bytes, bool is_geq_upcall, bool is_leq_upcall, string& error_msg) { MfeaDfe *mfea_dfe; MfeaDfeLookup *mfea_dfe_lookup; mfea_dfe_lookup = find(source, group); if (mfea_dfe_lookup == NULL) { error_msg = c_format("Cannot delete dataflow monitor for (%s, %s): " "no such entry", cstring(source), cstring(group)); XLOG_ERROR("%s", error_msg.c_str()); return (XORP_ERROR); // Not found } // Search for a dataflow entry. mfea_dfe = mfea_dfe_lookup->find(threshold_interval, threshold_packets, threshold_bytes, is_threshold_in_packets, is_threshold_in_bytes, is_geq_upcall, is_leq_upcall); if (mfea_dfe == NULL) { error_msg = c_format("Cannot delete dataflow monitor for (%s, %s): " "monitor not found", cstring(source), cstring(group)); XLOG_ERROR("%s", error_msg.c_str()); return (XORP_ERROR); // Not found } if (delete_entry(mfea_dfe) < 0) { error_msg = c_format("Cannot delete dataflow monitor for (%s, %s): " "internal error", cstring(source), cstring(group)); XLOG_ERROR("%s", error_msg.c_str()); return (XORP_ERROR); } return (XORP_OK); } int MfeaDft::delete_entry(const IPvX& source, const IPvX& group) { MfeaDfeLookup *mfea_dfe_lookup; mfea_dfe_lookup = find(source, group); if (mfea_dfe_lookup == NULL) return (XORP_ERROR); // Nothing found remove(mfea_dfe_lookup); delete mfea_dfe_lookup; return (XORP_OK); } int MfeaDft::delete_entry(MfeaDfe *mfea_dfe) { MfeaDfeLookup *mfea_dfe_lookup = &mfea_dfe->mfea_dfe_lookup(); mfea_dfe_lookup->remove(mfea_dfe); delete mfea_dfe; if (mfea_dfe_lookup->is_empty()) { remove(mfea_dfe_lookup); delete mfea_dfe_lookup; } return (XORP_OK); } MfeaDfeLookup::MfeaDfeLookup(MfeaDft& mfea_dft, const IPvX& source, const IPvX& group) : Mre(source, group), _mfea_dft(mfea_dft) { } MfeaDfeLookup::~MfeaDfeLookup() { delete_pointers_list(_mfea_dfe_list); } int MfeaDfeLookup::family() const { return (_mfea_dft.family()); } MfeaDfe * MfeaDfeLookup::find(const TimeVal& threshold_interval, uint32_t threshold_packets, uint32_t threshold_bytes, bool is_threshold_in_packets, bool is_threshold_in_bytes, bool is_geq_upcall, bool is_leq_upcall) { list::const_iterator iter; for (iter = _mfea_dfe_list.begin(); iter != _mfea_dfe_list.end(); ++iter) { MfeaDfe *mfea_dfe = *iter; if (mfea_dfe->is_same(threshold_interval, threshold_packets, threshold_bytes, is_threshold_in_packets, is_threshold_in_bytes, is_geq_upcall, is_leq_upcall)) return (mfea_dfe); } return (NULL); } void MfeaDfeLookup::insert(MfeaDfe *mfea_dfe) { _mfea_dfe_list.push_back(mfea_dfe); } void MfeaDfeLookup::remove(MfeaDfe *mfea_dfe) { // XXX: presumably, the list will be very short, so for simplicity // we use the _theoretically_ less-efficient remove() instead of find() _mfea_dfe_list.remove(mfea_dfe); } MfeaDfe::MfeaDfe(MfeaDfeLookup& mfea_dfe_lookup, const TimeVal& threshold_interval, uint32_t threshold_packets, uint32_t threshold_bytes, bool is_threshold_in_packets, bool is_threshold_in_bytes, bool is_geq_upcall, bool is_leq_upcall) : _mfea_dfe_lookup(mfea_dfe_lookup), _threshold_interval(threshold_interval), _threshold_packets(threshold_packets), _threshold_bytes(threshold_bytes), _is_threshold_in_packets(is_threshold_in_packets), _is_threshold_in_bytes(is_threshold_in_bytes), _is_geq_upcall(is_geq_upcall), _is_leq_upcall(is_leq_upcall) { _delta_sg_count_index = 0; _is_bootstrap_completed = false; _measurement_interval = _threshold_interval / MFEA_DATAFLOW_TEST_FREQUENCY; for (size_t i = 0; i < sizeof(_start_time)/sizeof(_start_time[0]); i++) _start_time[i] = TimeVal::ZERO(); } MfeaDfe::~MfeaDfe() { } MfeaDft& MfeaDfe::mfea_dft() const { return (_mfea_dfe_lookup.mfea_dft()); } EventLoop& MfeaDfe::eventloop() const { return (mfea_dft().mfea_node().eventloop()); } int MfeaDfe::family() const { return (_mfea_dfe_lookup.family()); } bool MfeaDfe::is_valid() const { // TODO: the minimum interval is hard-coded int min_sec = 3; // XXX: the minimum threshold interval value int min_usec = 0; return ((_is_threshold_in_packets || _is_threshold_in_bytes) && (_is_geq_upcall ^ _is_leq_upcall) && (_threshold_interval >= TimeVal(min_sec, min_usec)) && _last_sg_count.is_valid()); } bool MfeaDfe::is_same(const TimeVal& threshold_interval_test, uint32_t threshold_packets_test, uint32_t threshold_bytes_test, bool is_threshold_in_packets_test, bool is_threshold_in_bytes_test, bool is_geq_upcall_test, bool is_leq_upcall_test) const { if (is_threshold_in_packets_test) if (threshold_packets_test != _threshold_packets) return (false); if (is_threshold_in_bytes_test) if (threshold_bytes_test != _threshold_bytes) return (false); return ((threshold_interval_test == _threshold_interval) && (is_threshold_in_packets_test == _is_threshold_in_packets) && (is_threshold_in_bytes_test == _is_threshold_in_bytes) && (is_geq_upcall_test == _is_geq_upcall) && (is_leq_upcall_test == _is_leq_upcall)); } void MfeaDfe::init_sg_count() { mfea_dft().mfea_node().get_sg_count(source_addr(), group_addr(), _last_sg_count); } // // Read the count from the kernel, and test if it is above/below the threshold. // XXX: if both packets and bytes are enabled, then return true if the test // is positive for either. // bool MfeaDfe::test_sg_count() { SgCount saved_last_sg_count = _last_sg_count; uint32_t diff_value, threshold_value; bool ret_value = false; // // Perform the measurement // if (mfea_dft().mfea_node().get_sg_count(source_addr(), group_addr(), _last_sg_count) < 0) { // Error return (false); // TODO: what do we do when error occured? } // // Compute the delta since the last measurement // if ((_is_threshold_in_packets && (saved_last_sg_count.pktcnt() > _last_sg_count.pktcnt())) || (_is_threshold_in_bytes && (saved_last_sg_count.bytecnt() > _last_sg_count.bytecnt()))) { // XXX: very likely the counter has round-up. We can try to be // smart and compute the difference between the old value and // the maximum possible value, and then just add that difference // to the new value. // However, the maximum possible value depends on the original size // of the counter. In case FreeBSD-4.5 it is u_long for IPv4, // but u_quad_t for IPv6. // Hence, we just ignore this measurement... Sigh... _delta_sg_count[_delta_sg_count_index].reset(); ret_value = false; goto ret_label; } _delta_sg_count[_delta_sg_count_index] = _last_sg_count; _delta_sg_count[_delta_sg_count_index] -= saved_last_sg_count; // // Increment the counter to point to the next entry (to be used next time) // _delta_sg_count_index++; if (_delta_sg_count_index >= MFEA_DATAFLOW_TEST_FREQUENCY) { _delta_sg_count_index %= MFEA_DATAFLOW_TEST_FREQUENCY; _is_bootstrap_completed = true; } // // Compute the difference for the last threshold interval // _measured_sg_count.reset(); if (_is_bootstrap_completed) { for (size_t i = 0; i < MFEA_DATAFLOW_TEST_FREQUENCY; i++) { _measured_sg_count += _delta_sg_count[i]; } } else { for (size_t i = 0; i < _delta_sg_count_index; i++) { _measured_sg_count += _delta_sg_count[i]; } } if (_is_threshold_in_packets) { threshold_value = _threshold_packets; diff_value = _measured_sg_count.pktcnt(); if (_is_geq_upcall) { if (diff_value >= threshold_value) { ret_value = true; goto ret_label; } } if (_is_leq_upcall && _is_bootstrap_completed) { if (diff_value <= threshold_value) { ret_value = true; goto ret_label; } } } if (_is_threshold_in_bytes) { threshold_value = _threshold_bytes; diff_value = _measured_sg_count.bytecnt(); if (_is_geq_upcall) { if (diff_value >= threshold_value) { ret_value = true; goto ret_label; } } if (_is_leq_upcall && _is_bootstrap_completed) { if (diff_value <= threshold_value) { ret_value = true; goto ret_label; } } } ret_label: return (ret_value); } void MfeaDfe::start_measurement() { _measurement_timer = eventloop().new_oneoff_after(_measurement_interval, callback(this, &MfeaDfe::measurement_timer_timeout)); TimeVal now; mfea_dft().mfea_node().eventloop().current_time(now); _start_time[_delta_sg_count_index] = now; } void MfeaDfe::dataflow_signal_send() { // XXX: for simplicity, we assume that the threshold interval // is same as the measured interval mfea_dft().mfea_node().signal_dataflow_message_recv( source_addr(), group_addr(), _threshold_interval, _threshold_interval, _threshold_packets, _threshold_bytes, _measured_sg_count.pktcnt(), _measured_sg_count.bytecnt(), _is_threshold_in_packets, _is_threshold_in_bytes, _is_geq_upcall, _is_leq_upcall); } const TimeVal& MfeaDfe::start_time() const { if (! _is_bootstrap_completed) return (_start_time[0]); return (_start_time[_delta_sg_count_index]); } uint32_t MfeaDfe::measured_packets() const { SgCount result; if (_is_bootstrap_completed) { for (size_t i = 0; i < MFEA_DATAFLOW_TEST_FREQUENCY; i++) { result += _delta_sg_count[i]; } } else { for (size_t i = 0; i < _delta_sg_count_index; i++) { result += _delta_sg_count[i]; } } return (result.pktcnt()); } uint32_t MfeaDfe::measured_bytes() const { SgCount result; if (_is_bootstrap_completed) { for (size_t i = 0; i < MFEA_DATAFLOW_TEST_FREQUENCY; i++) { result += _delta_sg_count[i]; } } else { for (size_t i = 0; i < _delta_sg_count_index; i++) { result += _delta_sg_count[i]; } } return (result.bytecnt()); } void MfeaDfe::measurement_timer_timeout() { if (test_sg_count()) { // Time to deliver a signal dataflow_signal_send(); } // Restart the measurements start_measurement(); }