#include "mgen.h"
#include "mgenMsg.h"
#include "mgenVersion.h"
#include "protokit.h"
#include <string.h>
#include <stdio.h> // for stdout/stderr printouts
#include <ctype.h> // for toupper()
#ifdef WIN32
#include <IpHlpApi.h>
#else
#include <unistd.h>
#include <fcntl.h>
#endif // UNIX
// byte stream MGEN message sink
class MgenStreamSink : public MgenSink
{
public:
MgenStreamSink(ProtoDispatcher& theDispatcher);
virtual ~MgenStreamSink();
bool Open(const char* path, bool nonBlocking = true);
void Close();
virtual bool SendMgenMessage(const char* txBuffer,
unsigned int len,
const ProtoAddress& dstAddr);
static void DoOutputReady(ProtoDispatcher::Descriptor descriptor,
ProtoDispatcher::Event theEvent,
const void* userData);
void StartNotifying(MgenFlow* theFlow);
private:
void OnOutputReady();
bool Write(char* buffer, unsigned int* nbytes);
ProtoDispatcher::Descriptor descriptor;
#ifdef WIN32
HANDLE write_event;
#endif // WIN32
char msg_buffer[MgenMsg::MAX_SIZE];
UINT16 msg_len;
UINT16 msg_index;
ProtoDispatcher& dispatcher;
bool output_active;
}; // end class MgenStreamSink
class MgenStreamSource
{
public:
MgenStreamSource(Mgen& mgenInstance, class MgenApp& mgenApp);
~MgenStreamSource();
bool Open(const char* path);
void Close();
ProtoDispatcher::Descriptor GetDescriptor() {return descriptor;}
static void DoInputReady(ProtoDispatcher::Descriptor descriptor,
ProtoDispatcher::Event theEvent,
const void* userData);
protected:
void OnInputReady();
bool Read(char* buffer, UINT32 nBytes, UINT32* bytesRead);
Mgen& mgen_instance;
class MgenApp& mgen_app;
ProtoDispatcher::Descriptor descriptor;
#ifdef WIN32
HANDLE read_event;
#endif // WIN32
UINT32 msg_length;
char msg_buffer[MgenMsg::MAX_SIZE];
UINT32 msg_index;
}; // end class MgenStreamSource
class MgenApp : public ProtoApp
{
public:
enum CmdType {CMD_INVALID, CMD_ARG, CMD_NOARG};
MgenApp();
~MgenApp();
virtual bool OnStartup(int argc, const char*const* argv);
virtual bool ProcessCommands(int argc, const char*const* argv);
virtual void OnShutdown();
private:
void OnControlEvent(ProtoSocket& theSocket, ProtoSocket::Event theEvent);
static CmdType GetCmdType(const char* string);
bool OnCommand(const char* cmd, const char* val);
void Init(char* scriptFile);
void Usage();
static const char* const CMD_LIST[];
static bool GetInterfaceCounts(const char* ifName,
UINT32& txCount,
UINT32& rxCount);
bool OpenCmdInput(const char* path);
void CloseCmdInput();
static void DoCmdInputReady(ProtoDispatcher::Descriptor descriptor,
ProtoDispatcher::Event theEvent,
const void* userData);
void OnCmdInputReady();
bool ReadCmdInput(char* buffer, unsigned int& numBytes);
bool IsCmdDelimiter(char byte)
{return (('\n' == byte) || ('\r' == byte) || (';' == byte)); }
Mgen mgen; // Mgen engine
ProtoPipe control_pipe; // remote control message pipe
bool control_remote;
ProtoDispatcher::Descriptor cmd_descriptor; // "stdin" control option
char cmd_buffer[8192];
unsigned int cmd_length;
#ifdef HAVE_GPS
static bool GetPosition(const void* gpsHandle, GPSPosition& gpsPosition);
GPSHandle gps_handle;
GPSHandle payload_handle;
#endif // HAVE_GPS
bool have_ports;
bool convert;
char convert_path[PATH_MAX];
bool sink_non_blocking;
MgenStreamSource* source;
char ifinfo_name[64];
UINT32 ifinfo_tx_count;
UINT32 ifinfo_rx_count;
}; // end class MgenApp
MgenApp::MgenApp()
: mgen(GetTimerMgr(), GetSocketNotifier()),
control_pipe(ProtoPipe::MESSAGE), control_remote(false),
cmd_descriptor(ProtoDispatcher::INVALID_DESCRIPTOR),
#ifdef HAVE_GPS
gps_handle(NULL), payload_handle(NULL),
#endif // HAVE_GPS
have_ports(false), convert(false), sink_non_blocking(true), source(NULL),
ifinfo_tx_count(0), ifinfo_rx_count(0)
{
control_pipe.SetNotifier(&GetSocketNotifier());
control_pipe.SetListener(this, &MgenApp::OnControlEvent);
ifinfo_name[0] = '\0';
}
MgenApp::~MgenApp()
{
}
void MgenApp::Usage()
{
fprintf(stderr, "mgen [ipv4][ipv6][input <scriptFile>][save <saveFile>]\n"
" [output <logFile>][log <logFile>][hostAddr {on|off}\n"
" [binary][txlog][nolog][flush]\n"
" [event \"<mgen event>\"][port <recvPortList>]\n"
" [instance <name>][command <cmdInput>]\n"
" [sink <sinkFile>][block][source <sourceFile>]\n"
" [interface <interfaceName>][ttl <timeToLive>]\n"
" [tos <typeOfService>][label <value>]\n"
" [txbuffer <txSocketBufferSize>][rxbuffer <rxSocketBufferSize>]\n"
" [start <hr:min:sec>[GMT]][offset <sec>]\n"
" [precise {on|off}][ifinfo <ifName>]\n"
" [txcheck][rxcheck][check]\n"
" [convert <binaryLog>][debug <debugLevel>]\n");
} // end MgenApp::Usage()
const char* const MgenApp::CMD_LIST[] =
{
"+port",
"-ipv6", // open IPv6 sockets by default
"-ipv4", // open IPv4 sockets by default
"+convert", // convert binary logfile to text-based logfile
"+sink", // set Mgen::sink to stream sink
"-block", // set Mgen::sink to blocking I/O
"+source", // specify an MGEN stream source
"+ifinfo", // get tx/rx frame counts for specified interface
"+precise", // turn on/off precision packet timing
"+instance", // indicate mgen instance name
"-stop", // exit program instance
"+command", // specifies an input command file/device
"+hostAddr", // turn "host" field on/off in sent messages
"-help", // print usage and exit
NULL
};
MgenApp::CmdType MgenApp::GetCmdType(const char* cmd)
{
if (!cmd) return CMD_INVALID;
unsigned int len = strlen(cmd);
bool matched = false;
CmdType type = CMD_INVALID;
const char* const* nextCmd = CMD_LIST;
while (*nextCmd)
{
if (!strncmp(cmd, *nextCmd+1, len))
{
if (matched)
{
// ambiguous command (command should match only once)
return CMD_INVALID;
}
else
{
matched = true;
if ('+' == *nextCmd[0])
type = CMD_ARG;
else
type = CMD_NOARG;
}
}
nextCmd++;
}
return type;
} // end MgenApp::GetCmdType()
bool MgenApp::ProcessCommands(int argc, const char*const* argv)
{
// Group command line arguments into MgenApp or Mgen commands
// and their (if applicable) respective arguments
// Parse command line
int i = 1;
convert = false; // initialize conversion flag
while (i < argc)
{
CmdType cmdType = GetCmdType(argv[i]);
if (CMD_INVALID == cmdType)
{
// Is it a class MgenApp command?
switch(Mgen::GetCmdType(argv[i]))
{
case Mgen::CMD_INVALID:
break;
case Mgen::CMD_NOARG:
cmdType = CMD_NOARG;
break;
case Mgen::CMD_ARG:
cmdType = CMD_ARG;
break;
}
}
switch (cmdType)
{
case CMD_INVALID:
DMSG(0, "MgenApp::ProcessCommands() error: invalid command:%s\n", argv[i]);
return false;
case CMD_NOARG:
if (!OnCommand(argv[i], NULL))
{
DMSG(0, "MgenApp::ProcessCommands() OnCommand(%s) error\n",
argv[i]);
return false;
}
i++;
break;
case CMD_ARG:
if (!OnCommand(argv[i], argv[i+1]))
{
DMSG(0, "MgenApp::ProcessCommands() OnCommand(%s, %s) error\n",
argv[i], argv[i+1]);
return false;
}
i += 2;
break;
}
}
return true;
} // end MgenApp::ProcessCommands()
bool MgenApp::OnCommand(const char* cmd, const char* val)
{
if (control_remote)
{
char buffer[8192];
strcpy(buffer, cmd);
if (val)
{
strcat(buffer, " ");
strncat(buffer, val, 8192 - strlen(buffer));
}
unsigned int len = strlen(buffer);
if (control_pipe.Send(buffer, len))
{
return true;
}
else
{
DMSG(0, "MgenApp::ProcessCommand(%s) error sending command to remote process\n", cmd);
return false;
}
}
CmdType type = GetCmdType(cmd);
unsigned int len = strlen(cmd);
if (CMD_INVALID == type)
{
// If it is a core mgen command, process it as "overriding"
if (Mgen::CMD_INVALID != Mgen::GetCmdType(cmd))
{
return mgen.OnCommand(Mgen::GetCommandFromString(cmd), val, true);
}
else
{
DMSG(0, "MgenApp::ProcessCommand(%s) error: invalid command\n", cmd);
return false;
}
}
else if ((CMD_ARG == type) && !val)
{
DMSG(0, "MgenApp::ProcessCommand(%s) missing argument\n", cmd);
return false;
}
else if (!strncmp("port", cmd, len))
{
// "port" == implicit "0.0 LISTEN UDP <val>" script
char* string = new char[strlen(val) + 64];
if (!string)
{
DMSG(0, "MgenApp::ProcessCommand(port) memory allocation error: %s\n",
GetErrorString());
return false;
}
sprintf(string, "0.0 LISTEN UDP %s", val);
DrecEvent* event = new DrecEvent;
if (!event)
{
DMSG(0, "MgenApp::ProcessCommand(port) memory allocation error: %s\n",
GetErrorString());
return false;
}
if (!event->InitFromString(string))
{
DMSG(0, "MgenApp::ProcessCommand(port) error parsing <portList>\n");
return false;
}
have_ports = true;
mgen.InsertDrecEvent(event);
}
else if (!strncmp("ipv4", cmd, len))
{
mgen.SetDefaultSocketType(ProtoAddress::IPv4);
}
else if (!strncmp("ipv6", cmd, len))
{
#ifdef HAVE_IPV6
ProtoSocket::SetHostIPv6Capable();
if (ProtoSocket::HostIsIPv6Capable())
mgen.SetDefaultSocketType(ProtoAddress::IPv6);
else
#endif // HAVE_IPV6
DMSG(0, "MgenApp::ProcessCommand(ipv6) Warning: system not IPv6 capable?\n");
}
else if (!strncmp("background", cmd, len))
{
// do nothing (this command was scanned immediately at startup)
}
else if (!strcmp("convert", cmd))
{
convert = true; // set flag to do the conversion
strcpy(convert_path, val); // save path of file to convert
}
else if (!strncmp("sink", cmd, len))
{
MgenStreamSink* theSink = new MgenStreamSink(dispatcher);
if (theSink)
{
if (theSink->Open(val, sink_non_blocking))
{
if (mgen.GetSink())
{
((MgenStreamSink*)mgen.GetSink())->Close();
delete mgen.GetSink();
}
mgen.SetSink(theSink);
}
else
{
DMSG(0, "MgenApp::ProcessCommand(sink) Error: couldn't open stream sink\n");
return false;
}
}
else
{
DMSG(0, "MgenApp::ProcessCommand(sink) Error: couldn't allocate sink: %s\n",
GetErrorString());
return false;
}
}
else if (!strncmp("block", cmd, len))
{
sink_non_blocking = false;
}
else if (!strncmp("source", cmd, len))
{
MgenStreamSource* theSource = new MgenStreamSource(mgen, *this);
if (theSource)
{
if (theSource->Open(val))
{
if (!dispatcher.InstallGenericInput(theSource->GetDescriptor(),
MgenStreamSource::DoInputReady,
theSource))
{
DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't install stream source\n");
theSource->Close();
return false;
}
if (source)
{
source->Close();
delete source;
}
source = theSource;
}
else
{
DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't open stream source\n");
return false;
}
}
else
{
DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't allocate source: %s\n",
GetErrorString());
return false;
}
}
else if (!strncmp("ifinfo", cmd, len))
{
strncpy(ifinfo_name, val, 63);
ifinfo_name[63] = '\0';
}
else if (!strncmp("precise", cmd, len))
{
char status[4]; // valid status is "on" or "off"
strncpy(status, val, 3);
status[3] = '\0';
unsigned int len = strlen(status);
for (unsigned int i = 0; i < len; i++)
status[i] = tolower(status[i]);
if (!strncmp("on", status, len))
{
dispatcher.SetPreciseTiming(true);
}
else if (!strncmp("off", status, len))
{
dispatcher.SetPreciseTiming(false);
}
else
{
DMSG(0, "MgenApp::ProcessCommand(precise) Error: invalid <status>\n");
return false;
}
}
else if (!strncmp("instance", cmd, len))
{
if (control_pipe.IsOpen())
control_pipe.Close();
if (control_pipe.Connect(val))
{
control_remote = true;
}
else if (!control_pipe.Listen(val))
{
DMSG(0, "MgenApp::ProcessCommand(instance) error opening control pipe\n");
return false;
}
}
else if (!strncmp("command", cmd, len))
{
if (!OpenCmdInput(val))
{
DMSG(0, "MgenApp::ProcessCommand(command) error opening command input file/device\n");
return false;
}
}
else if (!strncmp("hostAddr", cmd, len))
{
char status[4]; // valid status is "on" or "off"
strncpy(status, val, 3);
status[3] = '\0';
unsigned int len = strlen(status);
for (unsigned int i = 0; i < len; i++)
status[i] = tolower(status[i]);
if (!strncmp("on", status, len))
{
// (TBD) control whither IPv4 or IPv6 ???
ProtoAddress localAddress;
if (localAddress.ResolveLocalAddress())
{
mgen.SetHostAddress(localAddress);
}
else
{
DMSG(0, "MgenApp::ProcessCommand(hostAddr) error getting local addr\n");
return false;
}
}
else if (!strncmp("off", status, len))
{
mgen.ClearHostAddress();
}
else
{
DMSG(0, "MgenApp::ProcessCommand(precise) Error: invalid <status>\n");
return false;
}
}
else if (!strncmp("stop", cmd, len))
{
Stop();
}
else if (!strncmp("help", cmd, len))
{
fprintf(stderr, "mgen: version %s\n", MGEN_VERSION);
Usage();
return false;
}
return true;
} // end MgenApp::OnCommand()
bool MgenApp::OnStartup(int argc, const char*const* argv)
{
// Seed the system rand() function
struct timeval currentTime;
ProtoSystemTime(currentTime);
srand(currentTime.tv_usec);
mgen.SetLogFile(stdout); // log to stdout by default
#ifdef HAVE_IPV6
if (ProtoSocket::HostIsIPv6Capable())
mgen.SetDefaultSocketType(ProtoAddress::IPv6);
#endif // HAVE_IPV6
#ifdef HAVE_GPS
gps_handle = GPSSubscribe(NULL);
if (gps_handle)
mgen.SetPositionCallback(MgenApp::GetPosition, gps_handle);
payload_handle = GPSSubscribe("/tmp/mgenPayloadKey");
mgen.SetPayloadHandle(payload_handle);
#endif // HAVE_GPS
if (!ProcessCommands(argc, argv))
{
fprintf(stderr, "mgen: error while processing startup commands\n");
return false;
}
if (control_remote)
{
// We remoted commands, so we're finished ...
return false;
}
fprintf(stderr, "mgen: version %s\n", MGEN_VERSION);
if (convert)
{
fprintf(stderr, "mgen: beginning binary to text log conversion ...\n");
mgen.ConvertBinaryLog(convert_path);
fprintf(stderr, "mgen: conversion complete (exiting).\n");
}
else
{
if (mgen.DelayedStart())
{
char startTime[256];
mgen.GetStartTime(startTime);
fprintf(stderr, "mgen: delaying start until %s ...\n", startTime);
}
else
{
fprintf(stderr, "mgen: starting now ...\n");
}
}
if ('\0' != ifinfo_name[0])
GetInterfaceCounts(ifinfo_name, ifinfo_tx_count, ifinfo_rx_count);
else
ifinfo_tx_count = ifinfo_rx_count = 0;
return mgen.Start();
} // end MgenApp::OnStartup()
void MgenApp::OnShutdown()
{
mgen.Stop();
if ('\0' != ifinfo_name[0])
{
UINT32 txCount, rxCount;
if (GetInterfaceCounts(ifinfo_name, txCount, rxCount))
fprintf(stderr, "mgen: iface>%s txFrames>%lu rxFrames>%lu\n",
ifinfo_name,
txCount - ifinfo_tx_count,
rxCount - ifinfo_rx_count);
else
fprintf(stderr, "mgen: Warning! Couldn't get interface frame counts\n");
}
MgenSink* theSink = mgen.GetSink();
if (theSink)
{
delete theSink;
mgen.SetSink(NULL);
}
if (source)
{
delete source;
source = NULL;
}
#ifdef HAVE_GPS
if (gps_handle)
{
GPSUnsubscribe(gps_handle);
gps_handle = NULL;
mgen.SetPositionCallback(NULL, NULL);
}
if (payload_handle)
{
GPSUnsubscribe(payload_handle);
payload_handle = NULL;
mgen.SetPayloadHandle(NULL);
}
#endif // HAVE_GPS
control_pipe.Close();
CloseCmdInput();
} // end MgenApp::OnShutdown()
#ifdef HAVE_GPS
bool MgenApp::GetPosition(const void* gpsHandle, GPSPosition& gpsPosition)
{
GPSGetCurrentPosition((GPSHandle)gpsHandle, &gpsPosition);
return true;
}
#endif // HAVE_GPS
// This routine uses the system's "netstat" command
// to retrieve tx/rx frame counts for a given interface
// (WIN32 Note: Counts are cumulative for all interfaces?)
bool MgenApp::GetInterfaceCounts(const char* ifName,
UINT32& txCount,
UINT32& rxCount)
{
#ifdef _WIN32_WCE
// For the moment, we'll assume our WINCE platform
// has one interface and we'll use GetIpStatistics()
// ignoring the "ifName"
MIB_IPSTATS ipStats;
if (NO_ERROR != GetIpStatistics(&ipStats))
{
DMSG(0, "MgenApp::GetInterfaceCounts() GetIpStatistics() error: %s\n", GetErrorString());
rxCount = txCount = 0;
return false;
}
else
{
rxCount = ipStats.dwInReceives;
txCount = ipStats.dwInDelivers;
return true;
}
#else
bool result = false;
txCount = rxCount = 0;
#ifdef WIN32
FILE* p = _popen("netstat -e", "r");
int lineCount = 0;
#else
FILE* p = popen("netstat -i", "r");
#endif // if/else WIN32
if (NULL == p)
{
DMSG(0, "MgenApp::GetInterfaceCounts() popen() error: %s\n",
GetErrorString());
return result;
}
Mgen::FastReader reader;
char linebuffer[256];
unsigned int len = 256;
while (Mgen::FastReader::OK == reader.Readline(p, linebuffer, &len))
{
#ifdef WIN32
// On WIN32, we must collect two lines from "netstat -e" output
if (!strncmp(linebuffer, "Unicast", strlen("Unicast")) ||
!strncmp(linebuffer, "Non-unicast", strlen("Non-unicast")))
{
char* ptr = strstr(linebuffer, "packets");
if (ptr)
{
UINT32 received, sent;
if (2 == sscanf(ptr, "packets %lu %lu", &received, &sent))
{
txCount += sent;
rxCount += received;
if (2 == ++lineCount)
{
result = true;
break;
}
}
}
}
#else
if (!strncmp(linebuffer, ifName, strlen(ifName)))
{
#ifdef LINUX
char format[256];
// Linux format: "iface mtu metric rxOk rxErr rxDrp rxOvr txOk ..."
sprintf(format, "%s %%lu %%lu %%lu %%lu %%lu %%lu %%lu", ifName);
UINT32 mtu, metric, rxOk, rxErr, rxDrp, rxOvr, txOk;
if (7 == sscanf(linebuffer, format, &mtu, &metric, &rxOk, &rxErr, &rxDrp, &rxOvr, &txOk))
{
txCount = txOk;
rxCount = rxOk;
result = true;
break;
}
#endif // LINUX
#ifdef MACOSX
char format[256];
// MacOS X format: "iface mtu net addr ipkts ierrs opkts ..."
char net[64], addr[64];
UINT32 mtu, ipkts, ierrs, opkts;
sprintf(format, "%s %%lu %%s %%s %%lu %%lu %%lu", ifName);
if (6 == sscanf(linebuffer, format, &mtu, net, addr, &ipkts, &ierrs, &opkts))
{
txCount = opkts;
rxCount = ipkts;
result = true;
break;
}
#endif // MACOSX
}
#endif // if/else WIN32
len = 256;
} // end while(reader.Readline())
#ifdef WIN32
_pclose(p);
#else
pclose(p);
#endif // if/else WIN32
return result;
#endif // if/else _WIN32_WCE
} // end MgenApp::GetInterfaceCounts()
void MgenApp::OnControlEvent(ProtoSocket& /*theSocket*/,
ProtoSocket::Event theEvent)
{
if (ProtoSocket::RECV == theEvent)
{
char buffer[8192];
unsigned int len = 8191;
if (control_pipe.Recv(buffer, len))
{
// (TBD) Delimit commands by line breaks and/or other delimiters ???
buffer[len] = '\0';
char* cmd = buffer;
char* arg = NULL;
for (unsigned int i = 0; i < len; i++)
{
if ('\0' == buffer[i])
{
break;
}
else if (isspace(buffer[i]))
{
buffer[i] = '\0';
arg = buffer+i+1;
break;
}
}
if (!OnCommand(cmd, arg))
DMSG(0, "MgenApp::OnControlEvent() error processing command\n");
}
else
{
DMSG(0, "MgenApp::OnControlEvent() receive error\n");
}
}
else
{
DMSG(0, "MgenApp::OnControlEvent() unhandled event type\n");
}
} // end MgenApp::OnControlEvent
// Mgen run-time control via "stdin" (or other file/device)
// Note: On WIN32, MGEN must be built as a "console" application
// for this stuff to work?
bool MgenApp::OpenCmdInput(const char* path)
{
#ifdef _WIN32_WCE
DMSG(0, "MgenApp::OpenCmdInput() \"command\" option not supported on WinCE\n");
return false;
#else
CloseCmdInput(); // in case it was open
#ifdef WIN32
if (!strcmp(path, "STDIN"))
{
cmd_descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_INPUT_HANDLE);
// (TBD) set stdin for overlapped I/O ???
}
else
{
cmd_descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_READ, FILE_SHARE_READ,
NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
}
if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == cmd_descriptor)
{
DMSG(0, "MgenApp::OpenCmdInput() error opening file\n");
return false;
}
/*// For non-block I/O on Win32 we use overlapped I/O
if (NULL == (cmd_read_event = CreateEvent(NULL, TRUE, FALSE, NULL)))
{
DMSG(0, "MgenApp::OpenCmdInput() error creating overlapped I/O event\n");
CloseHandle((HANDLE)cmd_descriptor);
return false;
} */
#else
if (!strcmp(path, "STDIN"))
{
cmd_descriptor = fileno(stdin);
// Make the stdin non-blocking
if(-1 == fcntl(cmd_descriptor, F_SETFL, fcntl(cmd_descriptor, F_GETFL, 0) | O_NONBLOCK))
{
cmd_descriptor = -1;
DMSG(0, "MgenApp::OpenCmdInput() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s",
GetErrorString());
}
}
else
{
cmd_descriptor = open(path, O_RDONLY | O_NONBLOCK);
}
if (cmd_descriptor < 0)
{
DMSG(0, "MgenApp::OpenCmdInput() error opening file: %s", GetErrorString());
cmd_descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
return false;
}
#endif // if/else WIN32
cmd_length = 0;
if (!dispatcher.InstallGenericInput(cmd_descriptor, MgenApp::DoCmdInputReady, this))
{
DMSG(0, "MgenApp::OpenCmdInput() Error: couldn't install cmd input\n");
CloseCmdInput();
return false;
}
return true;
#endif // if/else _WIN32_WCE
} // end MgenApp::OpenCmdInput()
void MgenApp::CloseCmdInput()
{
#ifndef _WIN32_WCE
if (ProtoDispatcher::INVALID_DESCRIPTOR != cmd_descriptor)
{
dispatcher.RemoveGenericInput(cmd_descriptor);
#ifdef WIN32
CloseHandle((HANDLE)cmd_descriptor);
#else
close(cmd_descriptor);
#endif // if/else WIN32/UNIX
cmd_descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
}
#endif // _WIN32_WCE
} // end MgenApp::CloseCmdInput()
void MgenApp::DoCmdInputReady(ProtoDispatcher::Descriptor /*descriptor*/,
ProtoDispatcher::Event /*theEvent*/,
const void* userData)
{
((MgenApp*)userData)->OnCmdInputReady();
} // end MgenApp::DoCmdInputReady()
void MgenApp::OnCmdInputReady()
{
// We read the command input stream one byte at a time
// just to save some buffer manipulation
char byte;
unsigned int len = 1;
if (ReadCmdInput(&byte, len))
{
if (IsCmdDelimiter(byte))
{
cmd_buffer[cmd_length] = '\0';
if (cmd_length)
{
char* cmd = cmd_buffer;
char* arg = NULL;
for (unsigned int i = 0; i < cmd_length; i++)
{
if (isspace(cmd_buffer[i]))
{
cmd_buffer[i] = '\0';
arg = cmd_buffer + i + 1;
break;
}
}
if (!OnCommand(cmd, arg))
DMSG(0, "MgenApp::OnCmdInputReady() error processing command\n");
}
cmd_length = 0;
}
else if (cmd_length < 8191)
{
cmd_buffer[cmd_length++] = byte;
}
else
{
DMSG(0, "MgenApp::OnCmdInputReady() error: maximum command length exceeded\n");
}
}
else
{
DMSG(0, "MgenApp::OnCmdInputReady() error reading input\n");
}
} // end MgenApp::OnCmdInputReady()
bool MgenApp::ReadCmdInput(char* buffer, unsigned int& numBytes)
{
#ifdef WIN32
DWORD want = numBytes;
DWORD got;
if (ReadFile(cmd_descriptor, buffer, want, &got, NULL))
{
numBytes = got;
return true;
}
else
{
DMSG(0, "MgenApp::ReadCmdInput() ReadFile error: %s\n", GetErrorString());
return false;
}
#else
ssize_t result = read(cmd_descriptor, buffer, numBytes);
if (result <= 0)
{
switch (errno)
{
case EINTR:
case EAGAIN:
numBytes = 0;
return true;
default:
DMSG(0, "MgenApp::ReadCmdInput() read error: %s\n", GetErrorString());
return false;
}
}
else
{
numBytes = result;
return true;
}
#endif // end if/else WIN32/UNIX
} // end MgenApp::ReadCmdInput()
// Mgen stream sink/source implementation
// Note: On WIN32, MGEN must be built as a "console" application
// for this stuff to work?
MgenStreamSink::MgenStreamSink(ProtoDispatcher& theDispatcher)
: descriptor(ProtoDispatcher::INVALID_DESCRIPTOR),
msg_len(0), msg_index(0), dispatcher(theDispatcher), output_active(false)
{
}
MgenStreamSink::~MgenStreamSink()
{
Close();
}
bool MgenStreamSink::Open(const char* path, bool nonBlocking)
{
Close();
#ifdef WIN32
#ifdef _WIN32_WCE
DMSG(0, "MgenStreamSink::Open() \"sink\" option not support under WinCE\n");
return false;
#else
if (!strcmp(path, "STDOUT"))
{
descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_OUTPUT_HANDLE);
// (TBD) set stdout for overlapped I/O ???
}
else
{
descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_WRITE, FILE_SHARE_READ,
NULL, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
}
if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == descriptor)
{
DMSG(0, "MgenStreamSink::Open() error opening file\n");
return false;
}
// For non-block I/O on Win32 we use overlapped I/O
if (NULL == (write_event = CreateEvent(NULL, TRUE, FALSE, NULL)))
{
DMSG(0, "MgenStreamSink::Open() error creating overlapped I/O event\n");
CloseHandle((HANDLE)descriptor);
return false;
}
#endif // if/else _WIN32_WCE
#else
if (!strcmp(path, "STDOUT"))
{
descriptor = fileno(stdout);
if (nonBlocking)
{
// Make the stdout non-blocking and aynchronous
int flags = O_NONBLOCK;
#ifdef FASYNC
flags |= FASYNC;
#endif // FASYNC
if(-1 == fcntl(descriptor, F_SETFL, fcntl(descriptor, F_GETFL, 0) | flags))
{
descriptor = -1;
DMSG(0, "MgenStreamSink::Open() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s",
GetErrorString());
}
}
}
else
{
int flags = O_CREAT | O_WRONLY;
if (nonBlocking) flags |= O_NONBLOCK;
descriptor = open(path, flags);
}
if (descriptor < 0)
{
DMSG(0, "MgenStreamSink::Open() error opening file: %s", GetErrorString());
descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
return false;
}
#endif // if/else WIN32
msg_len = msg_index = 0;
output_active = false;
return true;
} // end MgenStreamSink::Open()
void MgenStreamSink::Close()
{
if (ProtoDispatcher::INVALID_DESCRIPTOR != descriptor)
{
if (output_active)
{
dispatcher.RemoveGenericOutput(descriptor);
output_active = false;
}
#ifdef WIN32
CloseHandle((HANDLE)descriptor);
CloseHandle((HANDLE)write_event);
write_event = INVALID_HANDLE_VALUE;
#else
close(descriptor);
#endif // if/else WIN32
descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
}
} // end MgenStreamSink::Close()
// Write buffer only if it is non-blocking
bool MgenStreamSink::SendMgenMessage(const char* txBuffer,
unsigned int len,
const ProtoAddress& /*dstAddr*/)
{
if (msg_index >= msg_len)
{
// Copy txBuffer to msg_buffer
len = (len < MgenMsg::MAX_SIZE) ? len : MgenMsg::MAX_SIZE;
memcpy(msg_buffer, txBuffer, len);
msg_len = len;
msg_index = 0;
if (!output_active) OnOutputReady();
return true;
}
else
{
DMSG(1, "MgenStreamSink::SendMgenMessage() message sink buffer overflow\n");
return false;
}
} // end MgenStreamSink::SendMgenMessage()
void MgenStreamSink::StartNotifying(MgenFlow* theFlow)
{
// (TBD) start async output readiness monitoring and
// mod OnOutputReady to tickle the "notify_flow"
// to output a message on demand
// also mod MgenFlow ON/MOD/OFF event handling to
// start/stop notify as needed
if (!output_active)
{
// (TBD) install async output notification only for result == true?
if (dispatcher.InstallGenericOutput(descriptor, MgenStreamSink::DoOutputReady, this))
output_active = true;
else
DMSG(0, "MgenStreamSink::OnOutputReady() async output install failed!\n");
}
MgenSink::StartNotifying(theFlow);
} // end MgenStreamSink::StartNotifying()
void MgenStreamSink::DoOutputReady(ProtoDispatcher::Descriptor /*descriptor*/,
ProtoDispatcher::Event /*theEvent*/,
const void* userData)
{
((MgenStreamSink*)userData)->OnOutputReady();
} // end MgenStreamSink::DoOutputReady()
void MgenStreamSink::OnOutputReady()
{
unsigned int nbytes = msg_len - msg_index;
bool result = Write(msg_buffer+msg_index, &nbytes);
msg_index += nbytes;
if (msg_index < msg_len)
{
if (!output_active)
{
// (TBD) install async output notification only for result == true?
if (dispatcher.InstallGenericOutput(descriptor, MgenStreamSink::DoOutputReady, this))
output_active = true;
else
DMSG(0, "MgenStreamSink::OnOutputReady() async output install failed!\n");
}
}
else
{
if (notify_flow)
{
notify_flow->Notify();
}
else if (output_active)
{
dispatcher.RemoveGenericOutput(descriptor);
output_active = false;
}
}
if (!result)
DMSG(0, "MgenStreamSink::OnOutputReady() error writing to output!\n");
} // MgenStreamSink::OnOutputReady()
bool MgenStreamSink::Write(char* buffer, unsigned int* nbytes)
{
unsigned int len = *nbytes;
#ifdef WIN32
DWORD put = 0;
#ifndef _WIN32_WCE
while (put < len)
{
DWORD written;
OVERLAPPED osWrite;
osWrite.hEvent = write_event;
if (!WriteFile((HANDLE)descriptor, buffer+put, len - put, &written, &osWrite))
{
if (ERROR_IO_PENDING != GetLastError())
{
// I/O error
DMSG(0, "MgenStreamSink::SendMgenMessage() WriteFile() error: %d\n",
GetLastError());
*nbytes = put;
return false;
}
else
{
// (TBD perhaps wait some small amount of time instead of ZERO?
DWORD result = WaitForSingleObject(write_event, 0);
switch (result)
{
case WAIT_OBJECT_0:
if (GetOverlappedResult((HANDLE)descriptor, &osWrite, &written, FALSE))
{
put += written;
}
else
{
DMSG(0, "MgenStreamSink::SendMgenMessage() GetOverlappedResult() error: %d\n",
GetLastError());
*nbytes = put;
return false;
}
break;
default:
// I/O timed out or abandoned
DMSG(0, "MgenStreamSink::SendMgenMessage() WaitForSingleObject() error: %d\n",
GetLastError());
*nbytes = put;
return false;
}
}
}
else
{
// WriteFile completed immediately
put += written;
}
}
#endif // !_WIN32_WCE
#else
ssize_t put = 0;
while (put < (ssize_t)len)
{
ssize_t result = write(descriptor, buffer+put, len - put);
if (result < 0)
{
switch (errno)
{
case EINTR:
continue; // interrupted, try again
case EAGAIN:
*nbytes = put;
return true; // blocked, can't write message now
default:
DMSG(0, "MgenStreamSink::SendMgenMessage() write() error: %s\n", GetErrorString());
*nbytes = put;
return false;
}
}
else
{
put += result;
}
}
#endif // if/else WIN32
*nbytes = put;
return true;
} // end MgenStreamSink::Write()
MgenStreamSource::MgenStreamSource(Mgen& mgenInstance, class MgenApp& mgenApp)
: mgen_instance(mgenInstance), mgen_app(mgenApp),
descriptor(ProtoDispatcher::INVALID_DESCRIPTOR),
msg_length(0), msg_index(0)
{
}
MgenStreamSource::~MgenStreamSource()
{
Close();
}
bool MgenStreamSource::Open(const char* path)
{
Close();
#ifdef WIN32
#ifdef _WIN32_WCE
DMSG(0, "MgenStreamSource::Open() \"source\" option not supported under WinCE\n");
return false;
#else
if (!strcmp(path, "STDIN"))
{
descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_INPUT_HANDLE);
// (TBD) set stdout for overlapped I/O ???
}
else
{
descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_READ, FILE_SHARE_READ,
NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
}
if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == descriptor)
{
DMSG(0, "MgenStreamSource::Open() error opening file\n");
return false;
}
// For non-block I/O on Win32 we use overlapped I/O
if (NULL == (read_event = CreateEvent(NULL, TRUE, FALSE, NULL)))
{
DMSG(0, "MgenStreamSource::Open() error creating overlapped I/O event\n");
CloseHandle((HANDLE)descriptor);
return false;
}
#endif // if/else _WIN32_WCE
#else
if (!strcmp(path, "STDIN"))
{
descriptor = fileno(stdin);
// Make the stdin non-blocking
if(-1 == fcntl(descriptor, F_SETFL, fcntl(descriptor, F_GETFL, 0) | O_NONBLOCK))
{
descriptor = -1;
DMSG(0, "MgenStreamSource::Open() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s",
GetErrorString());
}
}
else
{
descriptor = open(path, O_RDONLY | O_NONBLOCK);
}
if (descriptor < 0)
{
DMSG(0, "MgenStreamSource::Open() error opening file: %s", GetErrorString());
descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
return false;
}
#endif // if/else WIN32
msg_length = msg_index = 0;
return true;
} // end MgenStreamSource::Open()
void MgenStreamSource::Close()
{
if (ProtoDispatcher::INVALID_DESCRIPTOR != descriptor)
{
#ifdef WIN32
CloseHandle((HANDLE)descriptor);
CloseHandle(read_event);
read_event = INVALID_HANDLE_VALUE;
#else
close(descriptor);
#endif // if/else WIN32
descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
}
} // end MgenStreamSource::Close()
void MgenStreamSource::DoInputReady(ProtoDispatcher::Descriptor /*descriptor*/,
ProtoDispatcher::Event /*theEvent*/,
const void* userData)
{
((MgenStreamSource*)userData)->OnInputReady();
} // end MgenStreamSource::DoInputReady()
void MgenStreamSource::OnInputReady()
{
UINT32 count;
if (msg_length)
{
if (Read(msg_buffer+msg_index, msg_length - msg_index, &count))
{
msg_index += count;
if (msg_index == msg_length)
{
// Message reception complete
ProtoAddress src;
src.Reset(mgen_instance.GetDefaultSocketType());
src.SetPort(0);
mgen_instance.HandleMgenMessage(msg_buffer, msg_length, src);
msg_length = msg_index = 0;
}
}
else
{
mgen_app.Stop(-1);
}
}
else
{
// Reading first four bytes of MGEN message to get
// MGEN "messageSize" field
if (Read(msg_buffer+msg_index, 2 - msg_index, &count))
{
msg_index += count;
if (2 == msg_index)
{
memcpy(&msg_length, msg_buffer, 2);
msg_length = ntohs(msg_length);
if ((msg_length < MgenMsg::MIN_SIZE) || (msg_length > MgenMsg::MAX_SIZE))
{
DMSG(0, "MgenStreamSource::OnInputReady() invalid MGEN message length received: %lu\n",
msg_length);
msg_length = msg_index = 0;
return;
}
OnInputReady();
}
}
else
{
mgen_app.Stop(-1);
}
}
} // end MgenStreamSource::OnInputReady()
bool MgenStreamSource::Read(char* buffer, UINT32 nBytes, UINT32* bytesRead)
{
#ifdef WIN32
DWORD want = nBytes;
DWORD got;
if (ReadFile(descriptor, buffer, want, &got, NULL))
{
*bytesRead = got;
return true;
}
else
{
DMSG(0, "MgenStreamSource::Read() read error: %s\n", GetErrorString());
return false;
}
#else
ssize_t result = read(descriptor, buffer, nBytes);
if (result <= 0)
{
switch (errno)
{
case EINTR:
case EAGAIN:
*bytesRead = 0;
return true;
default:
DMSG(0, "MgenStreamSource::Read() read error: %s\n", GetErrorString());
return false;
}
}
else
{
*bytesRead = result;
return true;
}
#endif
} // end MgenStreamSource::Read()
// This macro instantiates our MgenApp instance
PROTO_INSTANTIATE_APP(MgenApp)
syntax highlighted by Code2HTML, v. 0.9.1