#include "mgen.h"
#include "mgenMsg.h"
#include "mgenVersion.h"
#include "mgenEvent.h"

// NOTE(review): the system header names below were lost when this file was
// extracted (only the trailing comments survived).  They are reconstructed
// from those comments and from the library calls used in this file --
// confirm against the upstream source.
#include <string.h>
#include <stdio.h>
#include <time.h>       // for gmtime()
#include <ctype.h>      // for toupper()

#ifdef UNIX
#include <sys/types.h>  // for stat
#include <sys/stat.h>   // for stat
#include <unistd.h>     // for stat
#endif

// For WinCE, we provide an optional logging function
// to get log output to our debug window
#ifndef _WIN32_WCE
#include <errno.h>      // NOTE(review): assumed header name -- confirm
Mgen::LogFunction Mgen::Log = fprintf;
#else
Mgen::LogFunction Mgen::Log = fprintf;

// This function is used on our WinCE build where there is no
// real "stdout" for default MGEN output.  The formatted text is
// passed to TRACE(); the FILE* argument is ignored.
// Returns the value returned by _vsnprintf().
int Mgen::LogToDebug(FILE* /*filePtr*/, const char* format, ...)
{
    va_list args;
    va_start(args, format);
    char charBuffer[2048];
    // Terminate the LAST element (the original wrote charBuffer[2048],
    // one byte past the end of the array).
    charBuffer[2047] = '\0';
    int count = _vsnprintf(charBuffer, 2047, format, args);
    TRACE("%s", charBuffer);
    va_end(args);
    return count;
}  // end Mgen::LogToDebug()
#endif // if/else !_WIN32_WCE

// ---------------------------------------------------------------------
// File-local helpers for the binary/text log code below
// ---------------------------------------------------------------------

// On-disk size of the 32-bit fields of a binary log record.  START/STOP
// records declare recordLength == 8 for their two "eventTime" fields, so
// each field is 4 bytes regardless of the platform's sizeof(long).
static const unsigned int MGEN_LOG_FIELD32_SIZE = 4;

// Compile-time check that "unsigned int" can carry one such field
typedef char MgenLogField32Check[(sizeof(unsigned int) == 4) ? 1 : -1];

// Writes "hostValue" at "dst" as a 4-byte network-order field.
// Returns the number of bytes written.
static unsigned int MgenPackLogField32(char* dst, unsigned long hostValue)
{
    const unsigned int netValue = htonl((unsigned int)hostValue);
    memcpy(dst, &netValue, MGEN_LOG_FIELD32_SIZE);
    return MGEN_LOG_FIELD32_SIZE;
}  // end MgenPackLogField32()

// Reads a 4-byte network-order field from "src" and returns it in host order.
static unsigned long MgenUnpackLogField32(const char* src)
{
    unsigned int netValue;
    memcpy(&netValue, src, MGEN_LOG_FIELD32_SIZE);
    return ntohl(netValue);
}  // end MgenUnpackLogField32()

// Writes the two "eventTime" fields (seconds, microseconds) at "dst".
// Returns the number of bytes written (8).
static unsigned int MgenPackLogTime(char* dst, const struct timeval& theTime)
{
    unsigned int count = MgenPackLogField32(dst, (unsigned long)theTime.tv_sec);
    count += MgenPackLogField32(dst + count, (unsigned long)theTime.tv_usec);
    return count;
}  // end MgenPackLogTime()

// Reads the two "eventTime" fields from "src" into "theTime".
// Returns the number of bytes consumed (8).
static unsigned int MgenUnpackLogTime(const char* src, struct timeval& theTime)
{
    theTime.tv_sec = MgenUnpackLogField32(src);
    theTime.tv_usec = MgenUnpackLogField32(src + MGEN_LOG_FIELD32_SIZE);
    return (2 * MGEN_LOG_FIELD32_SIZE);
}  // end MgenUnpackLogTime()

// Builds a complete START/STOP style binary record (4-byte record header
// followed by "eventTime") in "buffer".  Returns the record's total size.
static unsigned int MgenPackTimeRecord(char*                 buffer,
                                       char                  eventType,
                                       const struct timeval& eventTime)
{
    unsigned int index = 0;
    // set "eventType" field
    buffer[index++] = eventType;
    // zero "reserved" field
    buffer[index++] = 0;
    // set "recordLength" field (payload is the two 4-byte time fields)
    const unsigned short recordLength = htons(8);
    memcpy(buffer+index, &recordLength, sizeof(short));
    index += sizeof(short);
    // set "eventTime" fields
    index += MgenPackLogTime(buffer+index, eventTime);
    return index;
}  // end MgenPackTimeRecord()

// Breaks "theTime" down into hour/minute/second for the text log prefix.
// On WinCE the fields are computed by hand; elsewhere gmtime() is used on a
// real time_t copy (tv_sec is not guaranteed to have time_t's size, so its
// address must not simply be cast to time_t*).
static struct tm MgenLogTimeToTm(const struct timeval& theTime)
{
    struct tm timeStruct;
    memset(&timeStruct, 0, sizeof(struct tm));
#ifdef _WIN32_WCE
    timeStruct.tm_hour = theTime.tv_sec / 3600;
    unsigned long hourSecs = 3600 * timeStruct.tm_hour;
    timeStruct.tm_min = (theTime.tv_sec - hourSecs) / 60;
    timeStruct.tm_sec = theTime.tv_sec - hourSecs - (60*timeStruct.tm_min);
    timeStruct.tm_hour = timeStruct.tm_hour % 24;
#else
    const time_t secs = (time_t)theTime.tv_sec;
    const struct tm* timePtr = gmtime(&secs);
    if (NULL != timePtr) timeStruct = *timePtr;
#endif // if/else _WIN32_WCE
    return timeStruct;
}  // end MgenLogTimeToTm()

// Reports a malformed binary log record; always returns false so callers
// can write "return MgenBadLogRecord(...)".
static bool MgenBadLogRecord(const char* eventName)
{
    DMSG(0, "Mgen::ConvertBinaryLog() error: malformed %s record\n", eventName);
    return false;
}  // end MgenBadLogRecord()

// Constructs an idle Mgen instance: no log file, no save path, immediate
// start (start_sec < 0), no offset, IPv4 default address type.  The start
// timer is one-shot; the DREC event timer repeats until deactivated.
Mgen::Mgen(ProtoTimerMgr&         timerMgr,
           ProtoSocket::Notifier& socketNotifier)
 : controller(NULL), timer_mgr(timerMgr), sink(NULL),
   log_file(NULL), log_binary(false), log_flush(false),
   log_file_lock(false), log_tx(false), log_open(false), log_empty(true),
   save_path(NULL), save_path_lock(false), started(false),
   start_hour(0), start_min(0), start_sec(-1.0),
   start_gmt(false), start_time_lock(false),
   offset(-1.0), offset_lock(false), offset_pending(false),
   default_flow_label(0), default_label_lock(false),
   checksum_enable(false), addr_type(ProtoAddress::IPv4),
   checksum_force(false),
   get_position(NULL), get_position_data(NULL)
{
    // GetCurrentOffset() and OnDrecEventTimeout() read this pointer, and
    // Start() only assigns it on the "start immediately" path.
    next_drec_event = NULL;

    socket_list.SetSocketNotifier(&socketNotifier);

    start_timer.SetListener(this, &Mgen::OnStartTimeout);
    start_timer.SetInterval(0.0);
    start_timer.SetRepeat(0);

    drec_event_timer.SetListener(this, &Mgen::OnDrecEventTimeout);
    drec_event_timer.SetInterval(0.0);
    drec_event_timer.SetRepeat(-1);
}

// Stops any activity and releases the save path.
Mgen::~Mgen()
{
    Stop();
    // NOTE(review): if save_path is allocated with new[] elsewhere in the
    // project this must be "delete[]" -- the allocation is not visible here.
    if (save_path) delete save_path;
}

