#include "mgen.h" #include "mgenMsg.h" #include "mgenVersion.h" #include "protokit.h" #include #include // for stdout/stderr printouts #include // for toupper() #ifdef WIN32 #include #else #include #include #endif // UNIX // byte stream MGEN message sink class MgenStreamSink : public MgenSink { public: MgenStreamSink(ProtoDispatcher& theDispatcher); virtual ~MgenStreamSink(); bool Open(const char* path, bool nonBlocking = true); void Close(); virtual bool SendMgenMessage(const char* txBuffer, unsigned int len, const ProtoAddress& dstAddr); static void DoOutputReady(ProtoDispatcher::Descriptor descriptor, ProtoDispatcher::Event theEvent, const void* userData); void StartNotifying(MgenFlow* theFlow); private: void OnOutputReady(); bool Write(char* buffer, unsigned int* nbytes); ProtoDispatcher::Descriptor descriptor; #ifdef WIN32 HANDLE write_event; #endif // WIN32 char msg_buffer[MgenMsg::MAX_SIZE]; UINT16 msg_len; UINT16 msg_index; ProtoDispatcher& dispatcher; bool output_active; }; // end class MgenStreamSink class MgenStreamSource { public: MgenStreamSource(Mgen& mgenInstance, class MgenApp& mgenApp); ~MgenStreamSource(); bool Open(const char* path); void Close(); ProtoDispatcher::Descriptor GetDescriptor() {return descriptor;} static void DoInputReady(ProtoDispatcher::Descriptor descriptor, ProtoDispatcher::Event theEvent, const void* userData); protected: void OnInputReady(); bool Read(char* buffer, UINT32 nBytes, UINT32* bytesRead); Mgen& mgen_instance; class MgenApp& mgen_app; ProtoDispatcher::Descriptor descriptor; #ifdef WIN32 HANDLE read_event; #endif // WIN32 UINT32 msg_length; char msg_buffer[MgenMsg::MAX_SIZE]; UINT32 msg_index; }; // end class MgenStreamSource class MgenApp : public ProtoApp { public: enum CmdType {CMD_INVALID, CMD_ARG, CMD_NOARG}; MgenApp(); ~MgenApp(); virtual bool OnStartup(int argc, const char*const* argv); virtual bool ProcessCommands(int argc, const char*const* argv); virtual void OnShutdown(); private: void OnControlEvent(ProtoSocket& theSocket, ProtoSocket::Event theEvent); static CmdType GetCmdType(const char* string); bool OnCommand(const char* cmd, const char* val); void Init(char* scriptFile); void Usage(); static const char* const CMD_LIST[]; static bool GetInterfaceCounts(const char* ifName, UINT32& txCount, UINT32& rxCount); bool OpenCmdInput(const char* path); void CloseCmdInput(); static void DoCmdInputReady(ProtoDispatcher::Descriptor descriptor, ProtoDispatcher::Event theEvent, const void* userData); void OnCmdInputReady(); bool ReadCmdInput(char* buffer, unsigned int& numBytes); bool IsCmdDelimiter(char byte) {return (('\n' == byte) || ('\r' == byte) || (';' == byte)); } Mgen mgen; // Mgen engine ProtoPipe control_pipe; // remote control message pipe bool control_remote; ProtoDispatcher::Descriptor cmd_descriptor; // "stdin" control option char cmd_buffer[8192]; unsigned int cmd_length; #ifdef HAVE_GPS static bool GetPosition(const void* gpsHandle, GPSPosition& gpsPosition); GPSHandle gps_handle; GPSHandle payload_handle; #endif // HAVE_GPS bool have_ports; bool convert; char convert_path[PATH_MAX]; bool sink_non_blocking; MgenStreamSource* source; char ifinfo_name[64]; UINT32 ifinfo_tx_count; UINT32 ifinfo_rx_count; }; // end class MgenApp MgenApp::MgenApp() : mgen(GetTimerMgr(), GetSocketNotifier()), control_pipe(ProtoPipe::MESSAGE), control_remote(false), cmd_descriptor(ProtoDispatcher::INVALID_DESCRIPTOR), #ifdef HAVE_GPS gps_handle(NULL), payload_handle(NULL), #endif // HAVE_GPS have_ports(false), convert(false), sink_non_blocking(true), source(NULL), ifinfo_tx_count(0), ifinfo_rx_count(0) { control_pipe.SetNotifier(&GetSocketNotifier()); control_pipe.SetListener(this, &MgenApp::OnControlEvent); ifinfo_name[0] = '\0'; } MgenApp::~MgenApp() { } void MgenApp::Usage() { fprintf(stderr, "mgen [ipv4][ipv6][input ][save ]\n" " [output ][log ][hostAddr {on|off}\n" " [binary][txlog][nolog][flush]\n" " [event \"\"][port ]\n" " [instance ][command ]\n" " [sink ][block][source ]\n" " [interface ][ttl ]\n" " [tos ][label ]\n" " [txbuffer ][rxbuffer ]\n" " [start [GMT]][offset ]\n" " [precise {on|off}][ifinfo ]\n" " [txcheck][rxcheck][check]\n" " [convert ][debug ]\n"); } // end MgenApp::Usage() const char* const MgenApp::CMD_LIST[] = { "+port", "-ipv6", // open IPv6 sockets by default "-ipv4", // open IPv4 sockets by default "+convert", // convert binary logfile to text-based logfile "+sink", // set Mgen::sink to stream sink "-block", // set Mgen::sink to blocking I/O "+source", // specify an MGEN stream source "+ifinfo", // get tx/rx frame counts for specified interface "+precise", // turn on/off precision packet timing "+instance", // indicate mgen instance name "-stop", // exit program instance "+command", // specifies an input command file/device "+hostAddr", // turn "host" field on/off in sent messages "-help", // print usage and exit NULL }; MgenApp::CmdType MgenApp::GetCmdType(const char* cmd) { if (!cmd) return CMD_INVALID; unsigned int len = strlen(cmd); bool matched = false; CmdType type = CMD_INVALID; const char* const* nextCmd = CMD_LIST; while (*nextCmd) { if (!strncmp(cmd, *nextCmd+1, len)) { if (matched) { // ambiguous command (command should match only once) return CMD_INVALID; } else { matched = true; if ('+' == *nextCmd[0]) type = CMD_ARG; else type = CMD_NOARG; } } nextCmd++; } return type; } // end MgenApp::GetCmdType() bool MgenApp::ProcessCommands(int argc, const char*const* argv) { // Group command line arguments into MgenApp or Mgen commands // and their (if applicable) respective arguments // Parse command line int i = 1; convert = false; // initialize conversion flag while (i < argc) { CmdType cmdType = GetCmdType(argv[i]); if (CMD_INVALID == cmdType) { // Is it a class MgenApp command? switch(Mgen::GetCmdType(argv[i])) { case Mgen::CMD_INVALID: break; case Mgen::CMD_NOARG: cmdType = CMD_NOARG; break; case Mgen::CMD_ARG: cmdType = CMD_ARG; break; } } switch (cmdType) { case CMD_INVALID: DMSG(0, "MgenApp::ProcessCommands() error: invalid command:%s\n", argv[i]); return false; case CMD_NOARG: if (!OnCommand(argv[i], NULL)) { DMSG(0, "MgenApp::ProcessCommands() OnCommand(%s) error\n", argv[i]); return false; } i++; break; case CMD_ARG: if (!OnCommand(argv[i], argv[i+1])) { DMSG(0, "MgenApp::ProcessCommands() OnCommand(%s, %s) error\n", argv[i], argv[i+1]); return false; } i += 2; break; } } return true; } // end MgenApp::ProcessCommands() bool MgenApp::OnCommand(const char* cmd, const char* val) { if (control_remote) { char buffer[8192]; strcpy(buffer, cmd); if (val) { strcat(buffer, " "); strncat(buffer, val, 8192 - strlen(buffer)); } unsigned int len = strlen(buffer); if (control_pipe.Send(buffer, len)) { return true; } else { DMSG(0, "MgenApp::ProcessCommand(%s) error sending command to remote process\n", cmd); return false; } } CmdType type = GetCmdType(cmd); unsigned int len = strlen(cmd); if (CMD_INVALID == type) { // If it is a core mgen command, process it as "overriding" if (Mgen::CMD_INVALID != Mgen::GetCmdType(cmd)) { return mgen.OnCommand(Mgen::GetCommandFromString(cmd), val, true); } else { DMSG(0, "MgenApp::ProcessCommand(%s) error: invalid command\n", cmd); return false; } } else if ((CMD_ARG == type) && !val) { DMSG(0, "MgenApp::ProcessCommand(%s) missing argument\n", cmd); return false; } else if (!strncmp("port", cmd, len)) { // "port" == implicit "0.0 LISTEN UDP " script char* string = new char[strlen(val) + 64]; if (!string) { DMSG(0, "MgenApp::ProcessCommand(port) memory allocation error: %s\n", GetErrorString()); return false; } sprintf(string, "0.0 LISTEN UDP %s", val); DrecEvent* event = new DrecEvent; if (!event) { DMSG(0, "MgenApp::ProcessCommand(port) memory allocation error: %s\n", GetErrorString()); return false; } if (!event->InitFromString(string)) { DMSG(0, "MgenApp::ProcessCommand(port) error parsing \n"); return false; } have_ports = true; mgen.InsertDrecEvent(event); } else if (!strncmp("ipv4", cmd, len)) { mgen.SetDefaultSocketType(ProtoAddress::IPv4); } else if (!strncmp("ipv6", cmd, len)) { #ifdef HAVE_IPV6 ProtoSocket::SetHostIPv6Capable(); if (ProtoSocket::HostIsIPv6Capable()) mgen.SetDefaultSocketType(ProtoAddress::IPv6); else #endif // HAVE_IPV6 DMSG(0, "MgenApp::ProcessCommand(ipv6) Warning: system not IPv6 capable?\n"); } else if (!strncmp("background", cmd, len)) { // do nothing (this command was scanned immediately at startup) } else if (!strcmp("convert", cmd)) { convert = true; // set flag to do the conversion strcpy(convert_path, val); // save path of file to convert } else if (!strncmp("sink", cmd, len)) { MgenStreamSink* theSink = new MgenStreamSink(dispatcher); if (theSink) { if (theSink->Open(val, sink_non_blocking)) { if (mgen.GetSink()) { ((MgenStreamSink*)mgen.GetSink())->Close(); delete mgen.GetSink(); } mgen.SetSink(theSink); } else { DMSG(0, "MgenApp::ProcessCommand(sink) Error: couldn't open stream sink\n"); return false; } } else { DMSG(0, "MgenApp::ProcessCommand(sink) Error: couldn't allocate sink: %s\n", GetErrorString()); return false; } } else if (!strncmp("block", cmd, len)) { sink_non_blocking = false; } else if (!strncmp("source", cmd, len)) { MgenStreamSource* theSource = new MgenStreamSource(mgen, *this); if (theSource) { if (theSource->Open(val)) { if (!dispatcher.InstallGenericInput(theSource->GetDescriptor(), MgenStreamSource::DoInputReady, theSource)) { DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't install stream source\n"); theSource->Close(); return false; } if (source) { source->Close(); delete source; } source = theSource; } else { DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't open stream source\n"); return false; } } else { DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't allocate source: %s\n", GetErrorString()); return false; } } else if (!strncmp("ifinfo", cmd, len)) { strncpy(ifinfo_name, val, 63); ifinfo_name[63] = '\0'; } else if (!strncmp("precise", cmd, len)) { char status[4]; // valid status is "on" or "off" strncpy(status, val, 3); status[3] = '\0'; unsigned int len = strlen(status); for (unsigned int i = 0; i < len; i++) status[i] = tolower(status[i]); if (!strncmp("on", status, len)) { dispatcher.SetPreciseTiming(true); } else if (!strncmp("off", status, len)) { dispatcher.SetPreciseTiming(false); } else { DMSG(0, "MgenApp::ProcessCommand(precise) Error: invalid \n"); return false; } } else if (!strncmp("instance", cmd, len)) { if (control_pipe.IsOpen()) control_pipe.Close(); if (control_pipe.Connect(val)) { control_remote = true; } else if (!control_pipe.Listen(val)) { DMSG(0, "MgenApp::ProcessCommand(instance) error opening control pipe\n"); return false; } } else if (!strncmp("command", cmd, len)) { if (!OpenCmdInput(val)) { DMSG(0, "MgenApp::ProcessCommand(command) error opening command input file/device\n"); return false; } } else if (!strncmp("hostAddr", cmd, len)) { char status[4]; // valid status is "on" or "off" strncpy(status, val, 3); status[3] = '\0'; unsigned int len = strlen(status); for (unsigned int i = 0; i < len; i++) status[i] = tolower(status[i]); if (!strncmp("on", status, len)) { // (TBD) control whither IPv4 or IPv6 ??? ProtoAddress localAddress; if (localAddress.ResolveLocalAddress()) { mgen.SetHostAddress(localAddress); } else { DMSG(0, "MgenApp::ProcessCommand(hostAddr) error getting local addr\n"); return false; } } else if (!strncmp("off", status, len)) { mgen.ClearHostAddress(); } else { DMSG(0, "MgenApp::ProcessCommand(precise) Error: invalid \n"); return false; } } else if (!strncmp("stop", cmd, len)) { Stop(); } else if (!strncmp("help", cmd, len)) { fprintf(stderr, "mgen: version %s\n", MGEN_VERSION); Usage(); return false; } return true; } // end MgenApp::OnCommand() bool MgenApp::OnStartup(int argc, const char*const* argv) { // Seed the system rand() function struct timeval currentTime; ProtoSystemTime(currentTime); srand(currentTime.tv_usec); mgen.SetLogFile(stdout); // log to stdout by default #ifdef HAVE_IPV6 if (ProtoSocket::HostIsIPv6Capable()) mgen.SetDefaultSocketType(ProtoAddress::IPv6); #endif // HAVE_IPV6 #ifdef HAVE_GPS gps_handle = GPSSubscribe(NULL); if (gps_handle) mgen.SetPositionCallback(MgenApp::GetPosition, gps_handle); payload_handle = GPSSubscribe("/tmp/mgenPayloadKey"); mgen.SetPayloadHandle(payload_handle); #endif // HAVE_GPS if (!ProcessCommands(argc, argv)) { fprintf(stderr, "mgen: error while processing startup commands\n"); return false; } if (control_remote) { // We remoted commands, so we're finished ... return false; } fprintf(stderr, "mgen: version %s\n", MGEN_VERSION); if (convert) { fprintf(stderr, "mgen: beginning binary to text log conversion ...\n"); mgen.ConvertBinaryLog(convert_path); fprintf(stderr, "mgen: conversion complete (exiting).\n"); } else { if (mgen.DelayedStart()) { char startTime[256]; mgen.GetStartTime(startTime); fprintf(stderr, "mgen: delaying start until %s ...\n", startTime); } else { fprintf(stderr, "mgen: starting now ...\n"); } } if ('\0' != ifinfo_name[0]) GetInterfaceCounts(ifinfo_name, ifinfo_tx_count, ifinfo_rx_count); else ifinfo_tx_count = ifinfo_rx_count = 0; return mgen.Start(); } // end MgenApp::OnStartup() void MgenApp::OnShutdown() { mgen.Stop(); if ('\0' != ifinfo_name[0]) { UINT32 txCount, rxCount; if (GetInterfaceCounts(ifinfo_name, txCount, rxCount)) fprintf(stderr, "mgen: iface>%s txFrames>%lu rxFrames>%lu\n", ifinfo_name, txCount - ifinfo_tx_count, rxCount - ifinfo_rx_count); else fprintf(stderr, "mgen: Warning! Couldn't get interface frame counts\n"); } MgenSink* theSink = mgen.GetSink(); if (theSink) { delete theSink; mgen.SetSink(NULL); } if (source) { delete source; source = NULL; } #ifdef HAVE_GPS if (gps_handle) { GPSUnsubscribe(gps_handle); gps_handle = NULL; mgen.SetPositionCallback(NULL, NULL); } if (payload_handle) { GPSUnsubscribe(payload_handle); payload_handle = NULL; mgen.SetPayloadHandle(NULL); } #endif // HAVE_GPS control_pipe.Close(); CloseCmdInput(); } // end MgenApp::OnShutdown() #ifdef HAVE_GPS bool MgenApp::GetPosition(const void* gpsHandle, GPSPosition& gpsPosition) { GPSGetCurrentPosition((GPSHandle)gpsHandle, &gpsPosition); return true; } #endif // HAVE_GPS // This routine uses the system's "netstat" command // to retrieve tx/rx frame counts for a given interface // (WIN32 Note: Counts are cumulative for all interfaces?) bool MgenApp::GetInterfaceCounts(const char* ifName, UINT32& txCount, UINT32& rxCount) { #ifdef _WIN32_WCE // For the moment, we'll assume our WINCE platform // has one interface and we'll use GetIpStatistics() // ignoring the "ifName" MIB_IPSTATS ipStats; if (NO_ERROR != GetIpStatistics(&ipStats)) { DMSG(0, "MgenApp::GetInterfaceCounts() GetIpStatistics() error: %s\n", GetErrorString()); rxCount = txCount = 0; return false; } else { rxCount = ipStats.dwInReceives; txCount = ipStats.dwInDelivers; return true; } #else bool result = false; txCount = rxCount = 0; #ifdef WIN32 FILE* p = _popen("netstat -e", "r"); int lineCount = 0; #else FILE* p = popen("netstat -i", "r"); #endif // if/else WIN32 if (NULL == p) { DMSG(0, "MgenApp::GetInterfaceCounts() popen() error: %s\n", GetErrorString()); return result; } Mgen::FastReader reader; char linebuffer[256]; unsigned int len = 256; while (Mgen::FastReader::OK == reader.Readline(p, linebuffer, &len)) { #ifdef WIN32 // On WIN32, we must collect two lines from "netstat -e" output if (!strncmp(linebuffer, "Unicast", strlen("Unicast")) || !strncmp(linebuffer, "Non-unicast", strlen("Non-unicast"))) { char* ptr = strstr(linebuffer, "packets"); if (ptr) { UINT32 received, sent; if (2 == sscanf(ptr, "packets %lu %lu", &received, &sent)) { txCount += sent; rxCount += received; if (2 == ++lineCount) { result = true; break; } } } } #else if (!strncmp(linebuffer, ifName, strlen(ifName))) { #ifdef LINUX char format[256]; // Linux format: "iface mtu metric rxOk rxErr rxDrp rxOvr txOk ..." sprintf(format, "%s %%lu %%lu %%lu %%lu %%lu %%lu %%lu", ifName); UINT32 mtu, metric, rxOk, rxErr, rxDrp, rxOvr, txOk; if (7 == sscanf(linebuffer, format, &mtu, &metric, &rxOk, &rxErr, &rxDrp, &rxOvr, &txOk)) { txCount = txOk; rxCount = rxOk; result = true; break; } #endif // LINUX #ifdef MACOSX char format[256]; // MacOS X format: "iface mtu net addr ipkts ierrs opkts ..." char net[64], addr[64]; UINT32 mtu, ipkts, ierrs, opkts; sprintf(format, "%s %%lu %%s %%s %%lu %%lu %%lu", ifName); if (6 == sscanf(linebuffer, format, &mtu, net, addr, &ipkts, &ierrs, &opkts)) { txCount = opkts; rxCount = ipkts; result = true; break; } #endif // MACOSX } #endif // if/else WIN32 len = 256; } // end while(reader.Readline()) #ifdef WIN32 _pclose(p); #else pclose(p); #endif // if/else WIN32 return result; #endif // if/else _WIN32_WCE } // end MgenApp::GetInterfaceCounts() void MgenApp::OnControlEvent(ProtoSocket& /*theSocket*/, ProtoSocket::Event theEvent) { if (ProtoSocket::RECV == theEvent) { char buffer[8192]; unsigned int len = 8191; if (control_pipe.Recv(buffer, len)) { // (TBD) Delimit commands by line breaks and/or other delimiters ??? buffer[len] = '\0'; char* cmd = buffer; char* arg = NULL; for (unsigned int i = 0; i < len; i++) { if ('\0' == buffer[i]) { break; } else if (isspace(buffer[i])) { buffer[i] = '\0'; arg = buffer+i+1; break; } } if (!OnCommand(cmd, arg)) DMSG(0, "MgenApp::OnControlEvent() error processing command\n"); } else { DMSG(0, "MgenApp::OnControlEvent() receive error\n"); } } else { DMSG(0, "MgenApp::OnControlEvent() unhandled event type\n"); } } // end MgenApp::OnControlEvent // Mgen run-time control via "stdin" (or other file/device) // Note: On WIN32, MGEN must be built as a "console" application // for this stuff to work? bool MgenApp::OpenCmdInput(const char* path) { #ifdef _WIN32_WCE DMSG(0, "MgenApp::OpenCmdInput() \"command\" option not supported on WinCE\n"); return false; #else CloseCmdInput(); // in case it was open #ifdef WIN32 if (!strcmp(path, "STDIN")) { cmd_descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_INPUT_HANDLE); // (TBD) set stdin for overlapped I/O ??? } else { cmd_descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); } if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == cmd_descriptor) { DMSG(0, "MgenApp::OpenCmdInput() error opening file\n"); return false; } /*// For non-block I/O on Win32 we use overlapped I/O if (NULL == (cmd_read_event = CreateEvent(NULL, TRUE, FALSE, NULL))) { DMSG(0, "MgenApp::OpenCmdInput() error creating overlapped I/O event\n"); CloseHandle((HANDLE)cmd_descriptor); return false; } */ #else if (!strcmp(path, "STDIN")) { cmd_descriptor = fileno(stdin); // Make the stdin non-blocking if(-1 == fcntl(cmd_descriptor, F_SETFL, fcntl(cmd_descriptor, F_GETFL, 0) | O_NONBLOCK)) { cmd_descriptor = -1; DMSG(0, "MgenApp::OpenCmdInput() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s", GetErrorString()); } } else { cmd_descriptor = open(path, O_RDONLY | O_NONBLOCK); } if (cmd_descriptor < 0) { DMSG(0, "MgenApp::OpenCmdInput() error opening file: %s", GetErrorString()); cmd_descriptor = ProtoDispatcher::INVALID_DESCRIPTOR; return false; } #endif // if/else WIN32 cmd_length = 0; if (!dispatcher.InstallGenericInput(cmd_descriptor, MgenApp::DoCmdInputReady, this)) { DMSG(0, "MgenApp::OpenCmdInput() Error: couldn't install cmd input\n"); CloseCmdInput(); return false; } return true; #endif // if/else _WIN32_WCE } // end MgenApp::OpenCmdInput() void MgenApp::CloseCmdInput() { #ifndef _WIN32_WCE if (ProtoDispatcher::INVALID_DESCRIPTOR != cmd_descriptor) { dispatcher.RemoveGenericInput(cmd_descriptor); #ifdef WIN32 CloseHandle((HANDLE)cmd_descriptor); #else close(cmd_descriptor); #endif // if/else WIN32/UNIX cmd_descriptor = ProtoDispatcher::INVALID_DESCRIPTOR; } #endif // _WIN32_WCE } // end MgenApp::CloseCmdInput() void MgenApp::DoCmdInputReady(ProtoDispatcher::Descriptor /*descriptor*/, ProtoDispatcher::Event /*theEvent*/, const void* userData) { ((MgenApp*)userData)->OnCmdInputReady(); } // end MgenApp::DoCmdInputReady() void MgenApp::OnCmdInputReady() { // We read the command input stream one byte at a time // just to save some buffer manipulation char byte; unsigned int len = 1; if (ReadCmdInput(&byte, len)) { if (IsCmdDelimiter(byte)) { cmd_buffer[cmd_length] = '\0'; if (cmd_length) { char* cmd = cmd_buffer; char* arg = NULL; for (unsigned int i = 0; i < cmd_length; i++) { if (isspace(cmd_buffer[i])) { cmd_buffer[i] = '\0'; arg = cmd_buffer + i + 1; break; } } if (!OnCommand(cmd, arg)) DMSG(0, "MgenApp::OnCmdInputReady() error processing command\n"); } cmd_length = 0; } else if (cmd_length < 8191) { cmd_buffer[cmd_length++] = byte; } else { DMSG(0, "MgenApp::OnCmdInputReady() error: maximum command length exceeded\n"); } } else { DMSG(0, "MgenApp::OnCmdInputReady() error reading input\n"); } } // end MgenApp::OnCmdInputReady() bool MgenApp::ReadCmdInput(char* buffer, unsigned int& numBytes) { #ifdef WIN32 DWORD want = numBytes; DWORD got; if (ReadFile(cmd_descriptor, buffer, want, &got, NULL)) { numBytes = got; return true; } else { DMSG(0, "MgenApp::ReadCmdInput() ReadFile error: %s\n", GetErrorString()); return false; } #else ssize_t result = read(cmd_descriptor, buffer, numBytes); if (result <= 0) { switch (errno) { case EINTR: case EAGAIN: numBytes = 0; return true; default: DMSG(0, "MgenApp::ReadCmdInput() read error: %s\n", GetErrorString()); return false; } } else { numBytes = result; return true; } #endif // end if/else WIN32/UNIX } // end MgenApp::ReadCmdInput() // Mgen stream sink/source implementation // Note: On WIN32, MGEN must be built as a "console" application // for this stuff to work? MgenStreamSink::MgenStreamSink(ProtoDispatcher& theDispatcher) : descriptor(ProtoDispatcher::INVALID_DESCRIPTOR), msg_len(0), msg_index(0), dispatcher(theDispatcher), output_active(false) { } MgenStreamSink::~MgenStreamSink() { Close(); } bool MgenStreamSink::Open(const char* path, bool nonBlocking) { Close(); #ifdef WIN32 #ifdef _WIN32_WCE DMSG(0, "MgenStreamSink::Open() \"sink\" option not support under WinCE\n"); return false; #else if (!strcmp(path, "STDOUT")) { descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_OUTPUT_HANDLE); // (TBD) set stdout for overlapped I/O ??? } else { descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_WRITE, FILE_SHARE_READ, NULL, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, NULL); } if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == descriptor) { DMSG(0, "MgenStreamSink::Open() error opening file\n"); return false; } // For non-block I/O on Win32 we use overlapped I/O if (NULL == (write_event = CreateEvent(NULL, TRUE, FALSE, NULL))) { DMSG(0, "MgenStreamSink::Open() error creating overlapped I/O event\n"); CloseHandle((HANDLE)descriptor); return false; } #endif // if/else _WIN32_WCE #else if (!strcmp(path, "STDOUT")) { descriptor = fileno(stdout); if (nonBlocking) { // Make the stdout non-blocking and aynchronous int flags = O_NONBLOCK; #ifdef FASYNC flags |= FASYNC; #endif // FASYNC if(-1 == fcntl(descriptor, F_SETFL, fcntl(descriptor, F_GETFL, 0) | flags)) { descriptor = -1; DMSG(0, "MgenStreamSink::Open() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s", GetErrorString()); } } } else { int flags = O_CREAT | O_WRONLY; if (nonBlocking) flags |= O_NONBLOCK; descriptor = open(path, flags); } if (descriptor < 0) { DMSG(0, "MgenStreamSink::Open() error opening file: %s", GetErrorString()); descriptor = ProtoDispatcher::INVALID_DESCRIPTOR; return false; } #endif // if/else WIN32 msg_len = msg_index = 0; output_active = false; return true; } // end MgenStreamSink::Open() void MgenStreamSink::Close() { if (ProtoDispatcher::INVALID_DESCRIPTOR != descriptor) { if (output_active) { dispatcher.RemoveGenericOutput(descriptor); output_active = false; } #ifdef WIN32 CloseHandle((HANDLE)descriptor); CloseHandle((HANDLE)write_event); write_event = INVALID_HANDLE_VALUE; #else close(descriptor); #endif // if/else WIN32 descriptor = ProtoDispatcher::INVALID_DESCRIPTOR; } } // end MgenStreamSink::Close() // Write buffer only if it is non-blocking bool MgenStreamSink::SendMgenMessage(const char* txBuffer, unsigned int len, const ProtoAddress& /*dstAddr*/) { if (msg_index >= msg_len) { // Copy txBuffer to msg_buffer len = (len < MgenMsg::MAX_SIZE) ? len : MgenMsg::MAX_SIZE; memcpy(msg_buffer, txBuffer, len); msg_len = len; msg_index = 0; if (!output_active) OnOutputReady(); return true; } else { DMSG(1, "MgenStreamSink::SendMgenMessage() message sink buffer overflow\n"); return false; } } // end MgenStreamSink::SendMgenMessage() void MgenStreamSink::StartNotifying(MgenFlow* theFlow) { // (TBD) start async output readiness monitoring and // mod OnOutputReady to tickle the "notify_flow" // to output a message on demand // also mod MgenFlow ON/MOD/OFF event handling to // start/stop notify as needed if (!output_active) { // (TBD) install async output notification only for result == true? if (dispatcher.InstallGenericOutput(descriptor, MgenStreamSink::DoOutputReady, this)) output_active = true; else DMSG(0, "MgenStreamSink::OnOutputReady() async output install failed!\n"); } MgenSink::StartNotifying(theFlow); } // end MgenStreamSink::StartNotifying() void MgenStreamSink::DoOutputReady(ProtoDispatcher::Descriptor /*descriptor*/, ProtoDispatcher::Event /*theEvent*/, const void* userData) { ((MgenStreamSink*)userData)->OnOutputReady(); } // end MgenStreamSink::DoOutputReady() void MgenStreamSink::OnOutputReady() { unsigned int nbytes = msg_len - msg_index; bool result = Write(msg_buffer+msg_index, &nbytes); msg_index += nbytes; if (msg_index < msg_len) { if (!output_active) { // (TBD) install async output notification only for result == true? if (dispatcher.InstallGenericOutput(descriptor, MgenStreamSink::DoOutputReady, this)) output_active = true; else DMSG(0, "MgenStreamSink::OnOutputReady() async output install failed!\n"); } } else { if (notify_flow) { notify_flow->Notify(); } else if (output_active) { dispatcher.RemoveGenericOutput(descriptor); output_active = false; } } if (!result) DMSG(0, "MgenStreamSink::OnOutputReady() error writing to output!\n"); } // MgenStreamSink::OnOutputReady() bool MgenStreamSink::Write(char* buffer, unsigned int* nbytes) { unsigned int len = *nbytes; #ifdef WIN32 DWORD put = 0; #ifndef _WIN32_WCE while (put < len) { DWORD written; OVERLAPPED osWrite; osWrite.hEvent = write_event; if (!WriteFile((HANDLE)descriptor, buffer+put, len - put, &written, &osWrite)) { if (ERROR_IO_PENDING != GetLastError()) { // I/O error DMSG(0, "MgenStreamSink::SendMgenMessage() WriteFile() error: %d\n", GetLastError()); *nbytes = put; return false; } else { // (TBD perhaps wait some small amount of time instead of ZERO? DWORD result = WaitForSingleObject(write_event, 0); switch (result) { case WAIT_OBJECT_0: if (GetOverlappedResult((HANDLE)descriptor, &osWrite, &written, FALSE)) { put += written; } else { DMSG(0, "MgenStreamSink::SendMgenMessage() GetOverlappedResult() error: %d\n", GetLastError()); *nbytes = put; return false; } break; default: // I/O timed out or abandoned DMSG(0, "MgenStreamSink::SendMgenMessage() WaitForSingleObject() error: %d\n", GetLastError()); *nbytes = put; return false; } } } else { // WriteFile completed immediately put += written; } } #endif // !_WIN32_WCE #else ssize_t put = 0; while (put < (ssize_t)len) { ssize_t result = write(descriptor, buffer+put, len - put); if (result < 0) { switch (errno) { case EINTR: continue; // interrupted, try again case EAGAIN: *nbytes = put; return true; // blocked, can't write message now default: DMSG(0, "MgenStreamSink::SendMgenMessage() write() error: %s\n", GetErrorString()); *nbytes = put; return false; } } else { put += result; } } #endif // if/else WIN32 *nbytes = put; return true; } // end MgenStreamSink::Write() MgenStreamSource::MgenStreamSource(Mgen& mgenInstance, class MgenApp& mgenApp) : mgen_instance(mgenInstance), mgen_app(mgenApp), descriptor(ProtoDispatcher::INVALID_DESCRIPTOR), msg_length(0), msg_index(0) { } MgenStreamSource::~MgenStreamSource() { Close(); } bool MgenStreamSource::Open(const char* path) { Close(); #ifdef WIN32 #ifdef _WIN32_WCE DMSG(0, "MgenStreamSource::Open() \"source\" option not supported under WinCE\n"); return false; #else if (!strcmp(path, "STDIN")) { descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_INPUT_HANDLE); // (TBD) set stdout for overlapped I/O ??? } else { descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); } if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == descriptor) { DMSG(0, "MgenStreamSource::Open() error opening file\n"); return false; } // For non-block I/O on Win32 we use overlapped I/O if (NULL == (read_event = CreateEvent(NULL, TRUE, FALSE, NULL))) { DMSG(0, "MgenStreamSource::Open() error creating overlapped I/O event\n"); CloseHandle((HANDLE)descriptor); return false; } #endif // if/else _WIN32_WCE #else if (!strcmp(path, "STDIN")) { descriptor = fileno(stdin); // Make the stdin non-blocking if(-1 == fcntl(descriptor, F_SETFL, fcntl(descriptor, F_GETFL, 0) | O_NONBLOCK)) { descriptor = -1; DMSG(0, "MgenStreamSource::Open() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s", GetErrorString()); } } else { descriptor = open(path, O_RDONLY | O_NONBLOCK); } if (descriptor < 0) { DMSG(0, "MgenStreamSource::Open() error opening file: %s", GetErrorString()); descriptor = ProtoDispatcher::INVALID_DESCRIPTOR; return false; } #endif // if/else WIN32 msg_length = msg_index = 0; return true; } // end MgenStreamSource::Open() void MgenStreamSource::Close() { if (ProtoDispatcher::INVALID_DESCRIPTOR != descriptor) { #ifdef WIN32 CloseHandle((HANDLE)descriptor); CloseHandle(read_event); read_event = INVALID_HANDLE_VALUE; #else close(descriptor); #endif // if/else WIN32 descriptor = ProtoDispatcher::INVALID_DESCRIPTOR; } } // end MgenStreamSource::Close() void MgenStreamSource::DoInputReady(ProtoDispatcher::Descriptor /*descriptor*/, ProtoDispatcher::Event /*theEvent*/, const void* userData) { ((MgenStreamSource*)userData)->OnInputReady(); } // end MgenStreamSource::DoInputReady() void MgenStreamSource::OnInputReady() { UINT32 count; if (msg_length) { if (Read(msg_buffer+msg_index, msg_length - msg_index, &count)) { msg_index += count; if (msg_index == msg_length) { // Message reception complete ProtoAddress src; src.Reset(mgen_instance.GetDefaultSocketType()); src.SetPort(0); mgen_instance.HandleMgenMessage(msg_buffer, msg_length, src); msg_length = msg_index = 0; } } else { mgen_app.Stop(-1); } } else { // Reading first four bytes of MGEN message to get // MGEN "messageSize" field if (Read(msg_buffer+msg_index, 2 - msg_index, &count)) { msg_index += count; if (2 == msg_index) { memcpy(&msg_length, msg_buffer, 2); msg_length = ntohs(msg_length); if ((msg_length < MgenMsg::MIN_SIZE) || (msg_length > MgenMsg::MAX_SIZE)) { DMSG(0, "MgenStreamSource::OnInputReady() invalid MGEN message length received: %lu\n", msg_length); msg_length = msg_index = 0; return; } OnInputReady(); } } else { mgen_app.Stop(-1); } } } // end MgenStreamSource::OnInputReady() bool MgenStreamSource::Read(char* buffer, UINT32 nBytes, UINT32* bytesRead) { #ifdef WIN32 DWORD want = nBytes; DWORD got; if (ReadFile(descriptor, buffer, want, &got, NULL)) { *bytesRead = got; return true; } else { DMSG(0, "MgenStreamSource::Read() read error: %s\n", GetErrorString()); return false; } #else ssize_t result = read(descriptor, buffer, nBytes); if (result <= 0) { switch (errno) { case EINTR: case EAGAIN: *bytesRead = 0; return true; default: DMSG(0, "MgenStreamSource::Read() read error: %s\n", GetErrorString()); return false; } } else { *bytesRead = result; return true; } #endif } // end MgenStreamSource::Read() // This macro instantiates our MgenApp instance PROTO_INSTANTIATE_APP(MgenApp)