#include "mgenFlow.h"
#include "mgenMsg.h"
#include "mgen.h"
#include <time.h> // for gmtime(), struct tm, etc
#ifndef _WIN32_WCE
#include <errno.h> // for EAGAIN
#endif // !_WIN32_WCE
// Construct a sink that is not yet associated with any flow
// (notify_flow starts out as NULL).
MgenSink::MgenSink()
    : notify_flow(NULL)
{
    // nothing further to set up
}
// Destructor: this class releases nothing itself.
MgenSink::~MgenSink()
{
    // intentionally empty
}
// Construct an idle (not yet started) flow.
//
//   flowId         - identifier stored in "flow_id"
//   timerMgr       - timer manager used to activate this flow's timers
//   socketList     - socket list that socket items are obtained from
//   defaultV6Label - initial value of "flow_label"
//
// Both timers are wired to their handlers here but are NOT activated;
// activation happens later (see Start(), InsertEvent() and Update()).
MgenFlow::MgenFlow(unsigned int    flowId,
                   ProtoTimerMgr&  timerMgr,
                   MgenSocketList& socketList,
                   unsigned long   defaultV6Label)
    : log_file(NULL),
      log_binary(false),
      log_flush(false),
      checksum_enable(false),
      flow_id(flowId),
      flow_label(defaultV6Label),
      socket_item(NULL),
      socket(NULL),
      sink(NULL),
      seq_num(0),
      next_event(NULL),
      started(false),
      timer_mgr(timerMgr),
      socket_list(socketList)
{
    // Script event timer (placeholder interval; real interval is set when scheduled)
    event_timer.SetListener(this, &MgenFlow::OnEventTimeout);
    event_timer.SetInterval(1.0);
    event_timer.SetRepeat(-1);

    // Packet transmission timer (placeholder interval as well)
    tx_timer.SetListener(this, &MgenFlow::OnTxTimeout);
    tx_timer.SetInterval(1.0);
    tx_timer.SetRepeat(-1);
}
// Tear the flow down: stop any running timers, destroy the queued
// events and close the socket item if one is still held.
MgenFlow::~MgenFlow()
{
    if (event_timer.IsActive())
    {
        event_timer.Deactivate();
    }
    if (tx_timer.IsActive())
    {
        tx_timer.Deactivate();
    }
    event_list.Destroy();
    if (NULL != socket_item)
    {
        socket_item->Close();
    }
}
// Add "theEvent" to this flow's event list.
//
//   theEvent    - event to insert
//   mgenStarted - true when MGEN is already running; the event is then
//                 either applied immediately (its time is before
//                 "currentTime") or scheduled via "event_timer"
//   currentTime - current script time, only used when "mgenStarted"
//
// Returns false (after removing the event from the list again) when the
// event is not valid at its list position, true otherwise.
bool MgenFlow::InsertEvent(MgenEvent* theEvent, bool mgenStarted, double currentTime)
{
    const double requestedTime = theEvent->GetTime();

    if (!mgenStarted)
    {
        // Not running yet: just queue the event (negative times clamp to zero)
        theEvent->SetTime((requestedTime > 0.0) ? requestedTime : 0.0);
        event_list.Insert(theEvent);
        if (ValidateEvent(theEvent)) return true;
        event_list.Remove(theEvent);
        return false;
    }

    if (requestedTime < currentTime)
    {
        // "Immediate" event: stamp it with the current time, place it
        // ahead of the pending event and apply it right away
        theEvent->SetTime(currentTime);
        event_list.Precede(next_event, theEvent);
        if (!ValidateEvent(theEvent))
        {
            event_list.Remove(theEvent);
            return false;
        }
        Update(theEvent);
        return true;
    }

    // "Scheduled" event: enqueue by time
    event_list.Insert(theEvent);
    if (!ValidateEvent(theEvent))
    {
        event_list.Remove(theEvent);
        return false;
    }

    // Activate/reschedule "event_timer" as needed
    const double delay = requestedTime - currentTime;
    if (!event_timer.IsActive())
    {
        next_event = theEvent;
        event_timer.SetInterval(delay);
        timer_mgr.ActivateTimer(event_timer);
    }
    else if (requestedTime < next_event->GetTime())
    {
        // New event comes due before the one currently pending
        next_event = theEvent;
        event_timer.SetInterval(delay);
        event_timer.Reschedule();
    }
    return true;
}  // end MgenFlow::InsertEvent()
bool MgenFlow::ValidateEvent(const MgenEvent* event)
{
// Validate the event by it's position in the list with respect to its neighbor types
const MgenEvent* prevEvent = (MgenEvent*)event->Prev();
MgenEvent::Type prevType = prevEvent ? prevEvent->GetType() :
MgenEvent::INVALID_TYPE;
switch (event->GetType())
{
case MgenEvent::ON:
if ((MgenEvent::MOD == prevType) ||
(MgenEvent::ON == prevType))
{
DMSG(0, "MgenFlow::InsertEvent() inappropriate ON event.\n");
return false;
}
break;
case MgenEvent::MOD:
if ((MgenEvent::OFF == prevType) ||
(MgenEvent::INVALID_TYPE == prevType))
{
DMSG(0, "MgenFlow::InsertEvent() inappropriate MOD event.\n");
return false;
}
break;
case MgenEvent::OFF:
if ((MgenEvent::OFF == prevType) ||
(MgenEvent::INVALID_TYPE == prevType))
{
DMSG(0, "MgenFlow::InsertEvent() inappropriate OFF event.\n");
return false;
}
break;
case MgenEvent::INVALID_TYPE:
ASSERT(0); // this should never occur
return false;
} // end switch(event->GetType())
return true;
} // end MgenFlow::ValidateEvent()
bool MgenFlow::Start(double offsetTime)
{
// Defer actual start until "offsetTime", but process events
// so flow state is appropriate for given offset.
MgenEvent* nextEvent = (MgenEvent*)event_list.Head();
if (!nextEvent)
{
DMSG(0, "MgenFlow::Start() flow with empty event list!\n");
return false;
}
while (nextEvent)
{
if (nextEvent->GetTime() <= offsetTime)
{
Update(nextEvent);
nextEvent = (MgenEvent*)nextEvent->Next();
}
else
{
break;
}
}
if ((next_event = nextEvent))
{
double currentTime = (offsetTime > 0.0) ? offsetTime : 0.0;
double nextInterval = next_event->GetTime() - currentTime;
nextInterval = nextInterval > 0.0 ? nextInterval : 0.0;
event_timer.SetInterval(nextInterval);
timer_mgr.ActivateTimer(event_timer);
started = true;
return true;
}
started = true;
return true;
} // end MgenFlow::Start()
bool MgenFlow::Update(const MgenEvent* event)
{
switch (event->GetType())
{
case MgenEvent::ON:
{
ASSERT(!tx_timer.IsActive());
protocol = event->GetProtocol();
src_port = (event->OptionIsSet(MgenEvent::SRC)) ? event->GetSrcPort() : 0;
switch (protocol)
{
case MgenBaseEvent::UDP:
socket_item = socket_list.GetItem(ProtoSocket::UDP, src_port);
if (!socket_item)
{
DMSG(0, "MgenFlow::Update() Error: unable to get socket item.\n");
return false;
}
// This increments socketItem reference_count, opens socket as needed
// note that socket must be opened as IPv4 or IPv6 type
if (!socket_item->Open(event->GetDstAddr().GetType(), true))
{
DMSG(0, "MgenFlow::Update() Error: socket open error.\n");
return false;
}
socket = &socket_item->GetSocket();
break;
case MgenBaseEvent::TCP:
DMSG(0, "MgenFlow::Update() TCP not quite yet supported (almost).\n");
return false;
case MgenBaseEvent::SINK:
if (!sink)
{
DMSG(0, "MgenFlow::Update() SINK requires MgenSink attachment.\n");
return false;
}
break;
default:
DMSG(0, "MgenFlow::Update() invalid protocol type.\n");
return false;
}
if (host_addr.IsValid() && socket) host_addr.SetPort(socket->GetPort());
// Note the lack of "break" here is _intentional_
}
case MgenEvent::MOD:
{
// Generic MGEN flow options
if (event->OptionIsSet(MgenEvent::DST))
dst_addr = event->GetDstAddr();
if (event->OptionIsSet(MgenEvent::PATTERN))
pattern = event->GetPattern();
if (event->OptionIsSet(MgenEvent::SEQUENCE))
seq_num = event->GetSequence();
if (event->OptionIsSet(MgenEvent::LABEL))
flow_label = event->GetFlowLabel();
// Socket-specific MGEN flow options
if (socket_item)
{
if (event->OptionIsSet(MgenEvent::TXBUFFER))
{
if (!socket_item->SetTxBufferSize(event->GetTxBuffer()))
DMSG(0, "MgenFlow::Update() error setting socket tx buffer\n");
}
if (event->OptionIsSet(MgenEvent::TOS))
{
if (!socket_item->SetTOS(event->GetTOS()))
DMSG(0, "MgenFlow::Update() error setting socket TOS value\n");
}
if (event->OptionIsSet(MgenEvent::TTL))
{
//TRACE("setting socket ttl: %d\n", event->GetTTL());
if (!socket_item->SetTTL(event->GetTTL()))
DMSG(0, "MgenFlow::Update() error setting socket multicast interface\n");
}
if (event->OptionIsSet(MgenEvent::INTERFACE))
{
if (!socket_item->SetMulticastInterface(event->GetInterface()))
DMSG(0, "MgenFlow::Update() error setting socket multicast interface\n");
}
}
// Schedule tx_timer as needed
double nextInterval = pattern.GetPktInterval();
if (nextInterval < 0.0) // 0.0 pkt/sec transmission rate
{
if (tx_timer.IsActive())
tx_timer.Deactivate();
else if (sink && sink->IsNotifying())
sink->StopNotifying();
}
else if (nextInterval > 0.0)
{
if (sink && sink->IsNotifying())
sink->StopNotifying();
if (tx_timer.IsActive())
{
double elapsedTime = last_interval - tx_timer.GetTimeRemaining();
if (nextInterval < elapsedTime)
tx_timer.SetInterval(0.0);
else
tx_timer.SetInterval(nextInterval - elapsedTime);
last_interval = nextInterval;
}
else
{
tx_timer.SetInterval(0.0);
timer_mgr.ActivateTimer(tx_timer);
last_interval = 0.0;
}
}
else
{
if (tx_timer.IsActive()) tx_timer.Deactivate();
if (MgenBaseEvent::SINK == protocol)
{
if (sink)
{
if (!sink->IsNotifying())
sink->StartNotifying(this);
tx_timer.SetInterval(0.0);
last_interval = 0.0;
}
else
{
DMSG(0, "MgenFlow::Update() error: SINK requires MgenSink attachment!\n");
return false;
}
}
else
{
DMSG(0, "MgenFlow::Update() error: Undefined packet rate (-1.0) for non-SINK flow!\n");
return false;
}
}
break;
}
case MgenEvent::OFF:
{
if (sink && sink->IsNotifying())
sink->StopNotifying();
if (tx_timer.IsActive())
tx_timer.Deactivate();
if (socket_item)
{
socket_item->Close();
socket_item = NULL;
socket = NULL;
}
break;
}
case MgenEvent::INVALID_TYPE:
ASSERT(0); // this should never occur
return false;
} // end switch(event->GetType())
return true;
} // end MgenFlow::Update()
// Build and send one MGEN message for this flow, then decide how the
// next transmission is triggered.
//
// The message is packed into a local buffer and sent either through the
// flow's UDP socket or through the attached sink. The sequence number is
// only consumed when the send succeeds, and the send is only logged on
// success (when a log file is set).
//
// Return value (handed back to the caller of this timer handler):
//   true  - pattern interval > 0: tx_timer was given the next interval
//   false - tx_timer was deactivated here (interval <= 0)
bool MgenFlow::OnTxTimeout(ProtoTimer& /*theTimer*/)
{
    MgenMsg theMsg;
    unsigned short len = pattern.GetPktSize();
    theMsg.SetMsgLen(len);
    theMsg.SetFlowId(flow_id);
    theMsg.SetSeqNum(seq_num++);
    struct timeval currentTime;
    ProtoSystemTime(currentTime);
    theMsg.SetDstAddr(dst_addr);
    theMsg.SetTxTime(currentTime);
    if (host_addr.IsValid())
        theMsg.SetHostAddr(host_addr);
    // GPS info (defaults mark the position as invalid)
    theMsg.SetGPSStatus(MgenMsg::INVALID_GPS);
    theMsg.SetGPSLatitude(999);
    theMsg.SetGPSLongitude(999);
    theMsg.SetGPSAltitude(-999);
#ifdef HAVE_GPS
    if (get_position)
    {
        GPSPosition pos;
        get_position(get_position_data, pos);
        if (pos.xyvalid)
        {
            theMsg.SetGPSLatitude(pos.y);
            theMsg.SetGPSLongitude(pos.x);
            if (pos.zvalid)
                theMsg.SetGPSAltitude((long)(pos.z+0.5));
            if (pos.stale)
                theMsg.SetGPSStatus(MgenMsg::STALE);
            else
                theMsg.SetGPSStatus(MgenMsg::CURRENT);
        }
    }
    if (payload_handle)
    {
        unsigned char payloadLen = 0;
        GPSGetMemory(payload_handle, 0, (char*)&payloadLen, 1);
        if (payloadLen)
            theMsg.SetPayload(GPSGetMemoryPtr(payload_handle, 1), payloadLen);
    }
#endif // HAVE_GPS
    char txBuffer[MgenMsg::MAX_SIZE];
    len = theMsg.Pack(txBuffer, checksum_enable);
#ifdef HAVE_IPV6
    if (ProtoAddress::IPv6 == dst_addr.GetType())
    {
        if (socket && !socket->SetFlowLabel(flow_label))
            DMSG(0, "MgenFlow::OnTxTimeout canot set flow label\n");
    }
#endif //HAVE_IPV6
    // Send message, checking for error
    // (log only on success)
    bool success = false;
    switch (protocol)
    {
        case MgenBaseEvent::UDP:
            if (NULL == socket)
            {
                // Guard against a missing socket instead of dereferencing NULL
                // (the rest of this function already treats "socket" as optional)
                DMSG(0, "MgenFlow::OnTxTimeout() error: UDP flow has no open socket!\n");
            }
            else if (!socket->SendTo(txBuffer,len, dst_addr))
            {
#ifndef _WIN32_WCE
                if (EAGAIN != errno)
                    DMSG(0, "MgenFlow::OnTxTimeout() socket_item->SendTo() error: %s\n", GetErrorString());
                else
#endif // !_WIN32_WCE
                    DMSG(1, "MgenFlow::OnTxTimeout() socket_item->SendTo() error: %s\n", GetErrorString());
            }
            else
            {
                success = true;
            }
            break;
        case MgenBaseEvent::SINK:
            if (sink)
            {
                success = sink->SendMgenMessage(txBuffer, len, dst_addr);
            }
            else
            {
                // Same NULL check the scheduling code below applies to "sink"
                DMSG(0, "MgenFlow::OnTxTimeout() error: SINK requires MgenSink attachment!\n");
            }
            break;
        default:
            ASSERT(0);  // should never occur
            break;
    }
    if (!success)
    {
#ifdef SIMULATE
        DMSG(0, "MgenFlow::OnTxTimeout() error sending message.\n");
#endif // SIMULATE
        // message was not sent, so sequence number is decremented back one
        seq_num--;
    }
    else if (log_file)
    {
        theMsg.LogSendEvent(log_file, log_binary, txBuffer, log_flush);
    }
    double nextInterval = pattern.GetPktInterval();
    if (nextInterval > 0.0)  // normal scheduled transmission event
    {
        tx_timer.SetInterval(nextInterval);
        if (sink && sink->IsNotifying()) sink->StopNotifying();
        if (!tx_timer.IsActive())
        {
            timer_mgr.ActivateTimer(tx_timer);
            last_interval = 0.0;
        }
        else
        {
            last_interval = nextInterval;
        }
        return true;
    }
    else if (nextInterval < 0.0)  // Flow pattern rate is 0.0 message/sec
    {
        if(tx_timer.IsActive()) tx_timer.Deactivate();
        last_interval = 0.0;
        if (sink && sink->IsNotifying())
            sink->StopNotifying();
        return false;
    }
    else  // (nextInterval == 0.0) // Flow pattern rate is undefined (SINK only for now)
    {
        if (tx_timer.IsActive()) tx_timer.Deactivate();
        if (MgenBaseEvent::SINK == protocol)
        {
            if (sink)
            {
                // Trigger async output notification
                if (!sink->IsNotifying())
                {
                    sink->StartNotifying(this);
                    last_interval = 0.0;
                }
            }
            else
            {
                DMSG(0, "MgenFlow::OnTxTimeout() error: SINK requires MgenSink attachment!\n");
            }
        }
        else
        {
            DMSG(0, "MgenFlow::OnTxTimeout() error: Undefined packet rate (-1.0) for non-SINK flow!\n");
        }
        return false;
    }
}  // end MgenFlow::OnTxTimeout()
bool MgenFlow::IsActive() const
{
if (!started) return false;
if (next_event)
{
if (MgenEvent::ON != next_event->GetType())
return true;
else
return false;
}
else
{
const MgenEvent* lastEvent = (const MgenEvent*)event_list.Tail();
if (lastEvent && (MgenEvent::OFF != lastEvent->GetType()))
return true;
else
return false;
}
} // end MgenFlow::IsActive();
double MgenFlow::GetCurrentOffset() const
{
double currentOffset = -1.0;
if (started)
{
if (next_event)
{
currentOffset = next_event->GetTime() -
event_timer.GetTimeRemaining();
}
else
{
const MgenEvent* event = (const MgenEvent*)event_list.Tail();
currentOffset = event ? event->GetTime() : -1.0;
}
}
return currentOffset;
} // end MgenFlow::CurrentTime()
// "event_timer" handler: apply the pending event, then advance to the
// event that follows it.
//
// Returns true when the timer was given a new interval for the following
// event, false when there is none and the timer was deactivated here.
bool MgenFlow::OnEventTimeout(ProtoTimer& /*theTimer*/)
{
    ASSERT(next_event);

    // 1) Bring the flow up to date with the event that just came due
    Update(next_event);

    // 2) Step to the following event, remembering when this one was due
    const double firedAt = next_event->GetTime();
    next_event = (MgenEvent*)next_event->Next();
    if (!next_event)
    {
        // No more events: kill the timer
        event_timer.Deactivate();
        return false;
    }

    // 3) Wait out the gap to the following event (never negative)
    const double gap = next_event->GetTime() - firedAt;
    event_timer.SetInterval((gap > 0.0) ? gap : 0.0);
    return true;
}  // end MgenFlow::OnEventTimeout()
//////////////////////////////////////////////////////////////////
// MgenFlowList implementation
// Construct an empty flow list.
MgenFlowList::MgenFlowList()
    : head(NULL),
      tail(NULL)
{
    // nothing further to set up
}
// Destructor: deletes every flow still held by the list.
MgenFlowList::~MgenFlowList()
{
    Destroy();
}
void MgenFlowList::Destroy()
{
MgenFlow* next = head;
while (next)
{
MgenFlow* current = next;
next = next->next;
delete current;
}
head = tail = (MgenFlow*)NULL;
} // end MgenFlowList::Destroy()
// Link "theFlow" in as the new last element of the list.
void MgenFlowList::Append(MgenFlow* theFlow)
{
    theFlow->prev = tail;
    theFlow->next = NULL;
    if (tail)
    {
        tail->next = theFlow;
    }
    else
    {
        // List was empty, so the new flow is also the head
        head = theFlow;
    }
    tail = theFlow;
}  // end MgenFlowList::Append()
void MgenFlowList::SetLogFile(FILE* filePtr, bool binary, bool flush)
{
MgenFlow* current = head;
while (current)
{
current->SetLogFile(filePtr, binary, flush);
current = current->next;
}
} // end MgenFlowList::SetLogFile()
void MgenFlowList::SetHostAddress(const ProtoAddress& hostAddr)
{
MgenFlow* current = head;
while (current)
{
current->SetHostAddress(hostAddr);
current = current->next;
}
} // end MgenFlowList::SetHostAddress()
void MgenFlowList::ClearHostAddress()
{
MgenFlow* current = head;
while (current)
{
current->ClearHostAddress();
current = current->next;
}
} // end MgenFlowList::ClearHostAddress()
void MgenFlowList::EnableChecksums()
{
MgenFlow* current = head;
while (current)
{
current->EnableChecksums();
current = current->next;
}
} // end MgenFlowList::SetLogFile()
// Forward "theSink" to every flow in the list.
void MgenFlowList::SetSink(MgenSink* theSink)
{
    for (MgenFlow* flow = head; flow; flow = flow->next)
    {
        flow->SetSink(theSink);
    }
}  // end MgenFlowList::SetSink()
// Set the default IPv6 flow label on every flow in the list
// (only built when HAVE_IPV6 is defined).
#ifdef HAVE_IPV6
void MgenFlowList::SetDefaultLabel(UINT32 label)
{
    for (MgenFlow* flow = head; flow; flow = flow->next)
    {
        flow->SetLabel(label);
    }
}  // MgenFlowList::SetDefaultLabel()
#endif //HAVE_IPV6
// Look up a flow by its id, searching from the tail of the list back
// toward the head. Returns NULL when no flow has that id.
MgenFlow* MgenFlowList::FindFlowById(unsigned int flowId)
{
    for (MgenFlow* candidate = tail; candidate; candidate = candidate->prev)
    {
        if (candidate->flow_id == flowId)
        {
            return candidate;
        }
    }
    return (MgenFlow*)NULL;
}  // end MgenFlowList::FindFlowById()
bool MgenFlowList::Start(double offsetTime)
{
bool result = false;
MgenFlow* next = head;
while (next)
{
result |= next->Start(offsetTime);
next = next->next;
}
return result;
} // end MgenFlowList::Start()
double MgenFlowList::GetCurrentOffset() const
{
double currentOffset = -1.0;
MgenFlow* next = head;
while (next)
{
if (next->event_timer.IsActive())
{
return next->GetCurrentOffset();
}
else
{
double nextOffset = next->GetCurrentOffset();
if (nextOffset > currentOffset)
currentOffset = nextOffset;
}
next = next->next;
}
return currentOffset;
} // end MgenFlowList::GetCurrentOffset()
bool MgenFlowList::SaveFlowSequences(FILE* file) const
{
MgenFlow* next = head;
while (next)
{
if (next->IsActive() || (NULL != next->next_event))
{
double offset;
if (next->IsActive())
{
offset = next->GetCurrentOffset();
}
else
{
offset = next->next_event->GetTime();
}
fprintf(file, "%f MOD %lu SEQUENCE %lu\n",
offset, next->flow_id, next->seq_num);
}
next = next->next;
}
return true;
} // end MgenFlowList::SaveFlowState()
// syntax highlighted by Code2HTML, v. 0.9.1