// Starts MGEN operation.
//
// With no absolute start time set (start_sec < 0) the START event is
// logged, transmit flows are activated at "offset", DREC events scheduled
// at or before "offset" are processed immediately and the timer for the
// first later DREC event is armed.  Otherwise the start timer is scheduled
// for the requested time of day (at most 12 hours ahead).
//
// Returns false only when the requested absolute start time cannot be
// scheduled.
bool Mgen::Start()
{
#ifdef HAVE_IPV6
    // Set default IPv6 flow labels for each flow
    flow_list.SetDefaultLabel(default_flow_label);
#endif //HAVE_IPV6

    struct timeval currentTime;
    ProtoSystemTime(currentTime);

    if (start_sec < 0.0)
    {
        // Start immediately
        if (log_file)
        {
            // Log START event
            if (log_binary)
            {
                char buffer[128];
                if (log_empty)
                {
                    // write binary log header line plus NULL character
                    strcpy(buffer, "mgen version=");
                    strcat(buffer, MGEN_VERSION);
                    strcat(buffer, " type=binary_log\n");
                    fwrite(buffer, sizeof(char), strlen(buffer)+1, log_file);
                }
                const unsigned int index =
                    MgenPackTimeRecord(buffer, (char)START_EVENT, currentTime);
                // write the record
                if (fwrite(buffer, sizeof(char), index, log_file) < index)
                {
                    DMSG(0, "Mgen::Start() fwrite() error: %s\n", GetErrorString());
                }
            }
            else
            {
                const struct tm logTime = MgenLogTimeToTm(currentTime);
                Mgen::Log(log_file, "%02d:%02d:%02d.%06lu START\n",
                          logTime.tm_hour, logTime.tm_min, logTime.tm_sec,
                          (unsigned long)currentTime.tv_usec);
            }
            if (log_empty) log_empty = false;
            fflush(log_file);
        }

        // Activate transmit flows according to "offset" time
        flow_list.Start(offset);

        // Pre-process drec events occurring prior to "offset" time
        DrecEvent* nextEvent = (DrecEvent*)drec_event_list.Head();
        offset_pending = true;
        while (nextEvent)
        {
            if (nextEvent->GetTime() <= offset)
            {
                ProcessDrecEvent(*nextEvent);
            }
            else
            {
                // First event after "offset": arm the timer and stop here
                const double scriptTime = offset > 0.0 ? offset : 0.0;
                drec_event_timer.SetInterval(nextEvent->GetTime() - scriptTime);
                timer_mgr.ActivateTimer(drec_event_timer);
                break;
            }
            nextEvent = (DrecEvent*)nextEvent->Next();
        }
        offset_pending = false;
        next_drec_event = nextEvent;

        // Activate any group joins deferred during "offset" processing.
        drec_group_list.JoinDeferredGroups(socket_list);
    }
    else  // Schedule absolute start time
    {
        // Make sure there are pending events
        if (!drec_event_list.IsEmpty() || !flow_list.IsEmpty())
        {
            // Calculate start time delta and schedule start_timer
            // (can delay start up to 12 hours into future)
#ifdef _WIN32_WCE
            DMSG(0, "Mgen::Start() warning: WinCE absolute start time might not be correct!\n");
            unsigned long hours = currentTime.tv_sec / 3600;
            unsigned long minutes = (currentTime.tv_sec - (3600*hours)) / 60;
            unsigned long seconds = currentTime.tv_sec - (3600*hours) - (60*minutes);
            hours = hours % 24;
            double nowSec = hours*3600 + minutes*60 + seconds;
#else
            // Convert through a real time_t (see MgenLogTimeToTm())
            const time_t nowTime = (time_t)currentTime.tv_sec;
            const struct tm* nowPtr = start_gmt ? gmtime(&nowTime) : localtime(&nowTime);
            if (NULL == nowPtr)
            {
                DMSG(0, "Mgen::Start() Error: unable to convert current time\n");
                return false;
            }
            double nowSec = nowPtr->tm_hour*3600 + nowPtr->tm_min*60 + nowPtr->tm_sec;
#endif // if/else _WIN32_WCE
            double startSec = start_hour*3600 + start_min*60 + start_sec;
            double delta = startSec - nowSec;
            if (delta < 0.0) delta += (24*3600);
            if (delta > (12*3600))
            {
                DMSG(0, "Mgen::Start() Error: Specified start time has already elapsed\n");
                return false;
            }
            start_timer.SetInterval(delta);
            timer_mgr.ActivateTimer(start_timer);
        }
    }
    started = true;
    return true;
}  // end Mgen::Start()

// Stops MGEN operation.  If started, the STOP event is logged, the log
// file is closed (unless it is stdout/stderr) and, when a save path is set,
// the flow sequence state and current offset are written to it.  In all
// cases the timers are deactivated and the flow, DREC event, group and
// socket lists are destroyed.
void Mgen::Stop()
{
    if (started)
    {
        if (log_file)
        {
            // Log STOP event
            struct timeval currentTime;
            ProtoSystemTime(currentTime);
            if (log_binary)
            {
                char buffer[128];
                const unsigned int index =
                    MgenPackTimeRecord(buffer, (char)Mgen::STOP_EVENT, currentTime);
                // write the record
                if (fwrite(buffer, sizeof(char), index, log_file) < index)
                    DMSG(0, "Mgen::Stop() fwrite error: %s\n", GetErrorString());
            }
            else
            {
                const struct tm logTime = MgenLogTimeToTm(currentTime);
                Mgen::Log(log_file, "%02d:%02d:%02d.%06lu STOP\n",
                          logTime.tm_hour, logTime.tm_min, logTime.tm_sec,
                          (unsigned long)currentTime.tv_usec);
            }  // end if/else(log_binary)
            fflush(log_file);
            if ((log_file != stdout) && (stderr != log_file))
            {
                fclose(log_file);
                log_file = NULL;
            }
        }  // end if(log_file)

        // Save current offset and pending flow sequence state
        if (save_path)
        {
            FILE* saveFile = fopen(save_path, "w+");
            if (saveFile)
            {
                flow_list.SaveFlowSequences(saveFile);
                fprintf(saveFile, "OFFSET %f\n", GetCurrentOffset());
                fclose(saveFile);
            }
            else
            {
                DMSG(0, "Mgen::Stop() error opening save file: %s\n", GetErrorString());
            }
        }
        started = false;
    }  // end if (started)

    if (start_timer.IsActive()) start_timer.Deactivate();
    flow_list.Destroy();
    if (drec_event_timer.IsActive()) drec_event_timer.Deactivate();
    drec_event_list.Destroy();
    // The events were just destroyed; don't keep a pointer to one of them
    next_drec_event = NULL;
    drec_group_list.Destroy(socket_list);
    socket_list.Destroy();
}  // end Mgen::Stop()

