#include "mgenFlow.h"
#include "mgenMsg.h"
#include "mgen.h"

#include <time.h>   // for gmtime(), struct tm, etc

#ifndef _WIN32_WCE
#include <errno.h>  // for EAGAIN
#endif // !_WIN32_WCE

// A freshly constructed sink is not yet notifying any flow.
MgenSink::MgenSink()
 : notify_flow(NULL)
{
}

MgenSink::~MgenSink()
{
}

/**
 * Builds an idle flow: no socket, no sink, no pending event.
 *
 * Both timers are bound to their handlers and configured with a
 * placeholder 1.0 interval and a repeat count of -1; the real intervals
 * are set when the timers are scheduled by InsertEvent()/Start()/Update().
 */
MgenFlow::MgenFlow(unsigned int    flowId,
                   ProtoTimerMgr&  timerMgr,
                   MgenSocketList& socketList,
                   unsigned long   defaultV6Label)
 : log_file(NULL), log_binary(false), log_flush(false),
   checksum_enable(false),
   flow_id(flowId), flow_label(defaultV6Label),
   socket_item(NULL), socket(NULL), sink(NULL),
   seq_num(0), next_event(NULL), started(false),
   timer_mgr(timerMgr), socket_list(socketList)
{
    // These are only assigned later by Update(); give them a defined
    // value so nothing reads an indeterminate member before the first ON.
    src_port = 0;
    last_interval = 0.0;

    tx_timer.SetListener(this, &MgenFlow::OnTxTimeout);
    tx_timer.SetInterval(1.0);
    tx_timer.SetRepeat(-1);

    event_timer.SetListener(this, &MgenFlow::OnEventTimeout);
    event_timer.SetInterval(1.0);
    event_timer.SetRepeat(-1);
}

/**
 * Stops both timers, destroys the queued events and closes the socket
 * item this flow is still holding (if any).
 */
MgenFlow::~MgenFlow()
{
    if (event_timer.IsActive()) event_timer.Deactivate();
    if (tx_timer.IsActive()) tx_timer.Deactivate();
    event_list.Destroy();
    if (socket_item) socket_item->Close();
}

/**
 * Adds "theEvent" to this flow's event list.
 *
 * @param theEvent     event to insert (rejected if NULL)
 * @param mgenStarted  true once MGEN is running; the event is then either
 *                     applied right away (its time is already past) or
 *                     scheduled through "event_timer"
 * @param currentTime  current offset used for the past/future decision
 * @return false if the event is NULL or ValidateEvent() rejects it at its
 *         list position (it is removed from the list again), true otherwise
 */
bool MgenFlow::InsertEvent(MgenEvent* theEvent, bool mgenStarted, double currentTime)
{
    if (NULL == theEvent) return false;

    double eventTime = theEvent->GetTime();

    if (!mgenStarted)
    {
        // Not running yet: clamp negative times to zero and just queue it.
        eventTime = eventTime > 0.0 ? eventTime : 0.0;
        theEvent->SetTime(eventTime);
        event_list.Insert(theEvent);
        if (!ValidateEvent(theEvent))
        {
            event_list.Remove(theEvent);
            return false;
        }
        return true;
    }

    // Process "immediate" events or enqueue "scheduled" events
    if (eventTime < currentTime)
    {
        // "Immediate" event: re-stamp it with the current time, place it
        // just ahead of the pending event and apply it now.
        theEvent->SetTime(currentTime);
        event_list.Precede(next_event, theEvent);
        if (!ValidateEvent(theEvent))
        {
            event_list.Remove(theEvent);
            return false;
        }
        // NOTE(review): the result of Update() is not propagated here, so
        // an event that fails to apply still stays queued and reports true.
        Update(theEvent);
        return true;
    }

    // "Scheduled" event
    event_list.Insert(theEvent);
    if (!ValidateEvent(theEvent))
    {
        event_list.Remove(theEvent);
        return false;
    }

    // Activate/reschedule "event_timer" as needed
    if (event_timer.IsActive())
    {
        // Only re-arm when the new event is due before the pending one.
        double nextTime = next_event->GetTime();
        if (eventTime < nextTime)
        {
            next_event = theEvent;
            event_timer.SetInterval(eventTime - currentTime);
            event_timer.Reschedule();
        }
    }
    else
    {
        next_event = theEvent;
        event_timer.SetInterval(eventTime - currentTime);
        timer_mgr.ActivateTimer(event_timer);
    }
    return true;
}  // end MgenFlow::InsertEvent()

