#include "mgen.h"
#include "mgenMsg.h"
#include "mgenVersion.h"
#include "mgenEvent.h"
#include <string.h>
#include <stdio.h>
#include <time.h> // for gmtime()
#include <ctype.h> // for toupper()
#ifdef UNIX
#include <sys/types.h> // for stat
#include <sys/stat.h> // for stat
#include <unistd.h> // for stat
#endif
// For WinCE, we provide an option logging function
// to get log output to our debug window
#ifndef _WIN32_WCE
#include <errno.h>
Mgen::LogFunction Mgen::Log = fprintf;
#else
Mgen::LogFunction Mgen::Log = fprintf;
// This function is used on our WinCE build where there is no
// real "stdout" for default MGEN output
int Mgen::LogToDebug(FILE* /*filePtr*/, const char* format, ...)
{
va_list args;
va_start(args, format);
char charBuffer[2048];
charBuffer[2048] = '\0';
int count = _vsnprintf(charBuffer, 2047, format, args);
TRACE("%s", charBuffer);
va_end(args);
return count;
} // end Mgen::LogToDebug()
#endif // if/else !_WIN32_WCE
Mgen::Mgen(ProtoTimerMgr& timerMgr,
ProtoSocket::Notifier& socketNotifier)
: controller(NULL), timer_mgr(timerMgr), sink(NULL),
log_file(NULL), log_binary(false), log_flush(false),
log_file_lock(false), log_tx(false), log_open(false), log_empty(true),
save_path(NULL), save_path_lock(false), started(false),
start_hour(0), start_min(0), start_sec(-1.0),
start_gmt(false), start_time_lock(false),
offset(-1.0), offset_lock(false), offset_pending(false),
default_flow_label(0), default_label_lock(false),
checksum_enable(false), addr_type(ProtoAddress::IPv4), checksum_force(false),
get_position(NULL), get_position_data(NULL)
{
socket_list.SetSocketNotifier(&socketNotifier);
start_timer.SetListener(this, &Mgen::OnStartTimeout);
start_timer.SetInterval(0.0);
start_timer.SetRepeat(0);
drec_event_timer.SetListener(this, &Mgen::OnDrecEventTimeout);
drec_event_timer.SetInterval(0.0);
drec_event_timer.SetRepeat(-1);
}
Mgen::~Mgen()
{
Stop();
if (save_path) delete save_path;
}
bool Mgen::Start()
{
#ifdef HAVE_IPV6
//Set default IPv6 flow labels for each flow
flow_list.SetDefaultLabel(default_flow_label);
#endif //HAVE_IPV6
struct timeval currentTime;
ProtoSystemTime(currentTime);
if (start_sec < 0.0)
{
// Start immediately
if (log_file)
{
// Log START event
if (log_binary)
{
char buffer[128];
if (log_empty)
{
// write binary log header line plus NULL character
strcpy(buffer, "mgen version=");
strcat(buffer, MGEN_VERSION);
strcat(buffer, " type=binary_log\n");
fwrite(buffer, sizeof(char), strlen(buffer)+1, log_file);
}
unsigned int index = 0;
// set "eventType" field
buffer[index++] = (char)START_EVENT;
// zero "reserved" field
buffer[index++] = 0;
// set "recordLength" field
unsigned short temp16 = htons(8);
memcpy(buffer+index, &temp16, sizeof(short));
index += sizeof(short);
// set "eventTime" fields
unsigned long temp32 = htonl(currentTime.tv_sec);
memcpy(buffer+index, &temp32, sizeof(long));
index += sizeof(long);
temp32 = htonl(currentTime.tv_usec);
memcpy(buffer+index, &temp32, sizeof(long));
index += sizeof(long);
// write the record
if (fwrite(buffer, sizeof(char), index, log_file) < index)
{
DMSG(0, "Mgen::Start() fwrite() error: %s\n", GetErrorString());
}
}
else
{
#ifdef _WIN32_WCE
struct tm timeStruct;
timeStruct.tm_hour = currentTime.tv_sec / 3600;
unsigned long hourSecs = 3600 * timeStruct.tm_hour;
timeStruct.tm_min = (currentTime.tv_sec - (hourSecs)) / 60;
timeStruct.tm_sec = currentTime.tv_sec - (hourSecs) - (60*timeStruct.tm_min);
timeStruct.tm_hour = timeStruct.tm_hour % 24;
struct tm* timePtr = &timeStruct;
#else
struct tm* timePtr = gmtime((time_t*)¤tTime.tv_sec);
#endif // if/else _WIN32_WCE
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu START\n",
timePtr->tm_hour, timePtr->tm_min,
timePtr->tm_sec, (unsigned long)currentTime.tv_usec);
}
if (log_empty) log_empty = false;
fflush(log_file);
}
// Activate transmit flows according to "offset" time
flow_list.Start(offset);
// Pre-process drec events occurring prior to "offset" time
DrecEvent* nextEvent = (DrecEvent*)drec_event_list.Head();
offset_pending = true;
while (nextEvent)
{
if (nextEvent->GetTime() <= offset)
{
ProcessDrecEvent(*nextEvent);
}
else
{
double currentTime = offset > 0.0 ? offset : 0.0;
drec_event_timer.SetInterval(nextEvent->GetTime() - currentTime);
timer_mgr.ActivateTimer(drec_event_timer);
break;
}
nextEvent = (DrecEvent*)nextEvent->Next();
}
offset_pending = false;
next_drec_event = nextEvent;
// Activate any group joins deferred during "offset" processing.
drec_group_list.JoinDeferredGroups(socket_list);
}
else // Schedule absolute start time
{
// Make sure there are pending events
if(!drec_event_list.IsEmpty() || !flow_list.IsEmpty())
{
// Calculate start time delta and schedule start_timer
// (can delay start up to 12 hours into future)
#ifdef _WIN32_WCE
DMSG(0, "Mgen::Start() warning: WinCE absolute start time might not be correct!\n");
unsigned long hours = currentTime.tv_sec / 3600;
unsigned long minutes = (currentTime.tv_sec - (3600*hours)) / 60;
unsigned long seconds = currentTime.tv_sec - (3600*hours) - (60*minutes);
hours = hours % 24;
double nowSec = hours*3600 + minutes*60 + seconds;
#else
struct tm now;
if (start_gmt)
memcpy(&now, gmtime((time_t*)¤tTime.tv_sec), sizeof(struct tm));
else
memcpy(&now, localtime((time_t*)¤tTime.tv_sec), sizeof(struct tm));
double nowSec = now.tm_hour*3600 + now.tm_min*60 + now.tm_sec;
#endif // if/else _WIN32_WCE
double startSec = start_hour*3600 + start_min*60 + start_sec;
double delta = startSec - nowSec;
if (delta < 0.0) delta += (24*3600);
if (delta > (12*3600))
{
DMSG(0, "Mgen::Start() Error: Specified start time has already elapsed\n");
return false;
}
start_timer.SetInterval(delta);
timer_mgr.ActivateTimer(start_timer);
}
}
started = true;
return true;
} // end Mgen::Start()
void Mgen::Stop()
{
if (started)
{
if (log_file)
{
// Log STOP event
struct timeval currentTime;
ProtoSystemTime(currentTime);
if (log_binary)
{
unsigned int index = 0;
char buffer[128];
//set "eventType" field
buffer[index++] = (char)Mgen::STOP_EVENT;
// zero "reserved" field
buffer[index++] = 0;
// set "recordLength" field
unsigned short temp16 = htons(8);
memcpy(buffer+index, &temp16, sizeof(short));
index += sizeof(short);
// set "eventTime" fields
unsigned long temp32 = htonl(currentTime.tv_sec);
memcpy(buffer+index, &temp32, sizeof(long));
index += sizeof(long);
temp32 = htonl(currentTime.tv_usec);
memcpy(buffer+index, &temp32, sizeof(long));
index += sizeof(long);
// write the record
if (fwrite(buffer, sizeof(char), index, log_file) < index)
DMSG(0, "Mgen::Stop() fwrite error: %s\n", GetErrorString());
}
else
{
#ifdef _WIN32_WCE
struct tm timeStruct;
timeStruct.tm_hour = currentTime.tv_sec / 3600;
unsigned long hourSecs = 3600 * timeStruct.tm_hour;
timeStruct.tm_min = (currentTime.tv_sec - hourSecs) / 60;
timeStruct.tm_sec = currentTime.tv_sec - hourSecs - (60*timeStruct.tm_min);
timeStruct.tm_hour = timeStruct.tm_hour % 24;
struct tm* timePtr = &timeStruct;
#else
struct tm* timePtr = gmtime((time_t*)¤tTime.tv_sec);
#endif // if/else _WIN32_WCE
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu STOP\n",
timePtr->tm_hour, timePtr->tm_min,
timePtr->tm_sec, (unsigned long)currentTime.tv_usec);
} // end if/else(log_binary)
fflush(log_file);
if ((log_file != stdout) && (stderr != log_file))
{
fclose(log_file);
log_file = NULL;
}
} // end if(log_file)
// Save current offset and pending flow sequence state
if (save_path)
{
FILE* saveFile = fopen(save_path, "w+");
if (saveFile)
{
flow_list.SaveFlowSequences(saveFile);
fprintf(saveFile, "OFFSET %f\n", GetCurrentOffset());
fclose(saveFile);
}
else
{
DMSG(0, "Mgen::Stop() error opening save file: %s\n", GetErrorString());
}
}
started = false;
} // end if (started)
if (start_timer.IsActive()) start_timer.Deactivate();
flow_list.Destroy();
if (drec_event_timer.IsActive()) drec_event_timer.Deactivate();
drec_event_list.Destroy();
drec_group_list.Destroy(socket_list);
socket_list.Destroy();
} // end Mgen::Stop()
bool Mgen::ConvertBinaryLog(const char* path)
{
if (NULL == log_file) return false;
FILE* file = fopen(path, "rb");
if (!file)
{
DMSG(0, "Mgen::ConvertFile() fopen() Error: %s\n", GetErrorString());
return false;
}
const unsigned int BINARY_RECORD_MAX = 1024; // maximum record size
char buffer[BINARY_RECORD_MAX];
// Read ASCII binary log header line
// The first four characters should be "mgen"
if (fread(buffer, sizeof(char), 4, file) < 4)
{
DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
return false;
}
if (strncmp("mgen", buffer, 4) != 0)
{
DMSG(0, "Mgen::ConvertBinaryLog() error: invalid mgen log file\n");
return false;
}
// Read remainder of header line, including terminating NULL
unsigned int index = 3;
while ('\0' != buffer[index])
{
index++;
if (fread(buffer+index, sizeof(char), 1, file) < 1)
{
DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
return false;
}
}
// Confirm log file "version" and "type"
char* ptr = strstr(buffer, "version=");
if (ptr)
{
// Just look at major version number for moment
int version;
if (1 == sscanf(ptr, "version=%d", &version))
{
if (version != 4)
{
DMSG(0, "Mgen::ConvertBinaryLog() invalid log file version\n");
return false;
}
}
else
{
DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"version\" value\n");
return false;
}
}
else
{
DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"version\" label\n");
return false;
}
ptr = strstr(ptr, "type=");
if (ptr)
{
char fileType[128];
if (1 == sscanf(ptr, "type=%s", fileType))
{
if (strcmp(fileType, "binary_log"))
{
DMSG(0, "Mgen::ConvertBinaryLog() invalid log file type\n");
return false;
}
}
else
{
DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"type\" value\n");
return false;
}
}
else
{
DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"type\" label\n");
return false;
}
// Now loop, reading in records one by one, and output text format
while (1)
{
// 1) Read record header (4 bytes)
char header[4];
if (fread(header, sizeof(char), 4, file) < 4)
{
if (feof(file))
{
break;
}
else
{
DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
fclose(file);
return false;
}
}
LogEvent eventType = (LogEvent)header[0];
unsigned short recordLength;
memcpy(&recordLength, header+2, sizeof(short));
recordLength = ntohs(recordLength);
if (recordLength > BINARY_RECORD_MAX)
{
DMSG(0, "Mgen::ConvertBinaryLog() record len:%hu exceeds maximum length\n", recordLength);
fclose(file);
return false;
}
// 2) Read the entire record
if (fread(buffer, sizeof(char), recordLength, file) < recordLength)
{
DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
fclose(file);
return false;
}
// 3) Convert the record to text
unsigned int index = 0;
switch (eventType)
{
case RECV_EVENT:
{
// get "eventTime"
struct timeval eventTime;
unsigned long temp32;
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_sec = ntohl(temp32);
index += sizeof(long);
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_usec = ntohl(temp32);
index += sizeof(long);
// get "srcPort"
unsigned short temp16;
memcpy(&temp16, buffer+index, sizeof(short));
unsigned short srcPort = ntohs(temp16);
index += sizeof(short);
// get "srcAddrType"
ProtoAddress::Type addrType;
switch (buffer[index++])
{
case MgenMsg::IPv4:
addrType = ProtoAddress::IPv4;
break;
case MgenMsg::IPv6:
addrType = ProtoAddress::IPv6;
break;
default:
DMSG(0, "Mgen::ConvertBinaryLog() unknown source address type:%d\n",
buffer[index-1]);
fclose(file);
return false;
}
// get "srcAddrLen"
unsigned int addrLen = (unsigned int)buffer[index++];
ProtoAddress srcAddr;
// get "srcAddr"
srcAddr.SetRawHostAddress(addrType, buffer+index, addrLen);
index += addrLen;
srcAddr.SetPort(srcPort);
// The remainder of the record corresponds to the message content
MgenMsg msg;
msg.Unpack(buffer+index, recordLength - index, false);
msg.LogRecvEvent(log_file, eventTime, srcAddr, false, NULL, log_flush);
break;
}
case SEND_EVENT:
{
MgenMsg msg;
msg.Unpack(buffer, recordLength, false);
msg.LogSendEvent(log_file, false, NULL, log_flush);
break;
}
case LISTEN_EVENT:
case IGNORE_EVENT:
{
const char* eventName = (LISTEN_EVENT == eventType) ? "LISTEN" : "IGNORE";
unsigned int index = 0;
// get "eventTime"
struct timeval eventTime;
unsigned long temp32;
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_sec = ntohl(temp32);
index += sizeof(long);
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_usec = ntohl(temp32);
index += sizeof(long);
// get "protocol"
const char* protoName =
MgenBaseEvent::GetStringFromProtocol((MgenBaseEvent::Protocol)buffer[index++]);
// skip "reserved" field
index++;
// get "portNumber"
unsigned short temp16;
memcpy(&temp16, buffer+index, sizeof(short));
unsigned short portNumber = ntohs(temp16);
// Output text log format
#ifdef _WIN32_WCE
struct tm timeStruct;
timeStruct.tm_hour = eventTime.tv_sec / 3600;
unsigned long hourSecs = 3600 * timeStruct.tm_hour;
timeStruct.tm_min = (eventTime.tv_sec - hourSecs) / 60;
timeStruct.tm_sec = eventTime.tv_sec - hourSecs - (60*timeStruct.tm_min);
timeStruct.tm_hour = timeStruct.tm_hour % 24;
struct tm* timePtr = &timeStruct;
#else
struct tm* timePtr = gmtime((time_t*)&eventTime.tv_sec);
#endif // if/else _WIN32_WCE
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s proto>%s port>%hu\n",
timePtr->tm_hour, timePtr->tm_min, timePtr->tm_sec,
(unsigned long)eventTime.tv_usec, eventName,
protoName, portNumber);
break;
}
case JOIN_EVENT:
case LEAVE_EVENT:
{
unsigned int index = 0;
// get "eventTime"
struct timeval eventTime;
unsigned long temp32;
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_sec = ntohl(temp32);
index += sizeof(long);
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_usec = ntohl(temp32);
index += sizeof(long);
// get "groupPort"
unsigned short temp16;
memcpy(&temp16, buffer+index, sizeof(short));
unsigned short groupPort = ntohs(temp16);
index += sizeof(short);
// get "groupAddrType"
ProtoAddress::Type addrType;
switch (buffer[index++])
{
case MgenMsg::IPv4:
addrType = ProtoAddress::IPv4;
break;
case MgenMsg::IPv6:
addrType = ProtoAddress::IPv6;
break;
default:
DMSG(0, "Mgen::ConvertBinaryLog() unknown source address type\n");
fclose(file);
return false;
}
// get "groupAddrLen"
unsigned int addrLen = (unsigned int)buffer[index++];
ProtoAddress groupAddr;
// get "groupAddr"
groupAddr.SetRawHostAddress(addrType, buffer+index, addrLen);
index += addrLen;
// get "ifaceNameLen"
unsigned int ifaceNameLen = buffer[index++];
char ifaceName[128];
memcpy(ifaceName, buffer+index, ifaceNameLen);
ifaceName[ifaceNameLen] = '\0';
// Output text log format
const char* eventName = (JOIN_EVENT == eventType) ? "JOIN" : "LEAVE";
#ifdef _WIN32_WCE
struct tm timeStruct;
timeStruct.tm_hour = eventTime.tv_sec / 3600;
unsigned long hourSecs = 3600 * timeStruct.tm_hour;
timeStruct.tm_min = (eventTime.tv_sec - hourSecs) / 60;
timeStruct.tm_sec = eventTime.tv_sec - hourSecs - (60*timeStruct.tm_min);
timeStruct.tm_hour = timeStruct.tm_hour % 24;
struct tm* timePtr = &timeStruct;
#else
struct tm* timePtr = gmtime((time_t*)&eventTime.tv_sec);
#endif // if/else _WIN32_WCE
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s group>%s",
timePtr->tm_hour, timePtr->tm_min, timePtr->tm_sec,
(unsigned long)eventTime.tv_usec, eventName,
groupAddr.GetHostString());
if (ifaceNameLen) Mgen::Log(log_file, " interface>%s", ifaceName);
if (groupPort)
Mgen::Log(log_file, " port>%hu\n", groupPort);
else
Mgen::Log(log_file, "\n");
break;
}
case START_EVENT:
case STOP_EVENT:
{
const char* eventName = (START_EVENT == eventType) ? "START" : "STOP";
unsigned int index = 0;
// get "eventTime"
struct timeval eventTime;
unsigned long temp32;
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_sec = ntohl(temp32);
index += sizeof(long);
memcpy(&temp32, buffer+index, sizeof(long));
eventTime.tv_usec = ntohl(temp32);
index += sizeof(long);
#ifdef _WIN32_WCE
struct tm timeStruct;
timeStruct.tm_hour = eventTime.tv_sec / 3600;
unsigned long hourSecs = 3600 * timeStruct.tm_hour;
timeStruct.tm_min = (eventTime.tv_sec - hourSecs) / 60;
timeStruct.tm_sec = eventTime.tv_sec - hourSecs - (60*timeStruct.tm_min);
timeStruct.tm_hour = timeStruct.tm_hour % 24;
struct tm* timePtr = &timeStruct;
#else
struct tm* timePtr = gmtime((time_t*)&eventTime.tv_sec);
#endif // if/else _WIN32_WCE
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s\n",
timePtr->tm_hour, timePtr->tm_min, timePtr->tm_sec,
(unsigned long)eventTime.tv_usec, eventName);
break;
}
default:
DMSG(0, "Mgen::ConvertBinaryLog() invalid event type\n");
fclose(file);
return false;
} // end switch(eventType)
} // end while(1)
fclose(file);
return true;
} // end Mgen::ConvertBinaryLog()
bool Mgen::OnStartTimeout(ProtoTimer& /*theTimer*/)
{
start_sec = -1.0;
Start();
return true;
} // Mgen::OnStartTimeout()
bool Mgen::OnDrecEventTimeout(ProtoTimer& /*theTimer*/)
{
// 1) Process next pending drec event
ASSERT(next_drec_event);
ProcessDrecEvent(*next_drec_event);
// 2) Install next DREC event timeout (or kill timer)
double eventTime = next_drec_event->GetTime();
next_drec_event = (DrecEvent*)next_drec_event->Next();
if (next_drec_event)
{
double nextInterval = next_drec_event->GetTime() - eventTime;
nextInterval = nextInterval > 0.0 ? nextInterval : 0.0;
drec_event_timer.SetInterval(nextInterval);
return true;
}
else
{
drec_event_timer.Deactivate();
return false;
}
} // end Mgen::OnDrecEventTimeout()
void Mgen::ProcessDrecEvent(const DrecEvent& event)
{
// 1) Process the next DREC event
switch (event.GetType())
{
case DrecEvent::JOIN:
if (!drec_group_list.JoinGroup(socket_list,
event.GetGroupAddress(),
event.GetInterface(),
event.GetGroupPort(),
offset_pending))
{
DMSG(0, "Mgen::ProcessDrecEvent(JOIN) Warning: error joining group\n");
}
else
{
LogDrecEvent(JOIN_EVENT, event, event.GetGroupPort());
}
break;
case DrecEvent::LEAVE:
if (!drec_group_list.LeaveGroup(socket_list,
event.GetGroupAddress(),
event.GetInterface(),
event.GetGroupPort()))
{
DMSG(0, "Mgen::ProcessDrecEvent(LEAVE) Warning: error leaving group\n");
}
else
{
LogDrecEvent(LEAVE_EVENT, event, event.GetGroupPort());
}
break;
case DrecEvent::LISTEN:
{
ProtoSocket::Protocol theProtocol;
switch (event.GetProtocol())
{
case MgenBaseEvent::UDP:
theProtocol = ProtoSocket::UDP;
break;
case MgenBaseEvent::TCP:
theProtocol = ProtoSocket::TCP;
break;
default:
DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) invalid protocol\n");
return;
}
const unsigned short* port = event.GetPortList();
unsigned short portCount = event.GetPortCount();
for (unsigned short i = 0; i < portCount; i++)
{
MgenSocketList::Item* socketItem = socket_list.GetItem(theProtocol, port[i]);
if (socketItem)
{
ProtoSocket& theSocket = socketItem->GetSocket();
if (theSocket.IsOpen())
{
if (!theSocket.IsBound())
{
if (!theSocket.Bind(port[i]))
{
DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) Error: socket bind error on port %hu\n",
port[i]);
}
}
}
else if (!socketItem->Open(addr_type, true))
{
DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) Error: socket open error on port %hu\n",
port[i]);
continue;
}
}
else
{
DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) Error: no socket available\n");
continue;
}
// If a socket Rx buffer size is specified
unsigned int rxBufferSize = event.GetRxBuffer();
if (0 != rxBufferSize)
{
if (!socketItem->SetRxBufferSize(rxBufferSize))
DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) error setting socket rx buffer\n");
}
// Set socket recv_handler to enable "listen"
socketItem->SetSocketListener(this, &Mgen::OnSocketEvent);
LogDrecEvent(LISTEN_EVENT, event, port[i]);
}
}
break;
case DrecEvent::IGNORE_:
{
ProtoSocket::Protocol theProtocol;
switch (event.GetProtocol())
{
case MgenBaseEvent::UDP:
theProtocol = ProtoSocket::UDP;
break;
case MgenBaseEvent::TCP:
theProtocol = ProtoSocket::TCP;
break;
default:
DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) invalid protocol\n");
return;
}
const unsigned short* port = event.GetPortList();
unsigned short portCount = event.GetPortCount();
for (unsigned short i = 0; i < portCount; i++)
{
MgenSocketList::Item* socketItem =
socket_list.FindItemByPort(theProtocol, port[i]);
if (socketItem && socketItem->SocketHasListener())
{
// Clear socket event_handler to disable "listen"
socketItem->SetSocketListener((Mgen*)NULL, &Mgen::OnSocketEvent);
socketItem->Close(); // decrements socketItem reference_count
LogDrecEvent(IGNORE_EVENT, event, port[i]);
}
else
{
DMSG(0, "Mgen::ProcessDrecEvent(IGNORE) Error: no socket on port %hu\n",
port[i]);
}
}
break;
}
case DrecEvent::INVALID_TYPE:
ASSERT(0);
break;
} // end switch(event.GetType())
} // end Mgen::ProcessDrecEvent()
void Mgen::LogDrecEvent(LogEvent eventType, const DrecEvent &event, unsigned short portNumber)
{
// Get current system time
if (offset_pending) return; // Don't log "offset" pre-processed events
if (NULL == log_file) return;
struct timeval eventTime;
ProtoSystemTime(eventTime);
if (log_binary)
{
char buffer[128];
unsigned int index = 0;
// set "type" field
buffer[index++] = (char)eventType;
// zero "reserved" field
buffer[index++] = 0;
// skip "recordLength" field for moment
index += 2;
// Fill in "eventTime" field
unsigned int temp32 = htonl(eventTime.tv_sec);
memcpy(buffer+index, &temp32, sizeof(long));
index += sizeof(long);
temp32 = htonl(eventTime.tv_usec);
memcpy(buffer+index, &temp32, sizeof(long));
index += sizeof(long);
switch(eventType)
{
case LISTEN_EVENT:
case IGNORE_EVENT:
{
// set "protocol" field
buffer[index++] = (char)event.GetProtocol();
// zero second "reserved" field
buffer[index++] = 0;
// set "portNumber" field
unsigned short temp16 = htons(portNumber);
memcpy(buffer+index, &temp16, sizeof(short));
index += sizeof(short);
break;
}
case JOIN_EVENT:
case LEAVE_EVENT:
{
// set "groupPort" field
unsigned short temp16 = portNumber;
memcpy(buffer+index, &temp16, sizeof(short));
index += sizeof(short);
const ProtoAddress& addr = event.GetGroupAddress();
// set "groupAddrType" field
switch (addr.GetType())
{
case ProtoAddress::IPv4:
buffer[index++] = (char)MgenMsg::IPv4;
break;
case ProtoAddress::IPv6:
buffer[index++] = (char)MgenMsg::IPv6;
break;
default:
DMSG(0, "Mgen::LogDrecEvent(JOIN/LEAVE) error: invalid address type\n");
return;
}
// set "groupAddrLen" field
unsigned char len = (unsigned char)addr.GetLength();
buffer[index++] = len;
// set "groupAddr" field
memcpy(buffer+index, addr.GetRawHostAddress(), len);
index += len;
const char* interfaceName = event.GetInterface();
len = interfaceName ? strlen(interfaceName) : 0;
// set "ifaceNameLen" field
buffer[index++] = len;
// set "ifaceName" field
memcpy(buffer+index, interfaceName, len);
index += len;
break;
}
default:
DMSG(0, "Mgen::LogDrecEvent() error: invalid event type\n");
ASSERT(0);
break;
} // end switch(eventType)
// Set the "recordLength" field
const unsigned short RECORD_LENGTH_OFFSET = 2;
unsigned short temp16 = htons(index - (RECORD_LENGTH_OFFSET+2));
memcpy(buffer+RECORD_LENGTH_OFFSET, &temp16, sizeof(short));
// Write record to log file
if (fwrite(buffer, sizeof(char), index, log_file) < index)
DMSG(0, "Mgen::LogDrecEvent() fwrite() error: %s\n", GetErrorString());
}
else
{
#ifdef _WIN32_WCE
struct tm timeStruct;
timeStruct.tm_hour = eventTime.tv_sec / 3600;
unsigned long hourSecs = 3600 * timeStruct.tm_hour;
timeStruct.tm_min = (eventTime.tv_sec - hourSecs) / 60;
timeStruct.tm_sec = eventTime.tv_sec - hourSecs - (60*timeStruct.tm_min);
timeStruct.tm_hour = timeStruct.tm_hour % 24;
struct tm* timePtr = &timeStruct;
#else
struct tm* timePtr = gmtime((time_t*)&eventTime.tv_sec);
#endif // if/else _WIN32_WCE
switch (eventType)
{
case LISTEN_EVENT:
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu LISTEN proto>%s port>%hu\n",
timePtr->tm_hour, timePtr->tm_min, timePtr->tm_sec,
(unsigned long)eventTime.tv_usec,
MgenBaseEvent::GetStringFromProtocol(event.GetProtocol()),
portNumber);
break;
case IGNORE_EVENT:
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu IGNORE proto>%s port>%hu\n",
timePtr->tm_hour, timePtr->tm_min, timePtr->tm_sec,
(unsigned long)eventTime.tv_usec,
MgenBaseEvent::GetStringFromProtocol(event.GetProtocol()),
portNumber);
break;
case JOIN_EVENT:
{
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu JOIN group>%s",
timePtr->tm_hour, timePtr->tm_min, timePtr->tm_sec,
(unsigned long)eventTime.tv_usec,
event.GetGroupAddress().GetHostString());
const char* iface = event.GetInterface();
if (iface) Mgen::Log(log_file, " interface>%s", iface);
if (portNumber)
Mgen::Log(log_file, " port>%hu\n", portNumber);
else
Mgen::Log(log_file, "\n");
break;
}
case LEAVE_EVENT:
{
Mgen::Log(log_file, "%02d:%02d:%02d.%06lu LEAVE group>%s",
timePtr->tm_hour, timePtr->tm_min, timePtr->tm_sec,
(unsigned long)eventTime.tv_usec,
event.GetGroupAddress().GetHostString());
const char* iface = event.GetInterface();
if (iface) Mgen::Log(log_file, " interface>%s", iface);
if (portNumber)
Mgen::Log(log_file, " port>%hu\n", portNumber);
else
Mgen::Log(log_file, "\n");
break;
}
default:
DMSG(0, "Mgen::LogDrecEvent() error: invalid event type\n");
ASSERT(0);
break;
} // end switch(eventType)
} // end if/else (log_binary)
if (log_flush) fflush(log_file);
} // end Mgen::LogDrecEvent()
// Receive and log a packet
void Mgen::OnSocketEvent(ProtoSocket& theSocket, ProtoSocket::Event theEvent)
{
switch (theEvent)
{
case ProtoSocket::RECV:
{
char buffer[MgenMsg::MAX_SIZE];
unsigned int len = MgenMsg::MAX_SIZE;
ProtoAddress srcAddr;
while (theSocket.RecvFrom(buffer, len, srcAddr))
{
struct timeval currentTime;
ProtoSystemTime(currentTime);
MgenMsg theMsg;
if (controller)
controller->OnMsgReceive(theMsg, srcAddr, currentTime);
if (log_file)
{
if (theMsg.Unpack(buffer, len, checksum_force))
theMsg.LogRecvEvent(log_file, currentTime, srcAddr, log_binary, buffer, log_flush);
else
theMsg.LogRecvError(log_file, currentTime, srcAddr, log_binary, theMsg.GetError(), log_flush);
}
len = MgenMsg::MAX_SIZE;
}
//else
//{
// DMSG(2, "Mgen::OnSocketRecv() ProtoSocket::RecvFrom() error\n");
//}
break;
}
default:
DMSG(0, "Mgen::OnSocketEvent() unexpected event type\n");
break;
}
} // end Mgen::OnSocketEvent()
void Mgen::HandleMgenMessage(char* buffer, unsigned int len,
const ProtoAddress& srcAddr)
{
MgenMsg theMsg;
struct timeval currentTime;
ProtoSystemTime(currentTime);
if (NULL != log_file)
{
if (theMsg.Unpack(buffer, len, checksum_force))
theMsg.LogRecvEvent(log_file, currentTime, srcAddr, log_binary, buffer, log_flush);
else
theMsg.LogRecvError(log_file, currentTime, srcAddr, log_binary, theMsg.GetError(), log_flush);
}
} // end HandleMgenMessage()
bool Mgen::OpenLog(const char* path, bool append, bool binary)
{
if (append)
{
#ifdef WIN32
WIN32_FILE_ATTRIBUTE_DATA fileAttr;
#ifdef _UNICODE
wchar_t wideBuffer[PATH_MAX];
int pathLen = strlen(path);
if (pathLen > PATH_MAX)
pathLen = PATH_MAX;
else
pathLen += 1;
mbstowcs(wideBuffer, path, pathLen);
LPCTSTR pathPtr = wideBuffer;
#else
LPCTSTR pathPtr = path;
#endif // if/else _UNICODE
if (0 == GetFileAttributesEx(pathPtr, GetFileExInfoStandard, &fileAttr))
log_empty = true; // error -- assume (nonexistent) file is empty
else if ((0 == fileAttr.nFileSizeLow) && (0 == fileAttr.nFileSizeHigh))
log_empty = true;
else
log_empty = false;
#else
struct stat buf;
if (stat(path, &buf)) // zero return value == success
log_empty = true; // error -- assume file is empty
else if (buf.st_size == 0)
log_empty = true;
else
log_empty = false;
#endif // if/else WIN32/UNIX
}
else
{
log_empty = true;
}
const char* mode;
if (binary)
{
mode = append ? "ab" : "wb+";
log_binary = true;
}
else
{
mode = append ? "a" : "w+";
log_binary = false;
}
FILE* logFile = fopen(path, mode);
if (!logFile)
{
DMSG(0, "Mgen::OpenLog() fopen() Error: %s\n", GetErrorString());
return false;
}
SetLogFile(logFile);
log_open = true;
return true;
} // end Mgen::OpenLog()
void Mgen::SetLogFile(FILE* filePtr)
{
CloseLog();
log_file = filePtr;
if (log_tx || (NULL == filePtr))
flow_list.SetLogFile(log_file, log_binary, log_flush);
#ifdef _WIN32_WCE
if ((stdout == log_file) || (stderr == log_file))
Log = Mgen::LogToDebug;
else
Log = fprintf;
#endif // _WIN32_WCE
} // end Mgen::SetLogFile()
void Mgen::CloseLog()
{
if (log_file)
{
if ((stdout != log_file) && (stderr != log_file))
fclose(log_file);
log_file = NULL;
}
} // end Mgen::CloseLog()
// Query flow_list and drec_event_list for an idea
// of the current (or greatest) estimate of
// current relative script time offset
double Mgen::GetCurrentOffset() const
{
if (!started)
return -1.0;
if (next_drec_event)
return (next_drec_event->GetTime() - drec_event_timer.GetTimeRemaining());
const DrecEvent* lastEvent = (const DrecEvent*)drec_event_list.Tail();
double drecOffset = lastEvent ? lastEvent->GetTime() : -1.0;
double mgenOffset = flow_list.GetCurrentOffset();
return (drecOffset > mgenOffset) ? drecOffset : mgenOffset;
} // end Mgen::GetCurrentOffset()
// Parse an MGEN script
bool Mgen::ParseScript(const char* path)
{
// Open script file
FILE* scriptFile = fopen(path, "r");
if (!scriptFile)
{
DMSG(0, "Mgen::ParseScript() fopen() Error: %s\n", GetErrorString());
return false;
}
// Read script file line by line using FastReader
FastReader reader;
unsigned int lineCount = 0;
unsigned int lines = 0;
while (1)
{
lineCount += lines; // for grouped (continued) lines
char lineBuffer[SCRIPT_LINE_MAX+1];
unsigned int len = SCRIPT_LINE_MAX;
switch (reader.ReadlineContinue(scriptFile, lineBuffer, &len, &lines))
{
case FastReader::OK:
lineCount++;
lines--;
break;
case FastReader::DONE:
fclose(scriptFile);
return true; // done with script file
case FastReader::ERROR_:
DMSG(0, "Mgen::ParseScript() Error: script file read error\n");
fclose(scriptFile);
return false;
}
if (!ParseEvent(lineBuffer, lineCount))
{
DMSG(0, "Mgen::ParseScript() Error: invalid mgen script line: %lu\n",
lineCount);
fclose(scriptFile);
return false;
}
} // end while (1)
return true;
} // end Mgen::ParseScript()
bool Mgen::ParseEvent(const char* lineBuffer, unsigned int lineCount)
{
const char * ptr = lineBuffer;
// Strip leading white space
while ((' ' == *ptr) || ('\t' == *ptr)) ptr++;
// Check for comment line (leading '#')
if ('#' == *ptr) return true;
char fieldBuffer[SCRIPT_LINE_MAX+1];
// Script lines are in form {<globalCmd>|<eventTime>} ...
if (1 != sscanf(ptr, "%s", fieldBuffer))
{
// Blank line?
return true;
}
// Set ptr to next field in line, stripping white space
ptr += strlen(fieldBuffer);
while ((' ' == *ptr) || ('\t' == *ptr)) ptr++;
Command cmd = GetCommandFromString(fieldBuffer);
if (EVENT == cmd)
{
// read in <eventTime> or <eventType>
if (1 != sscanf(ptr, "%s", fieldBuffer))
{
DMSG(0, "Mgen::ParseEvent() Error: empty EVENT command at line: %lu\n", lineCount);
return false;
}
// Set ptr to next field in line, stripping white space
ptr += strlen(fieldBuffer);
while ((' ' == *ptr) || ('\t' == *ptr)) ptr++;
}
// If it's not a "global command", assume it's an event.
if (INVALID_COMMAND == cmd) cmd = EVENT;
switch (cmd)
{
case EVENT:
{
// EVENT line can begin with the <eventTime> _or_ the <eventType>
// for implicit, immediate events.
double eventTime;
if (1 == sscanf(fieldBuffer, "%lf", &eventTime))
{
// Read event command
if (1 != sscanf(ptr, "%s", fieldBuffer))
{
DMSG(0, "Mgen::ParseEvent() Error: missing command at line: %lu\n", lineCount);
return false;
}
// Set ptr to next field in line, stripping white space
ptr += strlen(fieldBuffer);
while ((' ' == *ptr) || ('\t' == *ptr)) ptr++;
}
else
{
eventTime = -1.0;
}
// Is it an MgenEvent or a DrecEvent?
if (MgenEvent::INVALID_TYPE != MgenEvent::GetTypeFromString(fieldBuffer))
{
// It's an MGEN event
// 1) Read the flow id
if (1 != sscanf(ptr, "%s", fieldBuffer))
{
DMSG(0, "Mgen::ParseEvent() Error: missing <flowId> at line: %lu\n", lineCount);
return false;
}
unsigned long flowId;
if (1 != sscanf(fieldBuffer, "%lu", &flowId))
{
DMSG(0, "Mgen::ParseEvent() Error: invalid <flowId> at line: %lu\n", lineCount);
return false;
}
// 2) Find the flow
MgenFlow* theFlow = flow_list.FindFlowById(flowId);
if (!theFlow)
{
if (!(theFlow = new MgenFlow(flowId,
timer_mgr,
socket_list,
default_flow_label)))
{
DMSG(0, "Mgen::ParseEvent() Error: MgenFlow memory allocation error: %s\n",
GetErrorString());
return false;
}
theFlow->SetPositionCallback(get_position, get_position_data);
#ifdef HAVE_GPS
theFlow->SetPayloadHandle(payload_handle);
#endif // HAVE_GPS
if (host_addr.IsValid()) theFlow->SetHostAddress(host_addr);
if (sink) theFlow->SetSink(sink);
if (log_tx) theFlow->SetLogFile(log_file, log_binary, log_flush);
if (checksum_enable) theFlow->EnableChecksums();
flow_list.Append(theFlow);
}
// 3) Create event object
MgenEvent* theEvent = new MgenEvent();
if (!theEvent)
{
DMSG(0, "MgenFlow::ParseEvent() mgen event allocation error: %s\n",
GetErrorString());
return false;
}
if (!theEvent->InitFromString(lineBuffer))
{
DMSG(0, "MgenFlow::ParseEvent() event init error\n");
delete theEvent;
return false;
}
double currentTime = started ? GetCurrentOffset() : 0.0;
if (currentTime < 0.0) currentTime = 0.0;
if (!theFlow->InsertEvent(theEvent, started, currentTime))
{
DMSG(0, "Mgen::ParseEvent() Error: invalid mgen script line: %lu\n", lineCount);
delete theEvent;
return false;
}
}
else if (DrecEvent::INVALID_TYPE != DrecEvent::GetTypeFromString(fieldBuffer))
{
// It's a DREC event
DrecEvent* theEvent = new DrecEvent();
if (!theEvent)
{
DMSG(0, "MgenFlow::ParseEvent() drecevent allocation error: %s\n",
GetErrorString());
return false;
}
if (!theEvent->InitFromString(lineBuffer))
{
DMSG(0, "Mgen::ParseEvent() Error: invalid mgen script line: %lu\n", lineCount);
delete theEvent;
return false;
}
theEvent->SetTime(eventTime);
InsertDrecEvent(theEvent);
}
else
{
DMSG(0, "Mgen::ParseEvent() Error: invalid command \"%s\" at line: %lu\n",
fieldBuffer, lineCount);
return false;
}
break;
}
default:
// Is it a global command
if (INVALID_COMMAND != cmd)
{
if (!OnCommand(cmd, ptr))
{
DMSG(0, "Mgen::ParseEvent() Error: bad global command at line: %lu\n", lineCount);
return false;
}
}
else
{
DMSG(0, "Mgen::ParseEvent() Error: invalid command at line: %lu\n", lineCount);
return false;
}
break;
}
return true;
} // end Mgen::ParseEvent()
void Mgen::InsertDrecEvent(DrecEvent* theEvent)
{
double eventTime = theEvent->GetTime();
if (started)
{
double currentTime = GetCurrentOffset();
if (currentTime < 0.0) currentTime = 0.0;
if (eventTime < currentTime)
{
theEvent->SetTime(currentTime);
drec_event_list.Precede(next_drec_event, theEvent);
ProcessDrecEvent(*theEvent);
}
else
{
theEvent->SetTime(eventTime);
drec_event_list.Insert(theEvent);
// Reschedule next drec timeout if needed
if (drec_event_timer.IsActive())
{
double nextTime = next_drec_event->GetTime();
if (eventTime < nextTime)
{
next_drec_event = theEvent;
drec_event_timer.SetInterval(eventTime - currentTime);
drec_event_timer.Reschedule();
}
}
else
{
next_drec_event = theEvent;
drec_event_timer.SetInterval(eventTime - currentTime);
timer_mgr.ActivateTimer(drec_event_timer);
}
}
}
else
{
eventTime = eventTime > 0.0 ? eventTime : 0.0;
theEvent->SetTime(eventTime);
drec_event_list.Insert(theEvent);
}
} // end Mgen::InsertDrecEvent()
// Global command processing
const StringMapper Mgen::COMMAND_LIST[] =
{
{"+EVENT", EVENT},
{"+START", START},
{"+INPUT", INPUT},
{"+OUTPUT", OUTPUT},
{"+LOG", LOG},
{"+SAVE", SAVE},
{"+DEBUG", DEBUG},
{"+OFFSET", OFFSET},
{"-TXLOG", TXLOG},
{"-NOLOG", NOLOG},
{"+DLOG", DLOG},
{"-BINARY", BINARY},
{"-FLUSH", FLUSH},
{"+LABEL", LABEL},
{"+RXBUFFER", RXBUFFER},
{"+TXBUFFER", TXBUFFER},
{"+TOS", TOS},
{"+TTL", TTL},
{"+INTERFACE", INTERFACE},
{"-CHECKSUM", CHECKSUM},
{"-TXCHECKSUM", TXCHECKSUM},
{"-RXCHECKSUM", RXCHECKSUM},
{"+OFF", INVALID_COMMAND}, // to deconflict "offset" from "off" event
{NULL, INVALID_COMMAND}
};
Mgen::Command Mgen::GetCommandFromString(const char* string)
{
// Make comparison case-insensitive
char upperString[16];
unsigned int len = strlen(string);
len = len < 16 ? len : 16;
for (unsigned int i = 0 ; i < len; i++)
upperString[i] = toupper(string[i]);
const StringMapper* m = COMMAND_LIST;
Mgen::Command theCommand = INVALID_COMMAND;
while (NULL != (*m).string)
{
if (!strncmp(upperString, (*m).string+1, len))
theCommand = ((Command)((*m).key));
m++;
}
return theCommand;
} // end Mgen::GetCommandFromString()
// This tells if the command is valid and whether args are expected
Mgen::CmdType Mgen::GetCmdType(const char* cmd)
{
if (!cmd) return CMD_INVALID;
char upperCmd[32]; // all commands < 32 characters
unsigned int len = strlen(cmd);
len = len < 31 ? len : 31;
unsigned int i;
for (i = 0; i < 31; i++)
upperCmd[i] = toupper(cmd[i]);
bool matched = false;
const StringMapper* m = COMMAND_LIST;
CmdType type = CMD_INVALID;
while (INVALID_COMMAND != (*m).key)
{
if (!strncmp(upperCmd, (*m).string+1, len))
{
if (matched)
{
// ambiguous command (command should match only once)
return CMD_INVALID;
}
else
{
matched = true;
if ('+' == (*m).string[0])
type = CMD_ARG;
else
type = CMD_NOARG;
}
}
m++;
}
return type;
} // end Mgen::GetCmdType()
bool Mgen::OnCommand(Mgen::Command cmd, const char* arg, bool override)
{
switch (cmd)
{
case START:
{
if (!arg)
{
DMSG(0, "Mgen::OnCommand() Error: missing <startTime>\n");
return false;
}
if (override || !start_time_lock)
{
// convert to upper case for case-insensitivity
// search for "GMT" or "NOW" keywords
char temp[32];
unsigned int len = strlen(arg);
len = len < 31 ? len : 31;
unsigned int i;
for (i = 0 ; i < len; i++)
temp[i] = toupper(arg[i]);
temp[i] = '\0';
unsigned int startHour, startMin;
double startSec;
// arg should be "hr:min:sec[GMT]" or "NOW"
if (3 == sscanf(temp, "%u:%u:%lf", &startHour, &startMin, &startSec))
{
start_hour = startHour;
start_min = startMin;
start_sec = startSec;
if (strstr(temp, "GMT"))
start_gmt = true;
else
start_gmt = false;
}
else
{
// Check for "NOW" keywork (case-insensitive)
if (strstr(temp, "NOW"))
{
// negative start_time_sec indicates immediate start
start_sec = -1.0;
}
else
{
DMSG(0, "Mgen::OnCommand() Error: invalid START time\n");
return false;
}
}
start_time_lock = override; // record START command precedence
} // end if (override || !start_time_lock)
break;
} // end case START
case EVENT:
if (!ParseEvent(arg, 0))
{
DMSG(0, "Mgen::OnCommand() - error parsing event\n");
return false;
}
break;
case INPUT:
if (!ParseScript(arg))
{
DMSG(0, "Mgen::OnCommand() - error parsing script\n");
return false;
}
break;
case TXBUFFER:
unsigned int sizeTemp;
if (1 != sscanf(arg, "%u", &sizeTemp))
{
DMSG(0, "Mgen::OnCommand() - invalid tx buffer size\n");
return false;
}
socket_list.SetDefaultTxBufferSize(sizeTemp, override);
break;
case RXBUFFER:
{
unsigned int sizeTemp;
if (1 != sscanf(arg, "%u", &sizeTemp))
{
DMSG(0, "Mgen::OnCommand() - invalid rx buffer size\n");
}
socket_list.SetDefaultRxBufferSize(sizeTemp, override);
break;
}
case TOS:
{
int tosTemp;
int result = sscanf(arg, "%i", &tosTemp);
if ((1 != result) || (tosTemp < 0) || (tosTemp > 255))
{
DMSG(0, "Mgen::OnCommand() - invalid tos value");
return false;
}
socket_list.SetDefaultTos(tosTemp, override);
break;
}
case INTERFACE:
socket_list.SetDefaultMulticastInterface(arg, override);
break;
case TTL:
{
int ttlTemp;
int result = sscanf(arg, "%i", &ttlTemp);
if ((1 != result) || (ttlTemp < 0) || (ttlTemp > 255))
{
DMSG(0, "Mgen::OnCommand() - invalid ttl value");
return false;
}
socket_list.SetDefaultTtl(ttlTemp, override);
break;
}
case LABEL:
// (TBD) make flow_list.SetDefaultFlowLabel()
if (override || !default_label_lock)
{
int tempLabel;
if (1 != sscanf(arg, "%i", &tempLabel))
{
DMSG(0, "Mgen::OnCommand() - invalid flow label value");
return false;
}
default_flow_label = tempLabel;
default_label_lock = override;
}
break;
case OUTPUT:
if (override || !log_file_lock)
{
if (!OpenLog(arg, false, log_binary))
{
DMSG(0, "Mgen::OnCommand() Error: log file open error: %s\n",
GetErrorString());
return false;
}
log_file_lock = override;
}
break;
case LOG:
if (override || !log_file_lock)
{
if (!OpenLog(arg, true, log_binary))
{
DMSG(0, "Mgen::OnCommand() Error: log file open error: %s\n",
GetErrorString());
return false;
}
log_file_lock = override;
}
break;
case NOLOG:
if (override || !log_file_lock)
{
SetLogFile(NULL);
log_file_lock = override;
}
break;
case DLOG:
if (override)
{
if (!OpenDebugLog(arg))
{
DMSG(0, "Mgen::OnCommand(dlog) error opening debug log file: %s\n",
GetErrorString());
return false;
}
}
break;
case SAVE:
if (override || !save_path_lock)
{
FILE* filePtr = fopen(arg, "w+");
if (filePtr)
{
char* path = new char[strlen(arg) + 1];
if (path)
{
strcpy(path, arg);
if (save_path) delete save_path;
save_path = path;
save_path_lock = override;
}
else
{
DMSG(0, "Mgen::OnCommand() Error: memory allocation error: %s\n",
GetErrorString());
return false;
}
}
else
{
DMSG(0, "Mgen::OnCommand() Error: save file open error: %s\n",
GetErrorString());
return false;
}
}
break;
case DEBUG:
SetDebugLevel(atoi(arg));
break;
case OFFSET:
if (override || !offset_lock)
{
double timeOffset;
if (1 == sscanf(arg, "%lf", &timeOffset))
{
offset = timeOffset;
offset_lock = override;
}
else
{
DMSG(0, "Mgen::OnCommand() Error invalid OFFSET\n");
return false;
}
}
break;
case TXLOG:
log_tx = true;
flow_list.SetLogFile(log_file, log_binary, log_flush);
break;
case BINARY:
if (log_open)
{
DMSG(0, "Mgen::OnCommand() Error: BINARY option must precede OUTPUT and LOG commands\n");
return false;
}
log_binary = true;
if (log_tx)
flow_list.SetLogFile(log_file, log_binary, log_flush);
break;
case FLUSH:
log_flush = true;
if (log_tx)
flow_list.SetLogFile(log_file, log_binary, log_flush);
break;
case TXCHECKSUM:
checksum_enable = true;
flow_list.EnableChecksums();
break;
case RXCHECKSUM:
checksum_force = true;
break;
case CHECKSUM:
checksum_enable = true;
flow_list.EnableChecksums();
checksum_force = true;
break;
case INVALID_COMMAND:
DMSG(0, "Mgen::OnCommand() Error: invalid command\n");
return false;
} // end switch(cmd)
return true;
} // end Mgen::OnCommand()
////////////////////////////////////////////////////////////////
// DrecGroupList implementation
DrecGroupList::DrecGroupList()
: head(NULL), tail(NULL)
{
}
DrecGroupList::~DrecGroupList()
{
}
void DrecGroupList::Destroy(MgenSocketList& socketList)
{
Item* next = head;
while (next)
{
Item* current = next;
next = next->next;
if (NULL != current->socket_item)
socketList.LeaveGroup(current->socket_item,
current->group_addr,
current->GetInterface());
delete current;
}
head = tail = (Item*)NULL;
} // end DrecGroupList::Destroy()
bool DrecGroupList::JoinGroup(MgenSocketList& socketList,
const ProtoAddress& groupAddress,
const char* interfaceName,
unsigned short thePort,
bool deferred)
{
Item* item = FindItemByGroup(groupAddress, interfaceName, thePort);
if (item && item->IsActive())
{
DMSG(0, "DrecGroupList::JoinGroup() Error: group already joined\n");
return false;
}
else
{
if (!item)
{
if (!(item = new Item(groupAddress, interfaceName, thePort)))
{
DMSG(0, "DrecGroupList::JoinGroup() Error: DrecGroupList::Item memory allocation error: %s\n",
GetErrorString());
return false;
}
Append(item);
}
if (deferred)
return true;
else
return item->Activate(socketList);
}
} // end DrecGroupList::JoinGroup()
bool DrecGroupList::LeaveGroup(MgenSocketList& socketList,
const ProtoAddress& groupAddress,
const char* interfaceName,
unsigned short thePort)
{
Item* item = FindItemByGroup(groupAddress, interfaceName, thePort);
if (item)
{
if (item->IsActive())
{
if (!socketList.LeaveGroup(item->socket_item,
groupAddress,
interfaceName))
{
DMSG(0, "DrecGroupList::LeaveGroup() Error: group leave error\n");
return false;
}
}
Remove(item);
delete item;
return true;
}
else
{
DMSG(0, "DrecGroupList::LeaveGroup() Error: group item not found\n");
return false;
}
} // end DrecGroupList::LeaveGroup()
bool DrecGroupList::JoinDeferredGroups(MgenSocketList& socketList)
{
bool result = true;
Item* next = head;
while (next)
{
if (!next->IsActive())
result &= next->Activate(socketList);
next = next->next;
}
return result;
} // end DrecGroupList::JoinDeferredGroups()
DrecGroupList::Item* DrecGroupList::FindItemByGroup(const ProtoAddress& groupAddr,
const char* interfaceName,
unsigned short thePort)
{
Item* next = head;
while (next)
{
const char* nextInterface = next->GetInterface();
bool interfaceIsEqual = (NULL == interfaceName) ?
(NULL == nextInterface) :
((NULL != nextInterface) &&
!strcmp(nextInterface, interfaceName));
bool groupIsEqual = next->group_addr.IsValid() ?
next->group_addr.HostIsEqual(groupAddr) : true;
bool portIsEqual = thePort == next->GetPort();
if (interfaceIsEqual && groupIsEqual && portIsEqual)
return next;
next = next->next;
}
return (Item*)NULL;
} // end DrecGroupList::FindItemByGroup()
void DrecGroupList::Append(Item* item)
{
item->next = NULL;
if ((item->prev = tail))
item->prev->next = item;
else
head = item;
tail = item;
} // end DrecGroupList::Append()
void DrecGroupList::Remove(Item* item)
{
if (item->prev)
item->prev->next = item->next;
else
head = item->next;
if (item->next)
item->next->prev = item->prev;
else
tail = item->prev;
} // end DrecGroupList::Remove()
//////////////////////////////////////////////////
// DrecGroupList::Item implementation
DrecGroupList::Item::Item(const ProtoAddress& groupAddr,
const char* interfaceName,
unsigned short thePort)
: socket_item(NULL), group_addr(groupAddr), port(thePort),
prev(NULL), next(NULL)
{
if (interfaceName)
strncpy(interface_name, interfaceName, 16);
else
interface_name[0] = '\0';
}
DrecGroupList::Item::~Item()
{
}
////////////////////////////////////////////////////////////////
// Mgen::FastReader implementation
Mgen::FastReader::FastReader()
: savecount(0)
{
}
Mgen::FastReader::Result Mgen::FastReader::Read(FILE* filePtr,
char* buffer,
unsigned int* len)
{
unsigned int want = *len;
if (savecount)
{
unsigned int ncopy = want < savecount ? want : savecount;
memcpy(buffer, saveptr, ncopy);
savecount -= ncopy;
saveptr += ncopy;
buffer += ncopy;
want -= ncopy;
}
while (want)
{
unsigned int result = fread(savebuf, sizeof(char), BUFSIZE, filePtr);
if (result)
{
unsigned int ncopy= want < result ? want : result;
memcpy(buffer, savebuf, ncopy);
savecount = result - ncopy;
saveptr = savebuf + ncopy;
buffer += ncopy;
want -= ncopy;
}
else // end-of-file
{
#ifndef _WIN32_WCE
if (ferror(filePtr))
{
if (EINTR == errno) continue;
}
#endif // !_WIN32_WCE
*len -= want;
if (*len)
return OK; // we read at least something
else
return DONE; // we read nothing
}
} // end while(want)
return OK;
} // end Mgen::FastReader::Read()
// An OK text readline() routine (reads what will fit into buffer incl. NULL termination)
// if *len is unchanged on return, it means the line is bigger than the buffer and
// requires multiple reads
Mgen::FastReader::Result Mgen::FastReader::Readline(FILE* filePtr,
char* buffer,
unsigned int* len)
{
unsigned int count = 0;
unsigned int length = *len;
char* ptr = buffer;
while (count < length)
{
unsigned int one = 1;
switch (Read(filePtr, ptr, &one))
{
case OK:
if (('\n' == *ptr) || ('\r' == *ptr))
{
*ptr = '\0';
*len = count;
return OK;
}
count++;
ptr++;
break;
case ERROR_:
return ERROR_;
case DONE:
return DONE;
}
}
// We've filled up the buffer provided with no end-of-line
return ERROR_;
} // end Mgen::FastReader::Readline()
// This reads a line with possible ending '\' continuation character.
// Such "continued" lines are returned as one line with this function
// and the "lineCount" argument indicates the actual number of lines
// which comprise the long "continued" line.
//
// (Note: Lines ending with an even number '\\' are considered ending
// with one less '\' instead of continuing. So, to actually
// end a line with an even number of '\\', continue it with
// an extra '\' and follow it with a blank line.)
Mgen::FastReader::Result Mgen::FastReader::ReadlineContinue(FILE* filePtr,
char* buffer,
unsigned int* len,
unsigned int* lineCount)
{
unsigned int lines = 0;
unsigned int count = 0;
unsigned int length = *len;
char* ptr = buffer;
while (count < length)
{
unsigned int space = length - count;
switch (Readline(filePtr, ptr, &space))
{
case OK:
{
lines++;
// 1) Does the line continue?
char* cptr = space ? ptr + space - 1 : ptr;
// a) skip trailing white space
while (((' ' == *cptr) || ('\t' == *cptr)) && (cptr > ptr))
{
space--;
cptr--;
}
// If line "continues" to a blank line, skip it
if ((cptr == ptr) && ((*cptr == '\0') || isspace(*cptr)))
continue;
if ('\\' == *cptr)
{
// Make sure line ends with odd number of '\' to continue
bool lineContinues = false;
while ((cptr >= ptr) && ('\\' == *cptr))
{
cptr--;
lineContinues = lineContinues ? false : true;
}
// lose trailing '\' (continuation or extra '\' char)
*(ptr+space-1) = '\0';
space -= 1;
if (lineContinues)
{
// get next line to continue
count += space;
ptr += space;
continue;
}
}
*len = count + space;
if (lineCount) *lineCount = lines;
return OK;
break;
}
case ERROR_:
return ERROR_;
case DONE:
return DONE;
}
}
// We've filled up the buffer provided with no end-of-line
return ERROR_;
} // end Mgen::FastReader::Readline()
syntax highlighted by Code2HTML, v. 0.9.1