// Converts the binary log file at "path" to text, written to the current
// log_file.
//
// The file must begin with the NUL-terminated ASCII header line
// "mgen version=4... type=binary_log"; each following record is a 4-byte
// header (eventType, reserved, 16-bit recordLength) plus recordLength
// payload bytes.
//
// Returns true when the end of file is reached cleanly, false if no log
// file is set, the input cannot be opened, or a header/record is invalid.
bool Mgen::ConvertBinaryLog(const char* path)
{
    if (NULL == log_file) return false;
    FILE* file = fopen(path, "rb");
    if (!file)
    {
        DMSG(0, "Mgen::ConvertBinaryLog() fopen() Error: %s\n", GetErrorString());
        return false;
    }

    // Closes "file" on every return path below (the original leaked the
    // handle on all header-parsing errors).
    struct FileCloser
    {
        FILE* ptr;
        explicit FileCloser(FILE* thePtr) : ptr(thePtr) {}
        ~FileCloser() {if (ptr) fclose(ptr);}
    } closer(file);

    const unsigned int BINARY_RECORD_MAX = 1024;  // maximum record size
    char buffer[BINARY_RECORD_MAX];

    // Read ASCII binary log header line
    // The first four characters should be "mgen"
    if (fread(buffer, sizeof(char), 4, file) < 4)
    {
        DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
        return false;
    }
    if (strncmp("mgen", buffer, 4) != 0)
    {
        DMSG(0, "Mgen::ConvertBinaryLog() error: invalid mgen log file\n");
        return false;
    }
    // Read remainder of header line, including terminating NULL
    unsigned int headerIndex = 3;
    while ('\0' != buffer[headerIndex])
    {
        // Never run off the end of "buffer" on a header with no NUL
        if (++headerIndex >= BINARY_RECORD_MAX)
        {
            DMSG(0, "Mgen::ConvertBinaryLog() error: log file header too long\n");
            return false;
        }
        if (fread(buffer+headerIndex, sizeof(char), 1, file) < 1)
        {
            DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
            return false;
        }
    }

    // Confirm log file "version" and "type"
    char* ptr = strstr(buffer, "version=");
    if (!ptr)
    {
        DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"version\" label\n");
        return false;
    }
    // Just look at major version number for moment
    int version;
    if (1 != sscanf(ptr, "version=%d", &version))
    {
        DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"version\" value\n");
        return false;
    }
    if (version != 4)
    {
        DMSG(0, "Mgen::ConvertBinaryLog() invalid log file version\n");
        return false;
    }
    ptr = strstr(ptr, "type=");
    if (!ptr)
    {
        DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"type\" label\n");
        return false;
    }
    char fileType[128];
    // Field width keeps an over-long value from overflowing "fileType"
    if (1 != sscanf(ptr, "type=%127s", fileType))
    {
        DMSG(0, "Mgen::ConvertBinaryLog() error finding log \"type\" value\n");
        return false;
    }
    if (strcmp(fileType, "binary_log"))
    {
        DMSG(0, "Mgen::ConvertBinaryLog() invalid log file type\n");
        return false;
    }

    // Size of the fixed leading part shared by the RECV, JOIN/LEAVE and
    // LISTEN/IGNORE records: eventTime(8) + 4 more bytes
    const unsigned int FIXED_PART = 12;

    // Now loop, reading in records one by one, and output text format
    while (1)
    {
        // 1) Read record header (4 bytes)
        char header[4];
        if (fread(header, sizeof(char), 4, file) < 4)
        {
            if (feof(file)) break;
            DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
            return false;
        }
        LogEvent eventType = (LogEvent)header[0];
        unsigned short recordLength;
        memcpy(&recordLength, header+2, sizeof(short));
        recordLength = ntohs(recordLength);
        if (recordLength > BINARY_RECORD_MAX)
        {
            DMSG(0, "Mgen::ConvertBinaryLog() record len:%hu exceeds maximum length\n",
                    recordLength);
            return false;
        }

        // 2) Read the entire record
        if (fread(buffer, sizeof(char), recordLength, file) < recordLength)
        {
            DMSG(0, "Mgen::ConvertBinaryLog() fread() error: %s\n", GetErrorString());
            return false;
        }

        // 3) Convert the record to text
        unsigned int index = 0;
        switch (eventType)
        {
            case RECV_EVENT:
            {
                if (recordLength < FIXED_PART) return MgenBadLogRecord("RECV");
                // get "eventTime"
                struct timeval eventTime;
                index += MgenUnpackLogTime(buffer+index, eventTime);
                // get "srcPort"
                unsigned short temp16;
                memcpy(&temp16, buffer+index, sizeof(short));
                unsigned short srcPort = ntohs(temp16);
                index += sizeof(short);
                // get "srcAddrType"
                ProtoAddress::Type addrType;
                switch (buffer[index++])
                {
                    case MgenMsg::IPv4:
                        addrType = ProtoAddress::IPv4;
                        break;
                    case MgenMsg::IPv6:
                        addrType = ProtoAddress::IPv6;
                        break;
                    default:
                        DMSG(0, "Mgen::ConvertBinaryLog() unknown source address type:%d\n",
                                buffer[index-1]);
                        return false;
                }
                // get "srcAddrLen" (one unsigned byte)
                unsigned int addrLen = (unsigned char)buffer[index++];
                if ((index + addrLen) > recordLength) return MgenBadLogRecord("RECV");
                ProtoAddress srcAddr;
                // get "srcAddr"
                srcAddr.SetRawHostAddress(addrType, buffer+index, addrLen);
                index += addrLen;
                srcAddr.SetPort(srcPort);
                // The remainder of the record corresponds to the message content
                MgenMsg msg;
                msg.Unpack(buffer+index, recordLength - index, false);
                msg.LogRecvEvent(log_file, eventTime, srcAddr, false, NULL, log_flush);
                break;
            }
            case SEND_EVENT:
            {
                MgenMsg msg;
                msg.Unpack(buffer, recordLength, false);
                msg.LogSendEvent(log_file, false, NULL, log_flush);
                break;
            }
            case LISTEN_EVENT:
            case IGNORE_EVENT:
            {
                const char* eventName = (LISTEN_EVENT == eventType) ? "LISTEN" : "IGNORE";
                if (recordLength < FIXED_PART) return MgenBadLogRecord(eventName);
                // get "eventTime"
                struct timeval eventTime;
                index += MgenUnpackLogTime(buffer+index, eventTime);
                // get "protocol"
                const char* protoName =
                    MgenBaseEvent::GetStringFromProtocol((MgenBaseEvent::Protocol)buffer[index++]);
                // skip "reserved" field
                index++;
                // get "portNumber"
                unsigned short temp16;
                memcpy(&temp16, buffer+index, sizeof(short));
                unsigned short portNumber = ntohs(temp16);
                // Output text log format
                const struct tm logTime = MgenLogTimeToTm(eventTime);
                Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s proto>%s port>%hu\n",
                          logTime.tm_hour, logTime.tm_min, logTime.tm_sec,
                          (unsigned long)eventTime.tv_usec,
                          eventName, protoName, portNumber);
                break;
            }
            case JOIN_EVENT:
            case LEAVE_EVENT:
            {
                const char* eventName = (JOIN_EVENT == eventType) ? "JOIN" : "LEAVE";
                if (recordLength < FIXED_PART) return MgenBadLogRecord(eventName);
                // get "eventTime"
                struct timeval eventTime;
                index += MgenUnpackLogTime(buffer+index, eventTime);
                // get "groupPort"
                unsigned short temp16;
                memcpy(&temp16, buffer+index, sizeof(short));
                unsigned short groupPort = ntohs(temp16);
                index += sizeof(short);
                // get "groupAddrType"
                ProtoAddress::Type addrType;
                switch (buffer[index++])
                {
                    case MgenMsg::IPv4:
                        addrType = ProtoAddress::IPv4;
                        break;
                    case MgenMsg::IPv6:
                        addrType = ProtoAddress::IPv6;
                        break;
                    default:
                        DMSG(0, "Mgen::ConvertBinaryLog() unknown source address type\n");
                        return false;
                }
                // get "groupAddrLen" (one unsigned byte); the "ifaceNameLen"
                // byte must also still fit inside the record
                unsigned int addrLen = (unsigned char)buffer[index++];
                if ((index + addrLen + 1) > recordLength) return MgenBadLogRecord(eventName);
                ProtoAddress groupAddr;
                // get "groupAddr"
                groupAddr.SetRawHostAddress(addrType, buffer+index, addrLen);
                index += addrLen;
                // get "ifaceNameLen" (one unsigned byte, so at most 255)
                unsigned int ifaceNameLen = (unsigned char)buffer[index++];
                if ((index + ifaceNameLen) > recordLength) return MgenBadLogRecord(eventName);
                char ifaceName[256];  // holds any one-byte length plus NUL
                memcpy(ifaceName, buffer+index, ifaceNameLen);
                ifaceName[ifaceNameLen] = '\0';
                // Output text log format
                const struct tm logTime = MgenLogTimeToTm(eventTime);
                Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s group>%s",
                          logTime.tm_hour, logTime.tm_min, logTime.tm_sec,
                          (unsigned long)eventTime.tv_usec,
                          eventName, groupAddr.GetHostString());
                if (ifaceNameLen) Mgen::Log(log_file, " interface>%s", ifaceName);
                if (groupPort)
                    Mgen::Log(log_file, " port>%hu\n", groupPort);
                else
                    Mgen::Log(log_file, "\n");
                break;
            }
            case START_EVENT:
            case STOP_EVENT:
            {
                const char* eventName = (START_EVENT == eventType) ? "START" : "STOP";
                if (recordLength < (2 * MGEN_LOG_FIELD32_SIZE)) return MgenBadLogRecord(eventName);
                // get "eventTime"
                struct timeval eventTime;
                index += MgenUnpackLogTime(buffer+index, eventTime);
                const struct tm logTime = MgenLogTimeToTm(eventTime);
                Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s\n",
                          logTime.tm_hour, logTime.tm_min, logTime.tm_sec,
                          (unsigned long)eventTime.tv_usec, eventName);
                break;
            }
            default:
                DMSG(0, "Mgen::ConvertBinaryLog() invalid event type\n");
                return false;
        }  // end switch(eventType)
    }  // end while(1)
    return true;
}  // end Mgen::ConvertBinaryLog()