/**
 * Checks an event against the type of the event preceding it in the list.
 *
 *  - ON  may not follow MOD or ON
 *  - MOD may not follow OFF or be the first event
 *  - OFF may not follow OFF or be the first event
 *
 * @return true if the event is acceptable at its current list position
 */
bool MgenFlow::ValidateEvent(const MgenEvent* event)
{
    // Validate the event by its position in the list with respect to its neighbor types
    const MgenEvent* prevEvent = (MgenEvent*)event->Prev();
    // A missing predecessor is represented as INVALID_TYPE.
    MgenEvent::Type prevType = prevEvent ? prevEvent->GetType() : MgenEvent::INVALID_TYPE;

    switch (event->GetType())
    {
        case MgenEvent::ON:
            if ((MgenEvent::MOD == prevType) || (MgenEvent::ON == prevType))
            {
                DMSG(0, "MgenFlow::InsertEvent() inappropriate ON event.\n");
                return false;
            }
            break;

        case MgenEvent::MOD:
            if ((MgenEvent::OFF == prevType) || (MgenEvent::INVALID_TYPE == prevType))
            {
                DMSG(0, "MgenFlow::InsertEvent() inappropriate MOD event.\n");
                return false;
            }
            break;

        case MgenEvent::OFF:
            if ((MgenEvent::OFF == prevType) || (MgenEvent::INVALID_TYPE == prevType))
            {
                DMSG(0, "MgenFlow::InsertEvent() inappropriate OFF event.\n");
                return false;
            }
            break;

        case MgenEvent::INVALID_TYPE:
            ASSERT(0);  // this should never occur
            return false;
    }  // end switch(event->GetType())
    return true;
}  // end MgenFlow::ValidateEvent()

/**
 * Starts the flow at "offsetTime".
 *
 * Every queued event whose time is <= offsetTime is applied immediately so
 * the flow state matches the offset; the first later event (if any) becomes
 * "next_event" and "event_timer" is armed for it.
 *
 * @return false only when the flow has no events at all
 */
bool MgenFlow::Start(double offsetTime)
{
    // Defer actual start until "offsetTime", but process events
    // so flow state is appropriate for given offset.
    MgenEvent* nextEvent = (MgenEvent*)event_list.Head();
    if (!nextEvent)
    {
        DMSG(0, "MgenFlow::Start() flow with empty event list!\n");
        return false;
    }

    // Apply everything that is already due.
    while (nextEvent && (nextEvent->GetTime() <= offsetTime))
    {
        Update(nextEvent);
        nextEvent = (MgenEvent*)nextEvent->Next();
    }

    next_event = nextEvent;
    if (next_event)
    {
        // Negative offsets count as zero; the interval is never negative.
        double currentTime = (offsetTime > 0.0) ? offsetTime : 0.0;
        double nextInterval = next_event->GetTime() - currentTime;
        nextInterval = nextInterval > 0.0 ? nextInterval : 0.0;
        event_timer.SetInterval(nextInterval);
        timer_mgr.ActivateTimer(event_timer);
    }
    started = true;
    return true;
}  // end MgenFlow::Start()

/**
 * Applies one event to the flow.
 *
 *  - ON:  selects the protocol, acquires/opens the socket (UDP) or checks
 *         for an attached sink (SINK), then falls through to MOD handling
 *  - MOD: copies the options set on the event into the flow and its socket
 *         and (re)schedules transmission according to the pattern interval
 *  - OFF: stops transmission and releases the socket item
 *
 * @return false if the event could not be applied
 */