// Start timer callback: clears the absolute start time and starts now.
bool Mgen::OnStartTimeout(ProtoTimer& /*theTimer*/)
{
    start_sec = -1.0;
    Start();
    return true;
}  // Mgen::OnStartTimeout()

// DREC event timer callback: processes the pending event, then either
// re-arms the timer for the following event (returning true) or
// deactivates it when none remains (returning false).
bool Mgen::OnDrecEventTimeout(ProtoTimer& /*theTimer*/)
{
    // 1) Process next pending drec event
    ASSERT(next_drec_event);
    ProcessDrecEvent(*next_drec_event);

    // 2) Install next DREC event timeout (or kill timer)
    const double eventTime = next_drec_event->GetTime();
    next_drec_event = (DrecEvent*)next_drec_event->Next();
    if (!next_drec_event)
    {
        drec_event_timer.Deactivate();
        return false;
    }
    double nextInterval = next_drec_event->GetTime() - eventTime;
    if (nextInterval < 0.0) nextInterval = 0.0;
    drec_event_timer.SetInterval(nextInterval);
    return true;
}  // end Mgen::OnDrecEventTimeout()

// Executes one DREC (receiver) script event -- JOIN, LEAVE, LISTEN or
// IGNORE -- and logs each action that succeeds.
void Mgen::ProcessDrecEvent(const DrecEvent& event)
{
    // 1) Process the next DREC event
    switch (event.GetType())
    {
        case DrecEvent::JOIN:
            if (drec_group_list.JoinGroup(socket_list,
                                          event.GetGroupAddress(),
                                          event.GetInterface(),
                                          event.GetGroupPort(),
                                          offset_pending))
            {
                LogDrecEvent(JOIN_EVENT, event, event.GetGroupPort());
            }
            else
            {
                DMSG(0, "Mgen::ProcessDrecEvent(JOIN) Warning: error joining group\n");
            }
            break;

        case DrecEvent::LEAVE:
            if (drec_group_list.LeaveGroup(socket_list,
                                           event.GetGroupAddress(),
                                           event.GetInterface(),
                                           event.GetGroupPort()))
            {
                LogDrecEvent(LEAVE_EVENT, event, event.GetGroupPort());
            }
            else
            {
                DMSG(0, "Mgen::ProcessDrecEvent(LEAVE) Warning: error leaving group\n");
            }
            break;

        case DrecEvent::LISTEN:
        {
            ProtoSocket::Protocol theProtocol;
            switch (event.GetProtocol())
            {
                case MgenBaseEvent::UDP:
                    theProtocol = ProtoSocket::UDP;
                    break;
                case MgenBaseEvent::TCP:
                    theProtocol = ProtoSocket::TCP;
                    break;
                default:
                    DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) invalid protocol\n");
                    return;
            }
            const unsigned short* port = event.GetPortList();
            unsigned short portCount = event.GetPortCount();
            for (unsigned short i = 0; i < portCount; i++)
            {
                MgenSocketList::Item* socketItem = socket_list.GetItem(theProtocol, port[i]);
                if (!socketItem)
                {
                    DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) Error: no socket available\n");
                    continue;
                }
                ProtoSocket& theSocket = socketItem->GetSocket();
                if (theSocket.IsOpen())
                {
                    if (!theSocket.IsBound() && !theSocket.Bind(port[i]))
                    {
                        DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) Error: socket bind error on port %hu\n",
                                port[i]);
                        // Like the open failure below: don't install a
                        // listener or log LISTEN for a port we failed to bind
                        continue;
                    }
                }
                else if (!socketItem->Open(addr_type, true))
                {
                    DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) Error: socket open error on port %hu\n",
                            port[i]);
                    continue;
                }
                // If a socket Rx buffer size is specified
                unsigned int rxBufferSize = event.GetRxBuffer();
                if (0 != rxBufferSize)
                {
                    if (!socketItem->SetRxBufferSize(rxBufferSize))
                        DMSG(0, "Mgen::ProcessDrecEvent(LISTEN) error setting socket rx buffer\n");
                }
                // Set socket recv_handler to enable "listen"
                socketItem->SetSocketListener(this, &Mgen::OnSocketEvent);
                LogDrecEvent(LISTEN_EVENT, event, port[i]);
            }
            break;
        }

        case DrecEvent::IGNORE_:
        {
            ProtoSocket::Protocol theProtocol;
            switch (event.GetProtocol())
            {
                case MgenBaseEvent::UDP:
                    theProtocol = ProtoSocket::UDP;
                    break;
                case MgenBaseEvent::TCP:
                    theProtocol = ProtoSocket::TCP;
                    break;
                default:
                    DMSG(0, "Mgen::ProcessDrecEvent(IGNORE) invalid protocol\n");
                    return;
            }
            const unsigned short* port = event.GetPortList();
            unsigned short portCount = event.GetPortCount();
            for (unsigned short i = 0; i < portCount; i++)
            {
                MgenSocketList::Item* socketItem = socket_list.FindItemByPort(theProtocol, port[i]);
                if (socketItem && socketItem->SocketHasListener())
                {
                    // Clear socket event_handler to disable "listen"
                    socketItem->SetSocketListener((Mgen*)NULL, &Mgen::OnSocketEvent);
                    socketItem->Close();  // decrements socketItem reference_count
                    LogDrecEvent(IGNORE_EVENT, event, port[i]);
                }
                else
                {
                    DMSG(0, "Mgen::ProcessDrecEvent(IGNORE) Error: no socket on port %hu\n",
                            port[i]);
                }
            }
            break;
        }

        case DrecEvent::INVALID_TYPE:
            ASSERT(0);
            break;
    }  // end switch(event.GetType())
}  // end Mgen::ProcessDrecEvent()

// Writes one LISTEN/IGNORE/JOIN/LEAVE entry, stamped with the current
// system time, to the log file in binary or text format.  Nothing is
// logged while "offset" pre-processing is pending or when no log file is
// set.  "portNumber" is the listen port or the group port, in host order.
void Mgen::LogDrecEvent(LogEvent eventType, const DrecEvent &event, unsigned short portNumber)
{
    if (offset_pending) return;  // Don't log "offset" pre-processed events
    if (NULL == log_file) return;

    // Get current system time
    struct timeval eventTime;
    ProtoSystemTime(eventTime);

    if (log_binary)
    {
        char buffer[128];
        unsigned int index = 0;
        // set "type" field
        buffer[index++] = (char)eventType;
        // zero "reserved" field
        buffer[index++] = 0;
        // skip "recordLength" field for moment
        index += 2;
        // Fill in "eventTime" fields (4 bytes each)
        index += MgenPackLogTime(buffer+index, eventTime);
        switch (eventType)
        {
            case LISTEN_EVENT:
            case IGNORE_EVENT:
            {
                // set "protocol" field
                buffer[index++] = (char)event.GetProtocol();
                // zero second "reserved" field
                buffer[index++] = 0;
                // set "portNumber" field
                unsigned short temp16 = htons(portNumber);
                memcpy(buffer+index, &temp16, sizeof(short));
                index += sizeof(short);
                break;
            }
            case JOIN_EVENT:
            case LEAVE_EVENT:
            {
                // set "groupPort" field, in network order like every other
                // multi-byte field (ConvertBinaryLog() reads it with ntohs())
                unsigned short temp16 = htons(portNumber);
                memcpy(buffer+index, &temp16, sizeof(short));
                index += sizeof(short);
                const ProtoAddress& addr = event.GetGroupAddress();
                // set "groupAddrType" field
                switch (addr.GetType())
                {
                    case ProtoAddress::IPv4:
                        buffer[index++] = (char)MgenMsg::IPv4;
                        break;
                    case ProtoAddress::IPv6:
                        buffer[index++] = (char)MgenMsg::IPv6;
                        break;
                    default:
                        DMSG(0, "Mgen::LogDrecEvent(JOIN/LEAVE) error: invalid address type\n");
                        return;
                }
                // set "groupAddrLen" field
                unsigned char len = (unsigned char)addr.GetLength();
                buffer[index++] = len;
                // set "groupAddr" field
                memcpy(buffer+index, addr.GetRawHostAddress(), len);
                index += len;
                // The interface name is truncated to whatever still fits in
                // "buffer" after its one-byte length field
                const char* interfaceName = event.GetInterface();
                size_t nameLen = interfaceName ? strlen(interfaceName) : 0;
                const size_t nameMax = sizeof(buffer) - index - 1;
                if (nameLen > nameMax) nameLen = nameMax;
                // set "ifaceNameLen" field
                buffer[index++] = (char)nameLen;
                // set "ifaceName" field
                if (nameLen > 0) memcpy(buffer+index, interfaceName, nameLen);
                index += (unsigned int)nameLen;
                break;
            }
            default:
                DMSG(0, "Mgen::LogDrecEvent() error: invalid event type\n");
                ASSERT(0);
                break;
        }  // end switch(eventType)

        // Set the "recordLength" field (payload size after the 4-byte header)
        const unsigned short RECORD_LENGTH_OFFSET = 2;
        unsigned short temp16 = htons(index - (RECORD_LENGTH_OFFSET+2));
        memcpy(buffer+RECORD_LENGTH_OFFSET, &temp16, sizeof(short));

        // Write record to log file
        if (fwrite(buffer, sizeof(char), index, log_file) < index)
            DMSG(0, "Mgen::LogDrecEvent() fwrite() error: %s\n", GetErrorString());
    }
    else
    {
        const struct tm logTime = MgenLogTimeToTm(eventTime);
        switch (eventType)
        {
            case LISTEN_EVENT:
            case IGNORE_EVENT:
                Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s proto>%s port>%hu\n",
                          logTime.tm_hour, logTime.tm_min, logTime.tm_sec,
                          (unsigned long)eventTime.tv_usec,
                          (LISTEN_EVENT == eventType) ? "LISTEN" : "IGNORE",
                          MgenBaseEvent::GetStringFromProtocol(event.GetProtocol()),
                          portNumber);
                break;

            case JOIN_EVENT:
            case LEAVE_EVENT:
            {
                Mgen::Log(log_file, "%02d:%02d:%02d.%06lu %s group>%s",
                          logTime.tm_hour, logTime.tm_min, logTime.tm_sec,
                          (unsigned long)eventTime.tv_usec,
                          (JOIN_EVENT == eventType) ? "JOIN" : "LEAVE",
                          event.GetGroupAddress().GetHostString());
                const char* iface = event.GetInterface();
                if (iface) Mgen::Log(log_file, " interface>%s", iface);
                if (portNumber)
                    Mgen::Log(log_file, " port>%hu\n", portNumber);
                else
                    Mgen::Log(log_file, "\n");
                break;
            }
            default:
                DMSG(0, "Mgen::LogDrecEvent() error: invalid event type\n");
                ASSERT(0);
                break;
        }  // end switch(eventType)
    }  // end if/else (log_binary)
    if (log_flush) fflush(log_file);
}  // end Mgen::LogDrecEvent()

// Receive and log a packet
// Socket callback: for RECV events, drains the socket, unpacking each
// datagram, handing it to the controller (if any) and logging it as a
// receive event or receive error (if a log file is set).
void Mgen::OnSocketEvent(ProtoSocket& theSocket, ProtoSocket::Event theEvent)
{
    switch (theEvent)
    {
        case ProtoSocket::RECV:
        {
            char buffer[MgenMsg::MAX_SIZE];
            unsigned int len = MgenMsg::MAX_SIZE;
            ProtoAddress srcAddr;
            while (theSocket.RecvFrom(buffer, len, srcAddr))
            {
                struct timeval currentTime;
                ProtoSystemTime(currentTime);
                if (controller || log_file)
                {
                    // Unpack BEFORE notifying the controller; the original
                    // handed it a message that had not been unpacked yet.
                    MgenMsg theMsg;
                    const bool unpacked = theMsg.Unpack(buffer, len, checksum_force);
                    if (controller)
                        controller->OnMsgReceive(theMsg, srcAddr, currentTime);
                    if (log_file)
                    {
                        if (unpacked)
                            theMsg.LogRecvEvent(log_file, currentTime, srcAddr,
                                                log_binary, buffer, log_flush);
                        else
                            theMsg.LogRecvError(log_file, currentTime, srcAddr,
                                                log_binary, theMsg.GetError(), log_flush);
                    }
                }
                len = MgenMsg::MAX_SIZE;  // reset capacity for the next RecvFrom()
            }
            break;
        }
        default:
            DMSG(0, "Mgen::OnSocketEvent() unexpected event type\n");
            break;
    }
}  // end Mgen::OnSocketEvent()