bool MgenFlow::Update(const MgenEvent* event)
{
    switch (event->GetType())
    {
        case MgenEvent::ON:
        {
            ASSERT(!tx_timer.IsActive());
            protocol = event->GetProtocol();
            src_port = (event->OptionIsSet(MgenEvent::SRC)) ? event->GetSrcPort() : 0;
            switch (protocol)
            {
                case MgenBaseEvent::UDP:
                    socket_item = socket_list.GetItem(ProtoSocket::UDP, src_port);
                    if (!socket_item)
                    {
                        DMSG(0, "MgenFlow::Update() Error: unable to get socket item.\n");
                        return false;
                    }
                    // This increments socketItem reference_count, opens socket as needed
                    // note that socket must be opened as IPv4 or IPv6 type
                    if (!socket_item->Open(event->GetDstAddr().GetType(), true))
                    {
                        DMSG(0, "MgenFlow::Update() Error: socket open error.\n");
                        // Do not keep an item we failed to open: otherwise a
                        // later Close() (destructor or OFF) would be issued
                        // for an Open() that never succeeded.
                        // NOTE(review): assumes a failed Open() takes no
                        // reference - confirm against MgenSocketList.
                        socket_item = NULL;
                        socket = NULL;
                        return false;
                    }
                    socket = &socket_item->GetSocket();
                    break;

                case MgenBaseEvent::TCP:
                    DMSG(0, "MgenFlow::Update() TCP not quite yet supported (almost).\n");
                    return false;

                case MgenBaseEvent::SINK:
                    if (!sink)
                    {
                        DMSG(0, "MgenFlow::Update() SINK requires MgenSink attachment.\n");
                        return false;
                    }
                    break;

                default:
                    DMSG(0, "MgenFlow::Update() invalid protocol type.\n");
                    return false;
            }
            if (host_addr.IsValid() && socket)
                host_addr.SetPort(socket->GetPort());
            // Note the lack of "break" here is _intentional_
        }
        case MgenEvent::MOD:
        {
            // Generic MGEN flow options
            if (event->OptionIsSet(MgenEvent::DST))
                dst_addr = event->GetDstAddr();
            if (event->OptionIsSet(MgenEvent::PATTERN))
                pattern = event->GetPattern();
            if (event->OptionIsSet(MgenEvent::SEQUENCE))
                seq_num = event->GetSequence();
            if (event->OptionIsSet(MgenEvent::LABEL))
                flow_label = event->GetFlowLabel();

            // Socket-specific MGEN flow options
            // (failures are logged but do not fail the event)
            if (socket_item)
            {
                if (event->OptionIsSet(MgenEvent::TXBUFFER))
                {
                    if (!socket_item->SetTxBufferSize(event->GetTxBuffer()))
                        DMSG(0, "MgenFlow::Update() error setting socket tx buffer\n");
                }
                if (event->OptionIsSet(MgenEvent::TOS))
                {
                    if (!socket_item->SetTOS(event->GetTOS()))
                        DMSG(0, "MgenFlow::Update() error setting socket TOS value\n");
                }
                if (event->OptionIsSet(MgenEvent::TTL))
                {
                    //TRACE("setting socket ttl: %d\n", event->GetTTL());
                    if (!socket_item->SetTTL(event->GetTTL()))
                        DMSG(0, "MgenFlow::Update() error setting socket TTL\n");
                }
                if (event->OptionIsSet(MgenEvent::INTERFACE))
                {
                    if (!socket_item->SetMulticastInterface(event->GetInterface()))
                        DMSG(0, "MgenFlow::Update() error setting socket multicast interface\n");
                }
            }

            // Schedule tx_timer as needed
            double nextInterval = pattern.GetPktInterval();
            if (nextInterval < 0.0)  // 0.0 pkt/sec transmission rate
            {
                if (tx_timer.IsActive())
                    tx_timer.Deactivate();
                else if (sink && sink->IsNotifying())
                    sink->StopNotifying();
            }
            else if (nextInterval > 0.0)
            {
                if (sink && sink->IsNotifying())
                    sink->StopNotifying();
                if (tx_timer.IsActive())
                {
                    // Credit the time already waited under the old interval
                    // so the next packet is not delayed by a full new period.
                    double elapsedTime = last_interval - tx_timer.GetTimeRemaining();
                    if (nextInterval < elapsedTime)
                        tx_timer.SetInterval(0.0);
                    else
                        tx_timer.SetInterval(nextInterval - elapsedTime);
                    last_interval = nextInterval;
                }
                else
                {
                    // Timer idle: start it with a zero interval.
                    tx_timer.SetInterval(0.0);
                    timer_mgr.ActivateTimer(tx_timer);
                    last_interval = 0.0;
                }
            }
            else  // (nextInterval == 0.0): only handled for SINK flows
            {
                if (tx_timer.IsActive()) tx_timer.Deactivate();
                if (MgenBaseEvent::SINK == protocol)
                {
                    if (sink)
                    {
                        if (!sink->IsNotifying())
                            sink->StartNotifying(this);
                        tx_timer.SetInterval(0.0);
                        last_interval = 0.0;
                    }
                    else
                    {
                        DMSG(0, "MgenFlow::Update() error: SINK requires MgenSink attachment!\n");
                        return false;
                    }
                }
                else
                {
                    DMSG(0, "MgenFlow::Update() error: Undefined packet rate (-1.0) for non-SINK flow!\n");
                    return false;
                }
            }
            break;
        }
        case MgenEvent::OFF:
        {
            if (sink && sink->IsNotifying())
                sink->StopNotifying();
            if (tx_timer.IsActive())
                tx_timer.Deactivate();
            if (socket_item)
            {
                socket_item->Close();
                socket_item = NULL;
                socket = NULL;
            }
            break;
        }
        case MgenEvent::INVALID_TYPE:
            ASSERT(0);  // this should never occur
            return false;
    }  // end switch(event->GetType())
    return true;
}
// end MgenFlow::Update() bool MgenFlow::OnTxTimeout(ProtoTimer& /*theTimer*/) { MgenMsg theMsg; unsigned short len = pattern.GetPktSize(); theMsg.SetMsgLen(len); theMsg.SetFlowId(flow_id); theMsg.SetSeqNum(seq_num++); struct timeval currentTime; ProtoSystemTime(currentTime); theMsg.SetDstAddr(dst_addr); theMsg.SetTxTime(currentTime); if (host_addr.IsValid()) theMsg.SetHostAddr(host_addr); // GPS info theMsg.SetGPSStatus(MgenMsg::INVALID_GPS); theMsg.SetGPSLatitude(999); theMsg.SetGPSLongitude(999); theMsg.SetGPSAltitude(-999); #ifdef HAVE_GPS if (get_position) { GPSPosition pos; get_position(get_position_data, pos); if (pos.xyvalid) { theMsg.SetGPSLatitude(pos.y); theMsg.SetGPSLongitude(pos.x); if (pos.zvalid) theMsg.SetGPSAltitude((long)(pos.z+0.5)); if (pos.stale) theMsg.SetGPSStatus(MgenMsg::STALE); else theMsg.SetGPSStatus(MgenMsg::CURRENT); } } if (payload_handle) { unsigned char payloadLen = 0; GPSGetMemory(payload_handle, 0, (char*)&payloadLen, 1); if (payloadLen) theMsg.SetPayload(GPSGetMemoryPtr(payload_handle, 1), payloadLen); } #endif // HAVE_GPS char txBuffer[MgenMsg::MAX_SIZE]; len = theMsg.Pack(txBuffer, checksum_enable); #ifdef HAVE_IPV6 if (ProtoAddress::IPv6 == dst_addr.GetType()) { if (socket && !socket->SetFlowLabel(flow_label)) DMSG(0, "MgenFlow::OnTxTimeout canot set flow label\n"); } #endif //HAVE_IPV6 // Send message, checking for error // (log only on success) bool success = false; switch (protocol) { case MgenBaseEvent::UDP: if (!socket->SendTo(txBuffer,len, dst_addr)) { #ifndef _WIN32_WCE if (EAGAIN != errno) DMSG(0, "MgenFlow::OnTxTimeout() socket_item->SendTo() error: %s\n", GetErrorString()); else #endif // !_WIN32_WCE DMSG(1, "MgenFlow::OnTxTimeout() socket_item->SendTo() error: %s\n", GetErrorString()); } else { success = true; } break; case MgenBaseEvent::SINK: success = sink->SendMgenMessage(txBuffer, len, dst_addr); break; default: ASSERT(0); // should never occur break; } if (!success) { #ifdef SIMULATE DMSG(0, 
"MgenFlow::OnTxTimeout() error sending message.\n"); #endif // SIMULATE // message was not sent, so sequence number is decremented back one seq_num--; } else if (log_file) { theMsg.LogSendEvent(log_file, log_binary, txBuffer, log_flush); } double nextInterval = pattern.GetPktInterval(); if (nextInterval > 0.0) // normal scheduled transmission event { tx_timer.SetInterval(nextInterval); if (sink && sink->IsNotifying()) sink->StopNotifying(); if (!tx_timer.IsActive()) { timer_mgr.ActivateTimer(tx_timer); last_interval = 0.0; } else { last_interval = nextInterval; } return true; } else if (nextInterval < 0.0) // Flow pattern rate is 0.0 message/sec { if(tx_timer.IsActive()) tx_timer.Deactivate(); last_interval = 0.0; if (sink && sink->IsNotifying()) sink->StopNotifying(); return false; } else // (nextInterval == 0.0) // Flow pattern rate is undefined (SINK only for now) { if (tx_timer.IsActive()) tx_timer.Deactivate(); if (MgenBaseEvent::SINK == protocol) { if (sink) { // Trigger async output notification if (!sink->IsNotifying()) { sink->StartNotifying(this); last_interval = 0.0; } } else { DMSG(0, "MgenFlow::Update() error: SINK requires MgenSink attachment!\n"); } } else { DMSG(0, "MgenFlow::Update() error: Undefined packet rate (-1.0) for non-SINK flow!\n"); } return false; } } // end MgenFlow::OnTxTimeout() bool MgenFlow::IsActive() const { if (!started) return false; if (next_event) { if (MgenEvent::ON != next_event->GetType()) return true; else return false; } else { const MgenEvent* lastEvent = (const MgenEvent*)event_list.Tail(); if (lastEvent && (MgenEvent::OFF != lastEvent->GetType())) return true; else return false; } } // end MgenFlow::IsActive(); double MgenFlow::GetCurrentOffset() const { double currentOffset = -1.0; if (started) { if (next_event) { currentOffset = next_event->GetTime() - event_timer.GetTimeRemaining(); } else { const MgenEvent* event = (const MgenEvent*)event_list.Tail(); currentOffset = event ? 
event->GetTime() : -1.0; } } return currentOffset; } // end MgenFlow::CurrentTime() bool MgenFlow::OnEventTimeout(ProtoTimer& /*theTimer*/) { ASSERT(next_event); // 1) Update flow as needed using "next_event" Update(next_event); // 2) Set (or kill) event_timer according to "next_event->next" double currentTime = next_event->GetTime(); next_event = (MgenEvent*)next_event->Next(); if (next_event) { double nextInterval = next_event->GetTime() - currentTime; nextInterval = nextInterval > 0.0 ? nextInterval : 0.0; event_timer.SetInterval(nextInterval); return true; } else { event_timer.Deactivate(); return false; } } // end MgenFlow::OnEventTimeout() ////////////////////////////////////////////////////////////////// // MgenFlowList implementation MgenFlowList::MgenFlowList() : head(NULL), tail(NULL) { } MgenFlowList::~MgenFlowList() { Destroy(); } void MgenFlowList::Destroy() { MgenFlow* next = head; while (next) { MgenFlow* current = next; next = next->next; delete current; } head = tail = (MgenFlow*)NULL; } // end MgenFlowList::Destroy() void MgenFlowList::Append(MgenFlow* theFlow) { theFlow->next = NULL; if ((theFlow->prev = tail)) tail->next = theFlow; else head = theFlow; tail = theFlow; } // end MgenFlowList::Append() void MgenFlowList::SetLogFile(FILE* filePtr, bool binary, bool flush) { MgenFlow* current = head; while (current) { current->SetLogFile(filePtr, binary, flush); current = current->next; } } // end MgenFlowList::SetLogFile() void MgenFlowList::SetHostAddress(const ProtoAddress& hostAddr) { MgenFlow* current = head; while (current) { current->SetHostAddress(hostAddr); current = current->next; } } // end MgenFlowList::SetHostAddress() void MgenFlowList::ClearHostAddress() { MgenFlow* current = head; while (current) { current->ClearHostAddress(); current = current->next; } } // end MgenFlowList::ClearHostAddress() void MgenFlowList::EnableChecksums() { MgenFlow* current = head; while (current) { current->EnableChecksums(); current = current->next; } } // 
end MgenFlowList::SetLogFile() void MgenFlowList::SetSink(MgenSink* theSink) { MgenFlow* current = head; while (current) { current->SetSink(theSink); current = current->next; } } // end MgenFlowList::SetSink() //Set the default IPv6 flow label. #ifdef HAVE_IPV6 void MgenFlowList::SetDefaultLabel(UINT32 label) { MgenFlow* current = head; while (current) { current->SetLabel(label); current = current->next; } } // MgenFlowList::SetDefaultLabel() #endif //HAVE_IPV6 MgenFlow* MgenFlowList::FindFlowById(unsigned int flowId) { MgenFlow* prev = tail; while (prev) { if (flowId == prev->flow_id) return prev; prev = prev->prev; } return (MgenFlow*)NULL; } // end MgenFlowList::FindFlowById() bool MgenFlowList::Start(double offsetTime) { bool result = false; MgenFlow* next = head; while (next) { result |= next->Start(offsetTime); next = next->next; } return result; } // end MgenFlowList::Start() double MgenFlowList::GetCurrentOffset() const { double currentOffset = -1.0; MgenFlow* next = head; while (next) { if (next->event_timer.IsActive()) { return next->GetCurrentOffset(); } else { double nextOffset = next->GetCurrentOffset(); if (nextOffset > currentOffset) currentOffset = nextOffset; } next = next->next; } return currentOffset; } // end MgenFlowList::GetCurrentOffset() bool MgenFlowList::SaveFlowSequences(FILE* file) const { MgenFlow* next = head; while (next) { if (next->IsActive() || (NULL != next->next_event)) { double offset; if (next->IsActive()) { offset = next->GetCurrentOffset(); } else { offset = next->next_event->GetTime(); } fprintf(file, "%f MOD %lu SEQUENCE %lu\n", offset, next->flow_id, next->seq_num); } next = next->next; } return true; } // end MgenFlowList::SaveFlowState()