// Logs an already-received MGEN message held in "buffer" ("len" bytes from
// "srcAddr") as a receive event, or as a receive error if it fails to
// unpack.  Does nothing when no log file is set.
void Mgen::HandleMgenMessage(char* buffer, unsigned int len, const ProtoAddress& srcAddr)
{
    MgenMsg theMsg;
    struct timeval currentTime;
    ProtoSystemTime(currentTime);
    if (NULL == log_file) return;
    if (theMsg.Unpack(buffer, len, checksum_force))
        theMsg.LogRecvEvent(log_file, currentTime, srcAddr, log_binary, buffer, log_flush);
    else
        theMsg.LogRecvError(log_file, currentTime, srcAddr, log_binary,
                            theMsg.GetError(), log_flush);
}
// end HandleMgenMessage() bool Mgen::OpenLog(const char* path, bool append, bool binary) { if (append) { #ifdef WIN32 WIN32_FILE_ATTRIBUTE_DATA fileAttr; #ifdef _UNICODE wchar_t wideBuffer[PATH_MAX]; int pathLen = strlen(path); if (pathLen > PATH_MAX) pathLen = PATH_MAX; else pathLen += 1; mbstowcs(wideBuffer, path, pathLen); LPCTSTR pathPtr = wideBuffer; #else LPCTSTR pathPtr = path; #endif // if/else _UNICODE if (0 == GetFileAttributesEx(pathPtr, GetFileExInfoStandard, &fileAttr)) log_empty = true; // error -- assume (nonexistent) file is empty else if ((0 == fileAttr.nFileSizeLow) && (0 == fileAttr.nFileSizeHigh)) log_empty = true; else log_empty = false; #else struct stat buf; if (stat(path, &buf)) // zero return value == success log_empty = true; // error -- assume file is empty else if (buf.st_size == 0) log_empty = true; else log_empty = false; #endif // if/else WIN32/UNIX } else { log_empty = true; } const char* mode; if (binary) { mode = append ? "ab" : "wb+"; log_binary = true; } else { mode = append ? 
"a" : "w+"; log_binary = false; } FILE* logFile = fopen(path, mode); if (!logFile) { DMSG(0, "Mgen::OpenLog() fopen() Error: %s\n", GetErrorString()); return false; } SetLogFile(logFile); log_open = true; return true; } // end Mgen::OpenLog() void Mgen::SetLogFile(FILE* filePtr) { CloseLog(); log_file = filePtr; if (log_tx || (NULL == filePtr)) flow_list.SetLogFile(log_file, log_binary, log_flush); #ifdef _WIN32_WCE if ((stdout == log_file) || (stderr == log_file)) Log = Mgen::LogToDebug; else Log = fprintf; #endif // _WIN32_WCE } // end Mgen::SetLogFile() void Mgen::CloseLog() { if (log_file) { if ((stdout != log_file) && (stderr != log_file)) fclose(log_file); log_file = NULL; } } // end Mgen::CloseLog() // Query flow_list and drec_event_list for an idea // of the current (or greatest) estimate of // current relative script time offset double Mgen::GetCurrentOffset() const { if (!started) return -1.0; if (next_drec_event) return (next_drec_event->GetTime() - drec_event_timer.GetTimeRemaining()); const DrecEvent* lastEvent = (const DrecEvent*)drec_event_list.Tail(); double drecOffset = lastEvent ? lastEvent->GetTime() : -1.0; double mgenOffset = flow_list.GetCurrentOffset(); return (drecOffset > mgenOffset) ? 
drecOffset : mgenOffset; } // end Mgen::GetCurrentOffset() // Parse an MGEN script bool Mgen::ParseScript(const char* path) { // Open script file FILE* scriptFile = fopen(path, "r"); if (!scriptFile) { DMSG(0, "Mgen::ParseScript() fopen() Error: %s\n", GetErrorString()); return false; } // Read script file line by line using FastReader FastReader reader; unsigned int lineCount = 0; unsigned int lines = 0; while (1) { lineCount += lines; // for grouped (continued) lines char lineBuffer[SCRIPT_LINE_MAX+1]; unsigned int len = SCRIPT_LINE_MAX; switch (reader.ReadlineContinue(scriptFile, lineBuffer, &len, &lines)) { case FastReader::OK: lineCount++; lines--; break; case FastReader::DONE: fclose(scriptFile); return true; // done with script file case FastReader::ERROR_: DMSG(0, "Mgen::ParseScript() Error: script file read error\n"); fclose(scriptFile); return false; } if (!ParseEvent(lineBuffer, lineCount)) { DMSG(0, "Mgen::ParseScript() Error: invalid mgen script line: %lu\n", lineCount); fclose(scriptFile); return false; } } // end while (1) return true; } // end Mgen::ParseScript() bool Mgen::ParseEvent(const char* lineBuffer, unsigned int lineCount) { const char * ptr = lineBuffer; // Strip leading white space while ((' ' == *ptr) || ('\t' == *ptr)) ptr++; // Check for comment line (leading '#') if ('#' == *ptr) return true; char fieldBuffer[SCRIPT_LINE_MAX+1]; // Script lines are in form {|} ... if (1 != sscanf(ptr, "%s", fieldBuffer)) { // Blank line? 
return true; } // Set ptr to next field in line, stripping white space ptr += strlen(fieldBuffer); while ((' ' == *ptr) || ('\t' == *ptr)) ptr++; Command cmd = GetCommandFromString(fieldBuffer); if (EVENT == cmd) { // read in or if (1 != sscanf(ptr, "%s", fieldBuffer)) { DMSG(0, "Mgen::ParseEvent() Error: empty EVENT command at line: %lu\n", lineCount); return false; } // Set ptr to next field in line, stripping white space ptr += strlen(fieldBuffer); while ((' ' == *ptr) || ('\t' == *ptr)) ptr++; } // If it's not a "global command", assume it's an event. if (INVALID_COMMAND == cmd) cmd = EVENT; switch (cmd) { case EVENT: { // EVENT line can begin with the _or_ the // for implicit, immediate events. double eventTime; if (1 == sscanf(fieldBuffer, "%lf", &eventTime)) { // Read event command if (1 != sscanf(ptr, "%s", fieldBuffer)) { DMSG(0, "Mgen::ParseEvent() Error: missing command at line: %lu\n", lineCount); return false; } // Set ptr to next field in line, stripping white space ptr += strlen(fieldBuffer); while ((' ' == *ptr) || ('\t' == *ptr)) ptr++; } else { eventTime = -1.0; } // Is it an MgenEvent or a DrecEvent? 
if (MgenEvent::INVALID_TYPE != MgenEvent::GetTypeFromString(fieldBuffer)) { // It's an MGEN event // 1) Read the flow id if (1 != sscanf(ptr, "%s", fieldBuffer)) { DMSG(0, "Mgen::ParseEvent() Error: missing at line: %lu\n", lineCount); return false; } unsigned long flowId; if (1 != sscanf(fieldBuffer, "%lu", &flowId)) { DMSG(0, "Mgen::ParseEvent() Error: invalid at line: %lu\n", lineCount); return false; } // 2) Find the flow MgenFlow* theFlow = flow_list.FindFlowById(flowId); if (!theFlow) { if (!(theFlow = new MgenFlow(flowId, timer_mgr, socket_list, default_flow_label))) { DMSG(0, "Mgen::ParseEvent() Error: MgenFlow memory allocation error: %s\n", GetErrorString()); return false; } theFlow->SetPositionCallback(get_position, get_position_data); #ifdef HAVE_GPS theFlow->SetPayloadHandle(payload_handle); #endif // HAVE_GPS if (host_addr.IsValid()) theFlow->SetHostAddress(host_addr); if (sink) theFlow->SetSink(sink); if (log_tx) theFlow->SetLogFile(log_file, log_binary, log_flush); if (checksum_enable) theFlow->EnableChecksums(); flow_list.Append(theFlow); } // 3) Create event object MgenEvent* theEvent = new MgenEvent(); if (!theEvent) { DMSG(0, "MgenFlow::ParseEvent() mgen event allocation error: %s\n", GetErrorString()); return false; } if (!theEvent->InitFromString(lineBuffer)) { DMSG(0, "MgenFlow::ParseEvent() event init error\n"); delete theEvent; return false; } double currentTime = started ? 
GetCurrentOffset() : 0.0; if (currentTime < 0.0) currentTime = 0.0; if (!theFlow->InsertEvent(theEvent, started, currentTime)) { DMSG(0, "Mgen::ParseEvent() Error: invalid mgen script line: %lu\n", lineCount); delete theEvent; return false; } } else if (DrecEvent::INVALID_TYPE != DrecEvent::GetTypeFromString(fieldBuffer)) { // It's a DREC event DrecEvent* theEvent = new DrecEvent(); if (!theEvent) { DMSG(0, "MgenFlow::ParseEvent() drecevent allocation error: %s\n", GetErrorString()); return false; } if (!theEvent->InitFromString(lineBuffer)) { DMSG(0, "Mgen::ParseEvent() Error: invalid mgen script line: %lu\n", lineCount); delete theEvent; return false; } theEvent->SetTime(eventTime); InsertDrecEvent(theEvent); } else { DMSG(0, "Mgen::ParseEvent() Error: invalid command \"%s\" at line: %lu\n", fieldBuffer, lineCount); return false; } break; } default: // Is it a global command if (INVALID_COMMAND != cmd) { if (!OnCommand(cmd, ptr)) { DMSG(0, "Mgen::ParseEvent() Error: bad global command at line: %lu\n", lineCount); return false; } } else { DMSG(0, "Mgen::ParseEvent() Error: invalid command at line: %lu\n", lineCount); return false; } break; } return true; } // end Mgen::ParseEvent() void Mgen::InsertDrecEvent(DrecEvent* theEvent) { double eventTime = theEvent->GetTime(); if (started) { double currentTime = GetCurrentOffset(); if (currentTime < 0.0) currentTime = 0.0; if (eventTime < currentTime) { theEvent->SetTime(currentTime); drec_event_list.Precede(next_drec_event, theEvent); ProcessDrecEvent(*theEvent); } else { theEvent->SetTime(eventTime); drec_event_list.Insert(theEvent); // Reschedule next drec timeout if needed if (drec_event_timer.IsActive()) { double nextTime = next_drec_event->GetTime(); if (eventTime < nextTime) { next_drec_event = theEvent; drec_event_timer.SetInterval(eventTime - currentTime); drec_event_timer.Reschedule(); } } else { next_drec_event = theEvent; drec_event_timer.SetInterval(eventTime - currentTime); 
timer_mgr.ActivateTimer(drec_event_timer); } } } else { eventTime = eventTime > 0.0 ? eventTime : 0.0; theEvent->SetTime(eventTime); drec_event_list.Insert(theEvent); } } // end Mgen::InsertDrecEvent() // Global command processing const StringMapper Mgen::COMMAND_LIST[] = { {"+EVENT", EVENT}, {"+START", START}, {"+INPUT", INPUT}, {"+OUTPUT", OUTPUT}, {"+LOG", LOG}, {"+SAVE", SAVE}, {"+DEBUG", DEBUG}, {"+OFFSET", OFFSET}, {"-TXLOG", TXLOG}, {"-NOLOG", NOLOG}, {"+DLOG", DLOG}, {"-BINARY", BINARY}, {"-FLUSH", FLUSH}, {"+LABEL", LABEL}, {"+RXBUFFER", RXBUFFER}, {"+TXBUFFER", TXBUFFER}, {"+TOS", TOS}, {"+TTL", TTL}, {"+INTERFACE", INTERFACE}, {"-CHECKSUM", CHECKSUM}, {"-TXCHECKSUM", TXCHECKSUM}, {"-RXCHECKSUM", RXCHECKSUM}, {"+OFF", INVALID_COMMAND}, // to deconflict "offset" from "off" event {NULL, INVALID_COMMAND} }; Mgen::Command Mgen::GetCommandFromString(const char* string) { // Make comparison case-insensitive char upperString[16]; unsigned int len = strlen(string); len = len < 16 ? len : 16; for (unsigned int i = 0 ; i < len; i++) upperString[i] = toupper(string[i]); const StringMapper* m = COMMAND_LIST; Mgen::Command theCommand = INVALID_COMMAND; while (NULL != (*m).string) { if (!strncmp(upperString, (*m).string+1, len)) theCommand = ((Command)((*m).key)); m++; } return theCommand; } // end Mgen::GetCommandFromString() // This tells if the command is valid and whether args are expected Mgen::CmdType Mgen::GetCmdType(const char* cmd) { if (!cmd) return CMD_INVALID; char upperCmd[32]; // all commands < 32 characters unsigned int len = strlen(cmd); len = len < 31 ? 
len : 31; unsigned int i; for (i = 0; i < 31; i++) upperCmd[i] = toupper(cmd[i]); bool matched = false; const StringMapper* m = COMMAND_LIST; CmdType type = CMD_INVALID; while (INVALID_COMMAND != (*m).key) { if (!strncmp(upperCmd, (*m).string+1, len)) { if (matched) { // ambiguous command (command should match only once) return CMD_INVALID; } else { matched = true; if ('+' == (*m).string[0]) type = CMD_ARG; else type = CMD_NOARG; } } m++; } return type; } // end Mgen::GetCmdType() bool Mgen::OnCommand(Mgen::Command cmd, const char* arg, bool override) { switch (cmd) { case START: { if (!arg) { DMSG(0, "Mgen::OnCommand() Error: missing \n"); return false; } if (override || !start_time_lock) { // convert to upper case for case-insensitivity // search for "GMT" or "NOW" keywords char temp[32]; unsigned int len = strlen(arg); len = len < 31 ? len : 31; unsigned int i; for (i = 0 ; i < len; i++) temp[i] = toupper(arg[i]); temp[i] = '\0'; unsigned int startHour, startMin; double startSec; // arg should be "hr:min:sec[GMT]" or "NOW" if (3 == sscanf(temp, "%u:%u:%lf", &startHour, &startMin, &startSec)) { start_hour = startHour; start_min = startMin; start_sec = startSec; if (strstr(temp, "GMT")) start_gmt = true; else start_gmt = false; } else { // Check for "NOW" keywork (case-insensitive) if (strstr(temp, "NOW")) { // negative start_time_sec indicates immediate start start_sec = -1.0; } else { DMSG(0, "Mgen::OnCommand() Error: invalid START time\n"); return false; } } start_time_lock = override; // record START command precedence } // end if (override || !start_time_lock) break; } // end case START case EVENT: if (!ParseEvent(arg, 0)) { DMSG(0, "Mgen::OnCommand() - error parsing event\n"); return false; } break; case INPUT: if (!ParseScript(arg)) { DMSG(0, "Mgen::OnCommand() - error parsing script\n"); return false; } break; case TXBUFFER: unsigned int sizeTemp; if (1 != sscanf(arg, "%u", &sizeTemp)) { DMSG(0, "Mgen::OnCommand() - invalid tx buffer size\n"); return 
false; } socket_list.SetDefaultTxBufferSize(sizeTemp, override); break; case RXBUFFER: { unsigned int sizeTemp; if (1 != sscanf(arg, "%u", &sizeTemp)) { DMSG(0, "Mgen::OnCommand() - invalid rx buffer size\n"); } socket_list.SetDefaultRxBufferSize(sizeTemp, override); break; } case TOS: { int tosTemp; int result = sscanf(arg, "%i", &tosTemp); if ((1 != result) || (tosTemp < 0) || (tosTemp > 255)) { DMSG(0, "Mgen::OnCommand() - invalid tos value"); return false; } socket_list.SetDefaultTos(tosTemp, override); break; } case INTERFACE: socket_list.SetDefaultMulticastInterface(arg, override); break; case TTL: { int ttlTemp; int result = sscanf(arg, "%i", &ttlTemp); if ((1 != result) || (ttlTemp < 0) || (ttlTemp > 255)) { DMSG(0, "Mgen::OnCommand() - invalid ttl value"); return false; } socket_list.SetDefaultTtl(ttlTemp, override); break; } case LABEL: // (TBD) make flow_list.SetDefaultFlowLabel() if (override || !default_label_lock) { int tempLabel; if (1 != sscanf(arg, "%i", &tempLabel)) { DMSG(0, "Mgen::OnCommand() - invalid flow label value"); return false; } default_flow_label = tempLabel; default_label_lock = override; } break; case OUTPUT: if (override || !log_file_lock) { if (!OpenLog(arg, false, log_binary)) { DMSG(0, "Mgen::OnCommand() Error: log file open error: %s\n", GetErrorString()); return false; } log_file_lock = override; } break; case LOG: if (override || !log_file_lock) { if (!OpenLog(arg, true, log_binary)) { DMSG(0, "Mgen::OnCommand() Error: log file open error: %s\n", GetErrorString()); return false; } log_file_lock = override; } break; case NOLOG: if (override || !log_file_lock) { SetLogFile(NULL); log_file_lock = override; } break; case DLOG: if (override) { if (!OpenDebugLog(arg)) { DMSG(0, "Mgen::OnCommand(dlog) error opening debug log file: %s\n", GetErrorString()); return false; } } break; case SAVE: if (override || !save_path_lock) { FILE* filePtr = fopen(arg, "w+"); if (filePtr) { char* path = new char[strlen(arg) + 1]; if (path) { 
strcpy(path, arg); if (save_path) delete save_path; save_path = path; save_path_lock = override; } else { DMSG(0, "Mgen::OnCommand() Error: memory allocation error: %s\n", GetErrorString()); return false; } } else { DMSG(0, "Mgen::OnCommand() Error: save file open error: %s\n", GetErrorString()); return false; } } break; case DEBUG: SetDebugLevel(atoi(arg)); break; case OFFSET: if (override || !offset_lock) { double timeOffset; if (1 == sscanf(arg, "%lf", &timeOffset)) { offset = timeOffset; offset_lock = override; } else { DMSG(0, "Mgen::OnCommand() Error invalid OFFSET\n"); return false; } } break; case TXLOG: log_tx = true; flow_list.SetLogFile(log_file, log_binary, log_flush); break; case BINARY: if (log_open) { DMSG(0, "Mgen::OnCommand() Error: BINARY option must precede OUTPUT and LOG commands\n"); return false; } log_binary = true; if (log_tx) flow_list.SetLogFile(log_file, log_binary, log_flush); break; case FLUSH: log_flush = true; if (log_tx) flow_list.SetLogFile(log_file, log_binary, log_flush); break; case TXCHECKSUM: checksum_enable = true; flow_list.EnableChecksums(); break; case RXCHECKSUM: checksum_force = true; break; case CHECKSUM: checksum_enable = true; flow_list.EnableChecksums(); checksum_force = true; break; case INVALID_COMMAND: DMSG(0, "Mgen::OnCommand() Error: invalid command\n"); return false; } // end switch(cmd) return true; } // end Mgen::OnCommand() //////////////////////////////////////////////////////////////// // DrecGroupList implementation DrecGroupList::DrecGroupList() : head(NULL), tail(NULL) { } DrecGroupList::~DrecGroupList() { } void DrecGroupList::Destroy(MgenSocketList& socketList) { Item* next = head; while (next) { Item* current = next; next = next->next; if (NULL != current->socket_item) socketList.LeaveGroup(current->socket_item, current->group_addr, current->GetInterface()); delete current; } head = tail = (Item*)NULL; } // end DrecGroupList::Destroy() bool DrecGroupList::JoinGroup(MgenSocketList& socketList, const 
ProtoAddress& groupAddress, const char* interfaceName, unsigned short thePort, bool deferred) { Item* item = FindItemByGroup(groupAddress, interfaceName, thePort); if (item && item->IsActive()) { DMSG(0, "DrecGroupList::JoinGroup() Error: group already joined\n"); return false; } else { if (!item) { if (!(item = new Item(groupAddress, interfaceName, thePort))) { DMSG(0, "DrecGroupList::JoinGroup() Error: DrecGroupList::Item memory allocation error: %s\n", GetErrorString()); return false; } Append(item); } if (deferred) return true; else return item->Activate(socketList); } } // end DrecGroupList::JoinGroup() bool DrecGroupList::LeaveGroup(MgenSocketList& socketList, const ProtoAddress& groupAddress, const char* interfaceName, unsigned short thePort) { Item* item = FindItemByGroup(groupAddress, interfaceName, thePort); if (item) { if (item->IsActive()) { if (!socketList.LeaveGroup(item->socket_item, groupAddress, interfaceName)) { DMSG(0, "DrecGroupList::LeaveGroup() Error: group leave error\n"); return false; } } Remove(item); delete item; return true; } else { DMSG(0, "DrecGroupList::LeaveGroup() Error: group item not found\n"); return false; } } // end DrecGroupList::LeaveGroup() bool DrecGroupList::JoinDeferredGroups(MgenSocketList& socketList) { bool result = true; Item* next = head; while (next) { if (!next->IsActive()) result &= next->Activate(socketList); next = next->next; } return result; } // end DrecGroupList::JoinDeferredGroups() DrecGroupList::Item* DrecGroupList::FindItemByGroup(const ProtoAddress& groupAddr, const char* interfaceName, unsigned short thePort) { Item* next = head; while (next) { const char* nextInterface = next->GetInterface(); bool interfaceIsEqual = (NULL == interfaceName) ? (NULL == nextInterface) : ((NULL != nextInterface) && !strcmp(nextInterface, interfaceName)); bool groupIsEqual = next->group_addr.IsValid() ? 
next->group_addr.HostIsEqual(groupAddr) : true; bool portIsEqual = thePort == next->GetPort(); if (interfaceIsEqual && groupIsEqual && portIsEqual) return next; next = next->next; } return (Item*)NULL; } // end DrecGroupList::FindItemByGroup() void DrecGroupList::Append(Item* item) { item->next = NULL; if ((item->prev = tail)) item->prev->next = item; else head = item; tail = item; } // end DrecGroupList::Append() void DrecGroupList::Remove(Item* item) { if (item->prev) item->prev->next = item->next; else head = item->next; if (item->next) item->next->prev = item->prev; else tail = item->prev; } // end DrecGroupList::Remove() ////////////////////////////////////////////////// // DrecGroupList::Item implementation DrecGroupList::Item::Item(const ProtoAddress& groupAddr, const char* interfaceName, unsigned short thePort) : socket_item(NULL), group_addr(groupAddr), port(thePort), prev(NULL), next(NULL) { if (interfaceName) strncpy(interface_name, interfaceName, 16); else interface_name[0] = '\0'; } DrecGroupList::Item::~Item() { } //////////////////////////////////////////////////////////////// // Mgen::FastReader implementation Mgen::FastReader::FastReader() : savecount(0) { } Mgen::FastReader::Result Mgen::FastReader::Read(FILE* filePtr, char* buffer, unsigned int* len) { unsigned int want = *len; if (savecount) { unsigned int ncopy = want < savecount ? want : savecount; memcpy(buffer, saveptr, ncopy); savecount -= ncopy; saveptr += ncopy; buffer += ncopy; want -= ncopy; } while (want) { unsigned int result = fread(savebuf, sizeof(char), BUFSIZE, filePtr); if (result) { unsigned int ncopy= want < result ? 
want : result; memcpy(buffer, savebuf, ncopy); savecount = result - ncopy; saveptr = savebuf + ncopy; buffer += ncopy; want -= ncopy; } else // end-of-file { #ifndef _WIN32_WCE if (ferror(filePtr)) { if (EINTR == errno) continue; } #endif // !_WIN32_WCE *len -= want; if (*len) return OK; // we read at least something else return DONE; // we read nothing } } // end while(want) return OK; } // end Mgen::FastReader::Read() // An OK text readline() routine (reads what will fit into buffer incl. NULL termination) // if *len is unchanged on return, it means the line is bigger than the buffer and // requires multiple reads Mgen::FastReader::Result Mgen::FastReader::Readline(FILE* filePtr, char* buffer, unsigned int* len) { unsigned int count = 0; unsigned int length = *len; char* ptr = buffer; while (count < length) { unsigned int one = 1; switch (Read(filePtr, ptr, &one)) { case OK: if (('\n' == *ptr) || ('\r' == *ptr)) { *ptr = '\0'; *len = count; return OK; } count++; ptr++; break; case ERROR_: return ERROR_; case DONE: return DONE; } } // We've filled up the buffer provided with no end-of-line return ERROR_; } // end Mgen::FastReader::Readline() // This reads a line with possible ending '\' continuation character. // Such "continued" lines are returned as one line with this function // and the "lineCount" argument indicates the actual number of lines // which comprise the long "continued" line. // // (Note: Lines ending with an even number '\\' are considered ending // with one less '\' instead of continuing. So, to actually // end a line with an even number of '\\', continue it with // an extra '\' and follow it with a blank line.) 
// Reads one logical line, joining physical lines that end in an odd
// number of '\' characters.
//
//   filePtr   - stream to read from
//   buffer    - receives the (joined) line
//   len       - in: capacity of "buffer"; out (on OK): length of the
//               line, not counting trailing blanks/tabs
//   lineCount - optional; on OK receives the number of physical lines
//               consumed, including blank lines that were skipped
//
// Returns OK when a line was assembled; ERROR_ or DONE as soon as
// Readline() reports them (also mid-continuation, in which case the
// text gathered so far is not reported); ERROR_ when the buffer is
// exhausted.  Blank and whitespace-only lines are skipped.
// NOTE: trailing blanks/tabs are excluded from "*len" but are not
// overwritten in "buffer" unless the line is continued.
Mgen::FastReader::Result Mgen::FastReader::ReadlineContinue(FILE*         filePtr,
                                                            char*         buffer,
                                                            unsigned int* len,
                                                            unsigned int* lineCount)
{
    unsigned int lines = 0;
    unsigned int count = 0;          // characters already accepted into "buffer"
    unsigned int length = *len;
    char* ptr = buffer;              // where the next physical line is stored
    while (count < length)
    {
        unsigned int space = length - count;
        switch (Readline(filePtr, ptr, &space))
        {
            case OK:
            {
                // "space" now holds the length of the line just read
                lines++;
                // 1) Does the line continue?
                char* cptr = space ? ptr + space - 1 : ptr;
                // a) skip trailing white space
                //    (keeps cptr == ptr + space - 1 while space > 0)
                while (((' ' == *cptr) || ('\t' == *cptr)) && (cptr > ptr))
                {
                    space--;
                    cptr--;
                }

                // If line "continues" to a blank line, skip it
                // (cast: isspace() is undefined for negative "char" values)
                if ((cptr == ptr) &&
                    ((*cptr == '\0') || isspace(static_cast<unsigned char>(*cptr))))
                    continue;  // next iteration of the enclosing while loop

                if ('\\' == *cptr)
                {
                    // Make sure line ends with odd number of '\' to continue.
                    // Scan by index so no pointer below "ptr" is ever formed.
                    bool lineContinues = false;
                    unsigned int idx = space;
                    while ((idx > 0) && ('\\' == ptr[idx - 1]))
                    {
                        idx--;
                        lineContinues = !lineContinues;
                    }
                    // lose trailing '\' (continuation or extra '\' char)
                    ptr[space - 1] = '\0';
                    space -= 1;
                    if (lineContinues)
                    {
                        // get next line to continue
                        count += space;
                        ptr += space;
                        continue;  // next iteration of the enclosing while loop
                    }
                }
                *len = count + space;
                if (lineCount) *lineCount = lines;
                return OK;
            }
            case ERROR_:
                return ERROR_;
            case DONE:
                return DONE;
        }
    }
    // We've filled up the buffer provided with no end-of-line
    return ERROR_;
}  // end Mgen::FastReader::ReadlineContinue()