#include "mgen.h"
#include "mgenMsg.h"
#include "mgenVersion.h"

#include "protokit.h"

#include <string.h>
#include <stdio.h>   // for stdout/stderr printouts
#include <ctype.h>  // for toupper()

#ifdef WIN32
#include <IpHlpApi.h>
#else
#include <unistd.h>
#include <fcntl.h>
#endif // UNIX

// Byte stream MGEN message sink: an MgenSink that writes MGEN messages to a
// file/device descriptor (or the process' standard output) instead of a socket.
class MgenStreamSink : public MgenSink
{
    public:
        // The dispatcher reference is kept so output notification can be
        // removed again when the sink is closed.
        MgenStreamSink(ProtoDispatcher& theDispatcher);
        virtual ~MgenStreamSink();
        // Opens "path" for writing ("STDOUT" selects the standard output).
        // "nonBlocking" requests non-blocking I/O on the descriptor.
        bool Open(const char* path, bool nonBlocking = true);
        // Releases the descriptor (safe to call when nothing is open).
        void Close();
        
        // MgenSink interface: hands one serialized message to the sink.
        // The destination address is not used by the stream sink.
        virtual  bool SendMgenMessage(const char*           txBuffer,
                                      unsigned int          len,
                                      const ProtoAddress& dstAddr);
        
        // Static trampoline with the ProtoDispatcher callback signature
        // (presumably "userData" is the MgenStreamSink instance - confirm
        // against the definition, which is outside this view).
        static void DoOutputReady(ProtoDispatcher::Descriptor descriptor, 
                                  ProtoDispatcher::Event      theEvent, 
                                  const void*                 userData); 
        
        void StartNotifying(MgenFlow* theFlow);
          
    private:
        void OnOutputReady(); 
        bool Write(char* buffer, unsigned int* nbytes);
            
        ProtoDispatcher::Descriptor descriptor;     // output descriptor, INVALID_DESCRIPTOR when closed
#ifdef WIN32
        HANDLE                      write_event;    // event created in Open() for overlapped I/O
#endif // WIN32
        char                        msg_buffer[MgenMsg::MAX_SIZE];  // copy of the message being written
        UINT16                      msg_len;        // number of valid bytes in msg_buffer
        UINT16                      msg_index;      // write progress within msg_buffer
        ProtoDispatcher&            dispatcher;
        bool                        output_active;  // true while registered with the dispatcher for output
};  // end class MgenStreamSink

// Byte stream MGEN message source: reads from a file/device descriptor that
// MgenApp registers with its dispatcher as a generic input.
// NOTE(review): the method definitions are outside this view; the member
// comments below describe the apparent intent only.
class MgenStreamSource 
{
    public:
        MgenStreamSource(Mgen& mgenInstance, class MgenApp& mgenApp);
        ~MgenStreamSource();
        
        // Opens the stream named by "path"; returns false on failure.
        bool Open(const char* path);
        void Close();
        // Descriptor to hand to ProtoDispatcher::InstallGenericInput().
        ProtoDispatcher::Descriptor GetDescriptor() {return descriptor;}
               
        // Static callback with the ProtoDispatcher generic-input signature;
        // MgenApp installs it with the MgenStreamSource instance as "userData".
        static void DoInputReady(ProtoDispatcher::Descriptor descriptor, 
                                 ProtoDispatcher::Event      theEvent, 
                                 const void*                 userData);
    protected: 
        void OnInputReady();
        bool Read(char* buffer, UINT32 nBytes, UINT32* bytesRead);
    
        Mgen&                       mgen_instance;
        class MgenApp&              mgen_app;
        ProtoDispatcher::Descriptor descriptor;
#ifdef WIN32
        HANDLE                      read_event;
#endif // WIN32
        UINT32                      msg_length;   // presumably the length of the message being assembled
        char                        msg_buffer[MgenMsg::MAX_SIZE];
        UINT32                      msg_index;    // presumably the read progress within msg_buffer
};  // end class MgenStreamSource

// The MGEN command-line application: wraps an Mgen engine in a ProtoApp and
// adds application-level commands (stream sink/source, remote control pipe,
// command input file/device, interface frame counting, log conversion).
class MgenApp : public ProtoApp
{
    public:
        // Classification of a command name: unknown/ambiguous, takes an
        // argument, or takes no argument.
        enum CmdType {CMD_INVALID, CMD_ARG, CMD_NOARG};
    
        MgenApp();
        ~MgenApp();

        // ProtoApp overrides
        virtual bool OnStartup(int argc, const char*const* argv);
        virtual bool ProcessCommands(int argc, const char*const* argv);
        virtual void OnShutdown(); 
    
    private:    
        // Handles commands received on the remote control pipe.
        void OnControlEvent(ProtoSocket& theSocket, ProtoSocket::Event theEvent);
        // Looks "string" up in CMD_LIST (unique abbreviations are accepted).
        static CmdType GetCmdType(const char* string);
        // Executes a single command; "val" is NULL for commands without argument.
        bool OnCommand(const char* cmd, const char* val);
        void Init(char* scriptFile);
        void Usage();
        // NULL-terminated command table; the first character of each entry
        // is '+' (argument required) or '-' (no argument).
        static const char* const CMD_LIST[];
        // Retrieves tx/rx frame counts for "ifName" (see definition).
        static bool GetInterfaceCounts(const char* ifName, 
                                       UINT32&     txCount,
                                       UINT32&     rxCount);
        
        // Run-time command input from "STDIN" or a file/device
        bool OpenCmdInput(const char* path);
        void CloseCmdInput();
        static void DoCmdInputReady(ProtoDispatcher::Descriptor descriptor, 
                                    ProtoDispatcher::Event      theEvent, 
                                    const void*                 userData); 
        void OnCmdInputReady();
        bool ReadCmdInput(char* buffer, unsigned int& numBytes);
        // Commands on the command input are separated by newline,
        // carriage return or semicolon.
        bool IsCmdDelimiter(char byte)
            {return (('\n' == byte) || ('\r' == byte) || (';' == byte)); }
       
        
        Mgen              mgen;         // Mgen engine
        ProtoPipe         control_pipe; // remote control message pipe
        bool              control_remote;  // true when commands are relayed to another instance
        
        ProtoDispatcher::Descriptor cmd_descriptor;   // "stdin" control option
        char                        cmd_buffer[8192]; // command text accumulated so far
        unsigned int                cmd_length;       // number of bytes held in cmd_buffer
        
#ifdef HAVE_GPS
        static bool GetPosition(const void* gpsHandle, GPSPosition& gpsPosition);
        GPSHandle         gps_handle;
        GPSHandle         payload_handle;
#endif // HAVE_GPS
        bool              have_ports;         // set once a "port" command was accepted
        bool              convert;            // true when a binary log conversion was requested
        char              convert_path[PATH_MAX];  // path of the binary log to convert
        bool              sink_non_blocking;  // cleared by the "block" command
        MgenStreamSource* source;             // current stream source (NULL if none)
        char              ifinfo_name[64];    // interface named by "ifinfo" (empty if unset)
        UINT32            ifinfo_tx_count;    // tx frame count sampled at startup
        UINT32            ifinfo_rx_count;    // rx frame count sampled at startup

}; // end class MgenApp


// Constructs the application with default option values and attaches the
// remote control pipe to the application's socket notifier.
MgenApp::MgenApp()
 :  mgen(GetTimerMgr(), GetSocketNotifier()),
    control_pipe(ProtoPipe::MESSAGE),
    control_remote(false),
    cmd_descriptor(ProtoDispatcher::INVALID_DESCRIPTOR),
#ifdef HAVE_GPS
    gps_handle(NULL),
    payload_handle(NULL),
#endif // HAVE_GPS
    have_ports(false),
    convert(false),
    sink_non_blocking(true),
    source(NULL),
    ifinfo_tx_count(0),
    ifinfo_rx_count(0)
{
    // No interface has been selected for frame counting yet
    ifinfo_name[0] = '\0';

    // Control pipe events are delivered to OnControlEvent()
    control_pipe.SetNotifier(&GetSocketNotifier());
    control_pipe.SetListener(this, &MgenApp::OnControlEvent);
}

// Nothing to do here: resources are released in OnShutdown().
MgenApp::~MgenApp()
{
}

// Prints the command-line synopsis to stderr.
void MgenApp::Usage()
{
    // (the "hostAddr" option previously lacked its closing bracket)
    fprintf(stderr, "mgen [ipv4][ipv6][input <scriptFile>][save <saveFile>]\n"
                    "     [output <logFile>][log <logFile>][hostAddr {on|off}]\n"
                    "     [binary][txlog][nolog][flush]\n"
                    "     [event \"<mgen event>\"][port <recvPortList>]\n"
                    "     [instance <name>][command <cmdInput>]\n"
                    "     [sink <sinkFile>][block][source <sourceFile>]\n"
                    "     [interface <interfaceName>][ttl <timeToLive>]\n"
                    "     [tos <typeOfService>][label <value>]\n"
                    "     [txbuffer <txSocketBufferSize>][rxbuffer <rxSocketBufferSize>]\n"
                    "     [start <hr:min:sec>[GMT]][offset <sec>]\n"
                    "     [precise {on|off}][ifinfo <ifName>]\n"
                    "     [txcheck][rxcheck][check]\n"
                    "     [convert <binaryLog>][debug <debugLevel>]\n");
}  // end MgenApp::Usage()


// Application-level command table (NULL terminated).
// The first character of each entry encodes the command type:
//   '+' : the command takes an argument   (CMD_ARG)
//   '-' : the command takes no argument   (CMD_NOARG)
// The command name itself starts at the second character.
const char* const MgenApp::CMD_LIST[] =
{
    "+port",       // implicit "0.0 LISTEN UDP <portList>" script event
    "-ipv6",       // open IPv6 sockets by default
    "-ipv4",       // open IPv4 sockets by default
    "+convert",    // convert binary logfile to text-based logfile
    "+sink",       // set Mgen::sink to stream sink
    "-block",      // set Mgen::sink to blocking I/O
    "+source",     // specify an MGEN stream source
    "+ifinfo",     // get tx/rx frame counts for specified interface
    "+precise",    // turn on/off precision packet timing
    "+instance",   // indicate mgen instance name 
    "-stop",       // exit program instance
    "+command",    // specifies an input command file/device
    "+hostAddr",   // turn "host" field on/off in sent messages
    "-help",       // print usage and exit
    NULL
};

// Classifies "cmd" against CMD_LIST.  Any prefix of a command name is
// accepted as long as it selects exactly one table entry; a prefix that
// matches more than one entry (or none) yields CMD_INVALID.
MgenApp::CmdType MgenApp::GetCmdType(const char* cmd)
{
    if (NULL == cmd) return CMD_INVALID;

    const unsigned int cmdLen = strlen(cmd);
    CmdType result = CMD_INVALID;
    bool found = false;
    for (const char* const* entry = CMD_LIST; NULL != *entry; ++entry)
    {
        // Skip the leading '+'/'-' type marker when comparing names
        const char* name = *entry + 1;
        if (0 != strncmp(cmd, name, cmdLen)) continue;

        // A second hit means the abbreviation is ambiguous
        if (found) return CMD_INVALID;

        found = true;
        result = ('+' == (*entry)[0]) ? CMD_ARG : CMD_NOARG;
    }
    return result;
}  // end MgenApp::GetCmdType()

// Walks the command line (argv[1] .. argv[argc-1]), groups each command with
// its argument (if it takes one) and passes it to OnCommand().
// Returns false on the first invalid command, missing argument or
// command failure; true when every command was processed.
bool MgenApp::ProcessCommands(int argc, const char*const* argv)
{
    // Group command line arguments into MgenApp or Mgen commands
    // and their (if applicable) respective arguments
    // Parse command line
    int i = 1;
    convert = false;  // initialize conversion flag
    while (i < argc)
    {
        CmdType cmdType = GetCmdType(argv[i]);
        if (CMD_INVALID == cmdType)
        {
            // Not an MgenApp command; is it a core class Mgen command?
            switch(Mgen::GetCmdType(argv[i]))
            {
                case Mgen::CMD_INVALID:
                    break;
                case Mgen::CMD_NOARG:
                    cmdType = CMD_NOARG;
                    break;
                case Mgen::CMD_ARG:
                    cmdType = CMD_ARG;
                    break;
            }
        }
        switch (cmdType)
        {
            case CMD_INVALID:
                DMSG(0, "MgenApp::ProcessCommands() error: invalid command:%s\n", argv[i]);
                return false;
            case CMD_NOARG:
                if (!OnCommand(argv[i], NULL))
                {
                    DMSG(0, "MgenApp::ProcessCommands() OnCommand(%s) error\n", 
                            argv[i]);
                    return false;
                }
                i++;
                break;
            case CMD_ARG:
                // Never look past the end of the argument vector: the last
                // command on the line may be missing its argument.
                if ((i + 1) >= argc)
                {
                    DMSG(0, "MgenApp::ProcessCommands() error: command \"%s\" is missing its argument\n",
                            argv[i]);
                    return false;
                }
                if (!OnCommand(argv[i], argv[i+1]))
                {
                    DMSG(0, "MgenApp::ProcessCommands() OnCommand(%s, %s) error\n", 
                            argv[i], argv[i+1]);
                    return false;
                }
                i += 2;
                break;
        }
    }  
    return true;
}  // end MgenApp::ProcessCommands()

bool MgenApp::OnCommand(const char* cmd, const char* val)
{
    if (control_remote)
    {
        char buffer[8192];
        strcpy(buffer, cmd);
        if (val)
        {
            strcat(buffer, " ");
            strncat(buffer, val, 8192 - strlen(buffer));
        }
        unsigned int len = strlen(buffer);
        if (control_pipe.Send(buffer, len))
        {
            return true;
        }
        else
        {
            DMSG(0, "MgenApp::ProcessCommand(%s) error sending command to remote process\n", cmd);    
            return false;
        }        
    }
    
    CmdType type = GetCmdType(cmd);
    unsigned int len = strlen(cmd);    
    if (CMD_INVALID == type)
    {
        // If it is a core mgen command, process it as "overriding"
        if (Mgen::CMD_INVALID != Mgen::GetCmdType(cmd))
        {
            return mgen.OnCommand(Mgen::GetCommandFromString(cmd), val, true);   
        }
        else
        {
            DMSG(0, "MgenApp::ProcessCommand(%s) error: invalid command\n", cmd);
            return false;
        }
    }
    else if ((CMD_ARG == type) && !val)
    {
        DMSG(0, "MgenApp::ProcessCommand(%s) missing argument\n", cmd);
        return false;
    }
    else if (!strncmp("port", cmd, len))
    {
        // "port" == implicit "0.0 LISTEN UDP <val>" script
        char* string = new char[strlen(val) + 64];
        if (!string)
        {
            DMSG(0, "MgenApp::ProcessCommand(port) memory allocation error: %s\n",
                    GetErrorString());
            return false;   
        }
        sprintf(string, "0.0 LISTEN UDP %s", val);
        DrecEvent* event = new DrecEvent;
        if (!event)
        {
            DMSG(0, "MgenApp::ProcessCommand(port) memory allocation error: %s\n",
                    GetErrorString());
            return false;   
        }
        if (!event->InitFromString(string))
        {
            DMSG(0, "MgenApp::ProcessCommand(port) error parsing <portList>\n");
            return false;      
        }
        have_ports = true;
        mgen.InsertDrecEvent(event);
    }
    else if (!strncmp("ipv4", cmd, len))
    {
        mgen.SetDefaultSocketType(ProtoAddress::IPv4);
    }
    else if (!strncmp("ipv6", cmd, len))
    {
#ifdef HAVE_IPV6 
        ProtoSocket::SetHostIPv6Capable();
        if (ProtoSocket::HostIsIPv6Capable())
            mgen.SetDefaultSocketType(ProtoAddress::IPv6);
        else
#endif // HAVE_IPV6
            DMSG(0, "MgenApp::ProcessCommand(ipv6) Warning: system not IPv6 capable?\n");
    }
    else if (!strncmp("background", cmd, len))
    {
        // do nothing (this command was scanned immediately at startup)
    }
    else if (!strcmp("convert", cmd))
    {
        convert = true;             // set flag to do the conversion
        strcpy(convert_path, val);  // save path of file to convert
    }
    else if (!strncmp("sink", cmd, len))
    {
        MgenStreamSink* theSink = new MgenStreamSink(dispatcher);
        if (theSink)
        {
            if (theSink->Open(val, sink_non_blocking))
            {
                if (mgen.GetSink()) 
                {
                    ((MgenStreamSink*)mgen.GetSink())->Close();
                    delete mgen.GetSink();
                }
                mgen.SetSink(theSink);
            }
            else
            {
                DMSG(0, "MgenApp::ProcessCommand(sink) Error: couldn't open stream sink\n");
                return false;
            }                           
        }
        else
        {
            DMSG(0, "MgenApp::ProcessCommand(sink) Error: couldn't allocate sink: %s\n",
                    GetErrorString());
            return false;
        }
    }
    else if (!strncmp("block", cmd, len))
    {
        sink_non_blocking = false;
    }
    else if (!strncmp("source", cmd, len))
    {
        MgenStreamSource* theSource = new MgenStreamSource(mgen, *this);
        if (theSource)
        {
            if (theSource->Open(val))
            {
                if (!dispatcher.InstallGenericInput(theSource->GetDescriptor(),
                                                    MgenStreamSource::DoInputReady, 
                                                    theSource))
                {
                    DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't install stream source\n");
                    theSource->Close();
                    return false;       
                }  
                if (source) 
                {
                    source->Close();
                    delete source;  
                }             
                source = theSource;
            }
            else
            {
                DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't open stream source\n");
                return false;
            }                           
        }
        else
        {
            DMSG(0, "MgenApp::ProcessCommand(source) Error: couldn't allocate source: %s\n",
                    GetErrorString());
            return false;
        }
    }
    else if (!strncmp("ifinfo", cmd, len))
    {
        strncpy(ifinfo_name, val, 63);  
        ifinfo_name[63] = '\0'; 
    }
    else if (!strncmp("precise", cmd, len))
    {
        char status[4];  // valid status is "on" or "off"
        strncpy(status, val, 3);
        status[3] = '\0';
        unsigned int len = strlen(status);
        for (unsigned int i = 0; i < len; i++)
            status[i] = tolower(status[i]);
        if (!strncmp("on", status, len))
        {
            dispatcher.SetPreciseTiming(true);
        }
        else if (!strncmp("off", status, len))
        {
            dispatcher.SetPreciseTiming(false);
        }
        else
        {
            DMSG(0, "MgenApp::ProcessCommand(precise) Error: invalid <status>\n");
            return false;
        }
    }
    else if (!strncmp("instance", cmd, len))
    {
        if (control_pipe.IsOpen())
            control_pipe.Close();
        if (control_pipe.Connect(val))
        {
            control_remote = true;
        }        
        else if (!control_pipe.Listen(val))
        {
            DMSG(0, "MgenApp::ProcessCommand(instance) error opening control pipe\n");
            return false; 
        }
    }
    else if (!strncmp("command", cmd, len))
    {
        if (!OpenCmdInput(val))
        {
            DMSG(0, "MgenApp::ProcessCommand(command) error opening command input file/device\n");
            return false;
        }
    }
    else if (!strncmp("hostAddr", cmd, len))
    {
       char status[4];  // valid status is "on" or "off"
        strncpy(status, val, 3);
        status[3] = '\0';
        unsigned int len = strlen(status);
        for (unsigned int i = 0; i < len; i++)
            status[i] = tolower(status[i]);
        if (!strncmp("on", status, len))
        {
            // (TBD) control whither IPv4 or IPv6 ???
            ProtoAddress localAddress;
            if (localAddress.ResolveLocalAddress())
            {
                mgen.SetHostAddress(localAddress);   
            }
            else
            {
                DMSG(0, "MgenApp::ProcessCommand(hostAddr) error getting local addr\n");
                return false;
            }
        }
        else if (!strncmp("off", status, len))
        {
            mgen.ClearHostAddress();
        }
        else
        {
            DMSG(0, "MgenApp::ProcessCommand(precise) Error: invalid <status>\n");
            return false;
        }
    }
    else if (!strncmp("stop", cmd, len))
    {
       Stop();
    }
    else if (!strncmp("help", cmd, len))
    {
       fprintf(stderr, "mgen: version %s\n", MGEN_VERSION);
       Usage();
       return false;
    }
    return true;
}  // end MgenApp::OnCommand()

// Application startup: applies defaults, processes the command line and
// starts the Mgen engine.
// Returns false when startup command processing failed, when the commands
// were relayed to a remote instance, or after a binary log conversion
// (none of which leave anything to run); otherwise returns the result of
// mgen.Start().
bool MgenApp::OnStartup(int argc, const char*const* argv)
{   
    // Seed the system rand() function
    struct timeval currentTime;
    ProtoSystemTime(currentTime);
    srand(currentTime.tv_usec);
    
    mgen.SetLogFile(stdout);  // log to stdout by default

#ifdef HAVE_IPV6    
    if (ProtoSocket::HostIsIPv6Capable()) 
        mgen.SetDefaultSocketType(ProtoAddress::IPv6);
#endif // HAVE_IPV6
    
#ifdef HAVE_GPS
    gps_handle = GPSSubscribe(NULL);
    if (gps_handle)
        mgen.SetPositionCallback(MgenApp::GetPosition, gps_handle);
    payload_handle = GPSSubscribe("/tmp/mgenPayloadKey");
    mgen.SetPayloadHandle(payload_handle);
#endif // HAVE_GPS   
    
    if (!ProcessCommands(argc, argv))
    {
        fprintf(stderr, "mgen: error while processing startup commands\n");
        return false;
    }
    
    if (control_remote)
    {
        // We remoted commands, so we're finished ...
        return false;   
    }

    fprintf(stderr, "mgen: version %s\n", MGEN_VERSION);
    
    if (convert)
    {
        fprintf(stderr, "mgen: beginning binary to text log conversion ...\n");
        mgen.ConvertBinaryLog(convert_path);
        fprintf(stderr, "mgen: conversion complete (exiting).\n");
        // Conversion is a one-shot job: as announced above, exit here
        // instead of falling through and starting the engine.
        return false;
    }

    if (mgen.DelayedStart())
    {
        char startTime[256];
        mgen.GetStartTime(startTime);
        fprintf(stderr, "mgen: delaying start until %s ...\n", startTime);
    }
    else
    {
        fprintf(stderr, "mgen: starting now ...\n");
    }
    
    // Sample the interface frame counters so that OnShutdown() can report
    // the difference accumulated during the run
    if ('\0' != ifinfo_name[0])
        GetInterfaceCounts(ifinfo_name, ifinfo_tx_count, ifinfo_rx_count);
    else
        ifinfo_tx_count = ifinfo_rx_count = 0;  
    return mgen.Start();
}  // end MgenApp::OnStartup()

// Application shutdown: stops the engine, reports the interface frame
// counts accumulated since startup (if "ifinfo" was given) and releases
// the sink, source, GPS handles, control pipe and command input.
void MgenApp::OnShutdown()
{
    mgen.Stop();
    
    if ('\0' != ifinfo_name[0])
    {
        UINT32 txCount, rxCount;
        if (GetInterfaceCounts(ifinfo_name, txCount, rxCount))
        {
            // The differences are taken in UINT32 arithmetic and then
            // converted explicitly so the arguments match "%lu" regardless
            // of how UINT32 is defined on this platform.
            fprintf(stderr, "mgen: iface>%s txFrames>%lu rxFrames>%lu\n", 
                             ifinfo_name,
                             (unsigned long)(txCount - ifinfo_tx_count),
                             (unsigned long)(rxCount - ifinfo_rx_count));
        }
        else
        {
            fprintf(stderr, "mgen: Warning! Couldn't get interface frame counts\n");
        }
    }
    
    MgenSink* theSink = mgen.GetSink();
    if (theSink)
    {
        delete theSink;
        mgen.SetSink(NULL);   
    }
    if (source)
    {
        delete source;
        source = NULL;
    }
#ifdef HAVE_GPS
    if (gps_handle) 
    {
        GPSUnsubscribe(gps_handle);
        gps_handle = NULL;
        mgen.SetPositionCallback(NULL, NULL);
    }
    if (payload_handle)
    {
        GPSUnsubscribe(payload_handle);
        payload_handle = NULL;
        mgen.SetPayloadHandle(NULL);
    }
#endif // HAVE_GPS    
    control_pipe.Close();
    CloseCmdInput();
}  // end MgenApp::OnShutdown()


#ifdef HAVE_GPS
bool MgenApp::GetPosition(const void* gpsHandle, GPSPosition& gpsPosition)
{
    GPSGetCurrentPosition((GPSHandle)gpsHandle, &gpsPosition);
    return true;    
}
#endif // HAVE_GPS

// This routine uses the system's "netstat" command
// to retrieve tx/rx frame counts for a given interface
// (WIN32 Note: Counts are cumulative for all interfaces?)
//   ifName  : interface name as it appears in the first column of
//             "netstat -i" output (ignored on WIN32/WinCE)
//   txCount : receives the transmitted frame count (0 on failure)
//   rxCount : receives the received frame count (0 on failure)
// Returns true when the counts were obtained, false otherwise.
bool MgenApp::GetInterfaceCounts(const char* ifName, 
                                 UINT32&     txCount,
                                 UINT32&     rxCount)
{
#ifdef _WIN32_WCE
    // For the moment, we'll assume our WINCE platform
    // has one interface and we'll use GetIpStatistics()
    // ignoring the "ifName"
    MIB_IPSTATS ipStats;
    if (NO_ERROR != GetIpStatistics(&ipStats))
    {
        DMSG(0, "MgenApp::GetInterfaceCounts() GetIpStatistics() error: %s\n", GetErrorString());
        rxCount = txCount = 0;
        return false;
    }
    else
    {
        rxCount = ipStats.dwInReceives;
        // dwOutRequests counts outgoing datagrams (dwInDelivers is a
        // receive-side counter and does not reflect transmissions)
        txCount = ipStats.dwOutRequests;
        return true;
    }
#else
    bool result = false;
    txCount = rxCount = 0;
#ifdef WIN32
    FILE* p = _popen("netstat -e", "r");
    int lineCount = 0;
#else
    FILE* p = popen("netstat -i", "r");
#endif  // if/else WIN32
    if (NULL == p)
    {
        DMSG(0, "MgenApp::GetInterfaceCounts() popen() error: %s\n",
                 GetErrorString());
        return result;   
    }
    Mgen::FastReader reader;
    char linebuffer[256];
    unsigned int len = 256;
    while (Mgen::FastReader::OK == reader.Readline(p, linebuffer, &len))
    {
#ifdef WIN32
        // On WIN32, we must collect two lines from "netstat -e" output
        if (!strncmp(linebuffer, "Unicast", strlen("Unicast")) ||
            !strncmp(linebuffer, "Non-unicast", strlen("Non-unicast")))
        {
            char* ptr = strstr(linebuffer, "packets");
            if (ptr)
            {
                // "%lu" requires unsigned long storage; UINT32 may be narrower
                unsigned long received, sent;
                if (2 == sscanf(ptr, "packets %lu %lu", &received, &sent))
                {
                    txCount += (UINT32)sent;
                    rxCount += (UINT32)received;  
                    if (2 == ++lineCount)
                    {
                        result = true;
                        break;
                    }
                }
            }
        }
#else
        // The line must begin with the complete interface name, i.e. the
        // name must be followed by white space ("eth1" must not match "eth10")
        size_t nameLen = strlen(ifName);
        if (!strncmp(linebuffer, ifName, nameLen) &&
            isspace((unsigned char)linebuffer[nameLen]))
        {
            // Parse the columns that follow the name.  (Scanning from past
            // the name avoids building a format string from "ifName".)
            const char* columns = linebuffer + nameLen;
#ifdef LINUX
            // Linux format: "iface mtu metric rxOk rxErr rxDrp rxOvr txOk ..."
            // ("%lu" requires unsigned long storage; UINT32 may be narrower)
            unsigned long mtu, metric, rxOk, rxErr, rxDrp, rxOvr, txOk;
            if (7 == sscanf(columns, "%lu %lu %lu %lu %lu %lu %lu", 
                            &mtu, &metric, &rxOk, &rxErr, &rxDrp, &rxOvr, &txOk))
            {
                txCount = (UINT32)txOk;
                rxCount = (UINT32)rxOk;
                result = true;
                break;
            }
#endif // LINUX   
#ifdef MACOSX
            // MacOS X format: "iface mtu net addr ipkts ierrs opkts ..."
            // (string fields are width-limited to fit their buffers)
            char net[64], addr[64];
            unsigned long mtu, ipkts, ierrs, opkts;
            if (6 == sscanf(columns, "%lu %63s %63s %lu %lu %lu", 
                            &mtu, net, addr, &ipkts, &ierrs, &opkts))
            {
                txCount = (UINT32)opkts;
                rxCount = (UINT32)ipkts;
                result = true;
                break;
            }   
#endif // MACOSX
        }
#endif // if/else WIN32
        len = 256;  // restore the buffer size for the next Readline() call
    }  // end while(reader.Readline())
#ifdef WIN32
    _pclose(p);
#else
    pclose(p);
#endif // if/else WIN32
    return result;
#endif // if/else _WIN32_WCE
}  // end MgenApp::GetInterfaceCounts()

// Control pipe listener: receives one command message, splits it at the
// first white space into command name and argument and executes it.
void MgenApp::OnControlEvent(ProtoSocket& /*theSocket*/,
                             ProtoSocket::Event theEvent)
{
    // Only receive events are of interest here
    if (ProtoSocket::RECV != theEvent)
    {
        DMSG(0, "MgenApp::OnControlEvent() unhandled event type\n");
        return;
    }

    char buffer[8192];
    unsigned int len = 8191;  // leave room for the terminator
    if (!control_pipe.Recv(buffer, len))
    {
        DMSG(0, "MgenApp::OnControlEvent() receive error\n");
        return;
    }

    // (TBD) Delimit commands by line breaks and/or other delimiters ???
    buffer[len] = '\0';
    char* arg = NULL;
    const char* const end = buffer + len;
    for (char* ptr = buffer; ptr < end; ++ptr)
    {
        if ('\0' == *ptr) break;  // end of command text, no argument
        if (isspace(*ptr))
        {
            // Terminate the command name; the argument follows
            *ptr = '\0';
            arg = ptr + 1;
            break;
        }
    }
    if (!OnCommand(buffer, arg))
        DMSG(0, "MgenApp::OnControlEvent() error processing command\n");
}  // end MgenApp::OnControlEvent


// Mgen run-time control via "stdin" (or other file/device)
// Note: On WIN32, MGEN must be built as a "console" application
//       for this stuff to work?

bool MgenApp::OpenCmdInput(const char* path)
{
#ifdef _WIN32_WCE
    DMSG(0, "MgenApp::OpenCmdInput() \"command\" option not supported on WinCE\n");
    return false;
#else
    CloseCmdInput();  // in case it was open
#ifdef WIN32
    if (!strcmp(path, "STDIN"))
    {
		cmd_descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_INPUT_HANDLE);
        // (TBD) set stdin for overlapped I/O ???
    }
    else
    {
        cmd_descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_READ, FILE_SHARE_READ, 
                                NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
    }
    if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == cmd_descriptor)
    {
        DMSG(0, "MgenApp::OpenCmdInput() error opening file\n");
        return false;
    }
    /*// For non-block I/O on Win32 we use overlapped I/O
    if (NULL == (cmd_read_event = CreateEvent(NULL, TRUE, FALSE, NULL)))
    {
        DMSG(0, "MgenApp::OpenCmdInput() error creating overlapped I/O event\n");
        CloseHandle((HANDLE)cmd_descriptor);
        return false;
    } */ 
#else
    if (!strcmp(path, "STDIN"))
    {
        cmd_descriptor = fileno(stdin);
        // Make the stdin non-blocking
        if(-1 == fcntl(cmd_descriptor, F_SETFL, fcntl(cmd_descriptor, F_GETFL, 0) | O_NONBLOCK))
        {
            cmd_descriptor = -1;
            DMSG(0, "MgenApp::OpenCmdInput() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s",
                    GetErrorString());
        }
    }
    else
    {
        cmd_descriptor = open(path, O_RDONLY | O_NONBLOCK);
    }
    if (cmd_descriptor < 0)
    {
        DMSG(0, "MgenApp::OpenCmdInput() error opening file: %s", GetErrorString());
        cmd_descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
        return false;
    }  
#endif // if/else WIN32
    cmd_length = 0;
    if (!dispatcher.InstallGenericInput(cmd_descriptor, MgenApp::DoCmdInputReady, this))
    {
        DMSG(0, "MgenApp::OpenCmdInput() Error: couldn't install cmd input\n");
        CloseCmdInput();
        return false;       
    }  
    return true;
#endif // if/else _WIN32_WCE
}  // end MgenApp::OpenCmdInput()

// Unregisters the command input from the dispatcher and closes its
// descriptor.  Does nothing when no command input is open.
void MgenApp::CloseCmdInput()
{
#ifndef _WIN32_WCE
    // Nothing to release if the input was never opened
    if (ProtoDispatcher::INVALID_DESCRIPTOR == cmd_descriptor) return;

    dispatcher.RemoveGenericInput(cmd_descriptor);
#ifdef WIN32
    CloseHandle((HANDLE)cmd_descriptor);
#else
    close(cmd_descriptor);
#endif // if/else WIN32/UNIX
    cmd_descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
#endif // _WIN32_WCE
}  // end MgenApp::CloseCmdInput()

// Dispatcher callback for the command input descriptor.  "userData" is the
// MgenApp instance passed to InstallGenericInput() by OpenCmdInput().
void MgenApp::DoCmdInputReady(ProtoDispatcher::Descriptor /*descriptor*/, 
                              ProtoDispatcher::Event      /*theEvent*/, 
                              const void*                 userData)
{
    MgenApp* app = (MgenApp*)userData;
    app->OnCmdInputReady();
}  // end MgenApp::DoCmdInputReady()

// Called when the command input is readable.  Consumes one byte per call,
// accumulating it in cmd_buffer; when a command delimiter arrives the
// buffered text is split at the first white space into command name and
// argument and executed.  Over-long commands are truncated (with an error
// logged for each dropped byte).
void MgenApp::OnCmdInputReady()
{
    // We read the command input stream one byte at a time
    // just to save some buffer manipulation
    char byte;
    unsigned int len = 1;
    if (!ReadCmdInput(&byte, len))
    {
        DMSG(0, "MgenApp::OnCmdInputReady() error reading input\n");
        return;
    }
    // ReadCmdInput() can succeed without delivering data (e.g. the read
    // would block); "byte" is not valid in that case
    if (0 == len) return;

    if (IsCmdDelimiter(byte))
    {
        cmd_buffer[cmd_length] = '\0';
        if (cmd_length)
        {
            char* cmd = cmd_buffer;
            char* arg = NULL;
            for (unsigned int i = 0; i < cmd_length; i++)
            {
                if (isspace((unsigned char)cmd_buffer[i]))
                {
                    // Terminate the command name; the argument follows
                    cmd_buffer[i] = '\0';
                    arg = cmd_buffer + i + 1;
                    break;
                } 
            }
            if (!OnCommand(cmd, arg))
                DMSG(0, "MgenApp::OnCmdInputReady() error processing command\n");       
        }
        cmd_length = 0;  // ready for the next command
    }        
    else if (cmd_length < (sizeof(cmd_buffer) - 1))
    {
        // (one byte is always kept free for the terminator)
        cmd_buffer[cmd_length++] = byte;
    }
    else
    {
        DMSG(0, "MgenApp::OnCmdInputReady() error: maximum command length exceeded\n");   
    }
}  // end MgenApp::OnCmdInputReady()

// Reads up to "numBytes" bytes from the command input into "buffer".
// On success returns true with "numBytes" set to the number of bytes
// actually read, which may be zero when no data was available.
// Returns false on a read error or (UNIX) when the end of the input
// has been reached.
bool MgenApp::ReadCmdInput(char* buffer, unsigned int& numBytes)
{
#ifdef WIN32
    DWORD want = numBytes;
    DWORD got;
    if (ReadFile(cmd_descriptor, buffer, want, &got, NULL))
    {
        numBytes = got;
        return true;
    }
    else
    {
        DMSG(0, "MgenApp::ReadCmdInput() ReadFile error: %s\n", GetErrorString());
        return false;
    }
#else
    ssize_t result = read(cmd_descriptor, buffer, numBytes);
    if (result > 0)
    {
        numBytes = (unsigned int)result;
        return true;   
    }
    numBytes = 0;
    if (0 == result)
    {
        // End of input: read() does not set errno in this case, so it
        // must not be consulted (it may hold a stale value)
        DMSG(0, "MgenApp::ReadCmdInput() end of input reached\n");
        return false;
    }
    switch (errno)
    {
        case EINTR:
        case EAGAIN:
            // Nothing available right now; not an error
            return true;
        default:
            DMSG(0, "MgenApp::ReadCmdInput() read error: %s\n", GetErrorString());
            return false;   
    }
#endif  // end if/else WIN32/UNIX
}  // end MgenApp::ReadCmdInput()


// Mgen stream sink/source implementation
// Note: On WIN32, MGEN must be built as a "console" application
//       for this stuff to work?

// Constructs a closed sink that will use "theDispatcher" for output
// notification.  Call Open() before sending messages.
MgenStreamSink::MgenStreamSink(ProtoDispatcher& theDispatcher)
 : descriptor(ProtoDispatcher::INVALID_DESCRIPTOR),
#ifdef WIN32
   write_event(INVALID_HANDLE_VALUE),  // same "no event" value Close() leaves behind
#endif // WIN32
   msg_len(0), msg_index(0), dispatcher(theDispatcher), output_active(false)
{
}

// Makes sure the descriptor is released when the sink goes away.
MgenStreamSink::~MgenStreamSink()
{
    this->Close();
}

// Opens the byte stream that MGEN messages are written to.
//
//   path        - "STDOUT" selects the process standard output; anything
//                 else is treated as a file name that is opened for writing
//                 (and created if it does not exist).
//   nonBlocking - UNIX only: when true the descriptor is put into
//                 O_NONBLOCK mode (plus FASYNC for stdout where available).
//                 The WIN32 branch does not consult this argument.
//
// Any previously opened stream is closed first.  Returns true on success;
// on failure an error is logged, the descriptor is left invalid and false
// is returned.
bool MgenStreamSink::Open(const char* path, bool nonBlocking)
{
    Close();
#ifdef WIN32
#ifdef _WIN32_WCE
    DMSG(0, "MgenStreamSink::Open() \"sink\" option not support under WinCE\n");
    return false;
#else
    if (!strcmp(path, "STDOUT"))
    {
        descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_OUTPUT_HANDLE);
        // (TBD) set stdout for overlapped I/O ???
    }
    else
    {
        descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_WRITE, FILE_SHARE_READ, 
                                NULL, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
    }
    if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == descriptor)
    {
        DMSG(0, "MgenStreamSink::Open() error opening file\n");
        // Normalize to the value Close() tests against
        descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
        return false;
    }
    // For non-block I/O on Win32 we use overlapped I/O
    if (NULL == (write_event = CreateEvent(NULL, TRUE, FALSE, NULL)))
    {
        DMSG(0, "MgenStreamSink::Open() error creating overlapped I/O event\n");
        CloseHandle((HANDLE)descriptor);
        // The handle is gone; forget it so a later Close() (e.g. from the
        // destructor) does not close it a second time or touch write_event.
        descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
        return false;
    }    
#endif  // if/else _WIN32_WCE
#else
    if (!strcmp(path, "STDOUT"))
    {
        descriptor = fileno(stdout);
        if (nonBlocking)
        {
            // Make the stdout non-blocking and asynchronous
            int flags = O_NONBLOCK;
#ifdef FASYNC
            flags |= FASYNC;
#endif // FASYNC
            // Check F_GETFL separately: a -1 result must not be OR-ed into
            // the flag word handed to F_SETFL.
            int current = fcntl(descriptor, F_GETFL, 0);
            if ((-1 == current) || (-1 == fcntl(descriptor, F_SETFL, current | flags)))
            {
                descriptor = -1;
                DMSG(0, "MgenStreamSink::Open() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s",
                        GetErrorString());
            }
        }
    }
    else
    {
        int flags = O_CREAT | O_WRONLY;
        if (nonBlocking) flags |= O_NONBLOCK;
        // O_CREAT requires the third (mode) argument; without it the new
        // file's permission bits are indeterminate.  0666 is filtered by the
        // process umask, as for fopen().
        descriptor = open(path, flags, 0666);
    }
    if (descriptor < 0)
    {
        DMSG(0, "MgenStreamSink::Open() error opening file: %s", GetErrorString());
        descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
        return false;
    }  
#endif // if/else WIN32
    msg_len = msg_index = 0;
    output_active = false;
    return true;
}  // end MgenStreamSink::Open()

// Releases the output stream: removes any installed output-ready
// notification from the dispatcher, closes the descriptor (and, on WIN32,
// the overlapped-I/O event) and marks the sink as unopened.  Calling it on
// an unopened sink does nothing.
void MgenStreamSink::Close()
{
    if (ProtoDispatcher::INVALID_DESCRIPTOR == descriptor)
        return;  // never opened, or already closed

    // Stop the dispatcher from watching a descriptor that is about to vanish
    if (output_active)
    {
        dispatcher.RemoveGenericOutput(descriptor);
        output_active = false;
    }

#ifdef WIN32
    CloseHandle((HANDLE)descriptor);
    CloseHandle((HANDLE)write_event);
    write_event = INVALID_HANDLE_VALUE;
#else
    close(descriptor);
#endif // if/else WIN32

    descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
}  // end MgenStreamSink::Close()

// Queues one serialized MGEN message for output.
//
// The sink holds a single message at a time.  When the previous message has
// been written out completely, "txBuffer" (clipped to MgenMsg::MAX_SIZE
// bytes) is copied into the internal buffer and, unless an output-ready
// notification is already installed, a write is attempted immediately.
// When the previous message is still (partly) unwritten, the new message is
// rejected: a level-1 debug message is logged and false is returned.
// The destination address is not used by this sink.
bool MgenStreamSink::SendMgenMessage(const char*           txBuffer,
                                     unsigned int          len,
                                     const ProtoAddress&   /*dstAddr*/)
{
    const bool previousPending = (msg_index < msg_len);
    if (previousPending)
    {
        DMSG(1, "MgenStreamSink::SendMgenMessage() message sink buffer overflow\n");
        return false;
    }

    // Never copy more than the internal buffer can hold
    if (!(len < MgenMsg::MAX_SIZE))
        len = MgenMsg::MAX_SIZE;
    memcpy(msg_buffer, txBuffer, len);
    msg_len = len;
    msg_index = 0;

    // With a notification installed, the dispatcher callback does the writing
    if (!output_active)
        OnOutputReady();
    return true;
}  // end MgenStreamSink::SendMgenMessage()


// Installs (if not already installed) the dispatcher's output-ready
// notification for this sink's descriptor, then hands "theFlow" to the
// base-class MgenSink::StartNotifying().
//
// A failure to install the notification is logged but is not reported to
// the caller; the base-class call is made either way.
void MgenStreamSink::StartNotifying(MgenFlow* theFlow)
{
    // (TBD) start async output readiness monitoring and
    //       mod OnOutputReady to tickle the "notify_flow"
    //       to output a message on demand
    //       also mod MgenFlow ON/MOD/OFF event handling to
    //       start/stop notify as needed    
    if (!output_active)
    {
        if (dispatcher.InstallGenericOutput(descriptor, MgenStreamSink::DoOutputReady, this))
            output_active = true;
        else  // name THIS function in the log, not OnOutputReady()
            DMSG(0, "MgenStreamSink::StartNotifying() async output install failed!\n");
    }
    MgenSink::StartNotifying(theFlow);
}  // end MgenStreamSink::StartNotifying()

// Static trampoline handed to the dispatcher: "userData" is the
// MgenStreamSink registered with InstallGenericOutput(), whose
// OnOutputReady() is invoked.  The descriptor and event arguments are
// ignored.
void MgenStreamSink::DoOutputReady(ProtoDispatcher::Descriptor /*descriptor*/, 
                                   ProtoDispatcher::Event      /*theEvent*/, 
                                   const void*                 userData)
{
    const MgenStreamSink* registered = static_cast<const MgenStreamSink*>(userData);
    MgenStreamSink* sink = const_cast<MgenStreamSink*>(registered);
    sink->OnOutputReady();
}  // end MgenStreamSink::DoOutputReady()


void MgenStreamSink::OnOutputReady()
{
    unsigned int nbytes = msg_len - msg_index;
    bool result = Write(msg_buffer+msg_index, &nbytes);
    msg_index += nbytes;
    if (msg_index < msg_len)
    {
        if (!output_active)
        {
            // (TBD) install async output notification only for result == true?
            if (dispatcher.InstallGenericOutput(descriptor, MgenStreamSink::DoOutputReady, this))
                output_active = true;
            else
                DMSG(0, "MgenStreamSink::OnOutputReady() async output install failed!\n");
        }
    }
    else
    {
        if (notify_flow)
        {
            notify_flow->Notify();
        }
        else if (output_active)
        {
            dispatcher.RemoveGenericOutput(descriptor);
            output_active = false;
        }   
    }
    if (!result)
        DMSG(0, "MgenStreamSink::OnOutputReady() error writing to output!\n");
}  // MgenStreamSink::OnOutputReady()

// Writes up to *nbytes bytes from "buffer" to the sink descriptor.
//
//   buffer - data to write
//   nbytes - in:  number of bytes requested
//            out: number of bytes actually written (may be fewer)
//
// Returns true when no error occurred (including the UNIX case where the
// descriptor would block and only part, or none, of the data was written)
// and false on an I/O error, which is also logged.
bool MgenStreamSink::Write(char* buffer, unsigned int* nbytes)
{
    unsigned int len = *nbytes;
#ifdef WIN32
    DWORD put = 0;
#ifndef _WIN32_WCE
    while (put < len)
    {
        DWORD written = 0;
        // Every OVERLAPPED member must be initialized before use.  An offset
        // of 0xFFFFFFFF/0xFFFFFFFF asks WriteFile() to append at end-of-file
        // instead of writing at whatever the uninitialized fields held.
        OVERLAPPED osWrite;
        memset(&osWrite, 0, sizeof(osWrite));
        osWrite.Offset = 0xFFFFFFFF;
        osWrite.OffsetHigh = 0xFFFFFFFF;
        osWrite.hEvent = write_event;
        if (!WriteFile((HANDLE)descriptor, buffer+put, len - put, &written, &osWrite))
        {
            if (ERROR_IO_PENDING != GetLastError())
            {
                // I/O error
                DMSG(0, "MgenStreamSink::SendMgenMessage() WriteFile() error: %d\n",
                        GetLastError()); 
                *nbytes = put;
                return false;
            }
            else
            {            
                // (TBD perhaps wait some small amount of time instead of ZERO?
                DWORD result = WaitForSingleObject(write_event, 0);
                switch (result)
                {
                    case WAIT_OBJECT_0:
                        if (GetOverlappedResult((HANDLE)descriptor, &osWrite, &written, FALSE))
                        {
                            put += written;  
                        }
                        else
                        {
                            DMSG(0, "MgenStreamSink::SendMgenMessage() GetOverlappedResult() error: %d\n",
                                    GetLastError());
                            *nbytes = put;
                            return false;    
                        }                        
                        break; 
                    default:
                        // I/O timed out or abandoned.  "osWrite" lives on this
                        // stack frame, so the request must not be left
                        // outstanding when we return: cancel it and wait
                        // for it to be retired.
                        CancelIo((HANDLE)descriptor);
                        if (GetOverlappedResult((HANDLE)descriptor, &osWrite, &written, TRUE))
                        {
                            // The write finished before the cancel took effect
                            put += written;
                            break;
                        }
                        DMSG(0, "MgenStreamSink::SendMgenMessage() WaitForSingleObject() error: %d\n",
                                GetLastError());
                        *nbytes = put;
                        return false;  
                }
            }
        }
        else
        {
            // WriteFile completed immediately
            put += written;    
        }   
    } 
#endif // !_WIN32_WCE
#else
    ssize_t put = 0;
    while (put < (ssize_t)len)
    {
        ssize_t result = write(descriptor, buffer+put, len - put);
        if (result < 0)
        {
            switch (errno)
            {
                case EINTR:
                    continue;     // interrupted, try again
                case EAGAIN:
                    *nbytes = put;
                    return true; // blocked, can't write message now
                default:
                    DMSG(0, "MgenStreamSink::SendMgenMessage() write() error: %s\n", GetErrorString());
                    *nbytes = put;
                    return false; 
            }
        }
        else if (0 == result)
        {
            // Nothing was accepted and no error was reported: hand back what
            // has been written so far rather than spinning in this loop.
            *nbytes = put;
            return true;
        }
        else
        {
            put += result;    
        }           
    }
#endif // if/else WIN32
    *nbytes = put;
    return true;
}  // end MgenStreamSink::Write()


// Builds an unopened source bound to the given Mgen engine and application:
// the descriptor starts out invalid and no message is in progress.
MgenStreamSource::MgenStreamSource(Mgen& mgenInstance, class MgenApp& mgenApp)
  : mgen_instance(mgenInstance),
    mgen_app(mgenApp),
    descriptor(ProtoDispatcher::INVALID_DESCRIPTOR),
    msg_length(0),
    msg_index(0)
{
}

// Destruction simply closes the source (a no-op when it was never opened).
MgenStreamSource::~MgenStreamSource()
{
    this->Close();
}

bool MgenStreamSource::Open(const char* path)
{
    Close();
#ifdef WIN32
#ifdef _WIN32_WCE
    DMSG(0, "MgenStreamSource::Open() \"source\" option not supported under WinCE\n");
    return false;
#else
    if (!strcmp(path, "STDIN"))
    {
		descriptor = (ProtoDispatcher::Descriptor)GetStdHandle(STD_INPUT_HANDLE);
        // (TBD) set stdout for overlapped I/O ???
    }
    else
    {
        descriptor = (ProtoDispatcher::Descriptor)CreateFile(path, GENERIC_READ, FILE_SHARE_READ, 
                                NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
    }
    if ((ProtoDispatcher::Descriptor)INVALID_HANDLE_VALUE == descriptor)
    {
        DMSG(0, "MgenStreamSource::Open() error opening file\n");
        return false;
    }
    // For non-block I/O on Win32 we use overlapped I/O
    if (NULL == (read_event = CreateEvent(NULL, TRUE, FALSE, NULL)))
    {
        DMSG(0, "MgenStreamSource::Open() error creating overlapped I/O event\n");
        CloseHandle((HANDLE)descriptor);
        return false;
    }    
#endif  // if/else _WIN32_WCE
#else
    if (!strcmp(path, "STDIN"))
    {
        descriptor = fileno(stdin);
        // Make the stdin non-blocking
        if(-1 == fcntl(descriptor, F_SETFL, fcntl(descriptor, F_GETFL, 0) | O_NONBLOCK))
        {
            descriptor = -1;
            DMSG(0, "MgenStreamSource::Open() fcntl(stdout, F_SETFL(O_NONBLOCK)) error: %s",
                    GetErrorString());
        }
    }
    else
    {
        descriptor = open(path, O_RDONLY | O_NONBLOCK);
    }
    if (descriptor < 0)
    {
        DMSG(0, "MgenStreamSource::Open() error opening file: %s", GetErrorString());
        descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
        return false;
    }  
#endif // if/else WIN32
    msg_length = msg_index = 0;
    return true;
}  // end MgenStreamSource::Open()

// Releases the input stream: closes the descriptor (and, on WIN32, the
// overlapped-I/O event) and marks the source as unopened.  Calling it on
// an unopened source does nothing.
void MgenStreamSource::Close()
{
    if (ProtoDispatcher::INVALID_DESCRIPTOR == descriptor)
        return;  // never opened, or already closed

#ifdef WIN32
    CloseHandle((HANDLE)descriptor);
    CloseHandle(read_event);
    read_event = INVALID_HANDLE_VALUE;
#else
    close(descriptor);
#endif // if/else WIN32

    descriptor = ProtoDispatcher::INVALID_DESCRIPTOR;
}  // end MgenStreamSource::Close()


// Static trampoline with the dispatcher callback signature: "userData" is
// taken to be the MgenStreamSource to service, and its OnInputReady() is
// invoked.  The descriptor and event arguments are ignored.
void MgenStreamSource::DoInputReady(ProtoDispatcher::Descriptor /*descriptor*/, 
                                    ProtoDispatcher::Event      /*theEvent*/, 
                                    const void*                 userData)
{
    const MgenStreamSource* registered = static_cast<const MgenStreamSource*>(userData);
    MgenStreamSource* source = const_cast<MgenStreamSource*>(registered);
    source->OnInputReady();
}  // end MgenStreamSource::DoInputReady()


// Incrementally reassembles one MGEN message from the input stream.
//
// Two phases, selected by whether "msg_length" is known yet:
//   1) header - collect the first 2 bytes, which carry the message size in
//               network byte order; an out-of-range size is logged and the
//               reassembly state is reset;
//   2) body   - keep reading until "msg_length" bytes are buffered, then
//               hand the message to Mgen::HandleMgenMessage() with a
//               port-0 source address of the default socket type.
//
// Each call performs at most one Read() per phase; a Read() failure stops
// the application via MgenApp::Stop(-1).
void MgenStreamSource::OnInputReady()
{
    UINT32 count = 0;
    if (msg_length)
    {
        if (Read(msg_buffer+msg_index, msg_length - msg_index, &count))
        {
            msg_index += count;
            if (msg_index == msg_length)
            {
                // Message reception complete 
                ProtoAddress src;
                src.Reset(mgen_instance.GetDefaultSocketType());
                src.SetPort(0);
                mgen_instance.HandleMgenMessage(msg_buffer, msg_length, src);
                msg_length = msg_index = 0;
            }   
        }
        else
        {
            mgen_app.Stop(-1);   
        }
    }
    else
    {
        // Reading first two bytes of MGEN message to get
        // MGEN "messageSize" field
        if (Read(msg_buffer+msg_index, 2 - msg_index, &count))
        {
            msg_index += count;
            if (2 == msg_index)
            {
                memcpy(&msg_length, msg_buffer, 2);
                msg_length = ntohs(msg_length);
                if ((msg_length < MgenMsg::MIN_SIZE) || (msg_length > MgenMsg::MAX_SIZE))
                {
                    // Cast so the argument really is the "unsigned long"
                    // that %lu expects (a narrower integer is promoted to
                    // int when passed through varargs).
                    DMSG(0, "MgenStreamSource::OnInputReady() invalid MGEN message length received: %lu\n",
                            (unsigned long)msg_length);
                    msg_length = msg_index = 0;
                    return;  
                }
                // Size is known: try for the message body right away
                OnInputReady();
            }
        }
        else
        {
            mgen_app.Stop(-1);
        }     
    }
}  // end MgenStreamSource::OnInputReady()

// Reads up to "nBytes" bytes from the source descriptor into "buffer".
//
//   buffer    - destination for the data
//   nBytes    - maximum number of bytes to read
//   bytesRead - out: number of bytes actually read (0 when nothing is
//               available right now)
//
// Returns true when the stream is still usable (data was read, or on UNIX
// the read was interrupted / would block) and false on a read error or,
// on UNIX, at end-of-stream.  Failures are logged.
bool MgenStreamSource::Read(char* buffer, UINT32 nBytes, UINT32* bytesRead)
{
    if (0 == nBytes)
    {
        // Nothing requested; don't let a zero-length read look like EOF
        *bytesRead = 0;
        return true;
    }
#ifdef WIN32
    // NOTE(review): Open() creates file handles with FILE_FLAG_OVERLAPPED but
    // no OVERLAPPED structure is supplied here - confirm against the
    // ReadFile() documentation whether that combination is acceptable.
    DWORD want = nBytes;
    DWORD got;
    if (ReadFile(descriptor, buffer, want, &got, NULL))
    {
        *bytesRead = got;
        return true;
    }
    else
    {
        DMSG(0, "MgenStreamSource::Read() read error: %s\n", GetErrorString());
        return false;
    }
#else
    ssize_t result = read(descriptor, buffer, nBytes);
    if (result > 0)
    {
        *bytesRead = result;
        return true;   
    }
    if (0 == result)
    {
        // End of stream.  read() does not set errno in this case, so errno
        // must not be consulted: a stale EAGAIN/EINTR from an earlier call
        // would make EOF look like "no data yet" and the caller would be
        // re-invoked forever on a descriptor that stays readable.
        DMSG(0, "MgenStreamSource::Read() end of input stream reached\n");
        *bytesRead = 0;
        return false;
    }
    switch (errno)
    {
        case EINTR:
        case EAGAIN:
            // Interrupted or nothing available yet; try again later
            *bytesRead = 0;
            return true;
        default:
            DMSG(0, "MgenStreamSource::Read() read error: %s\n", GetErrorString());
            return false;   
    }
#endif
}  // end MgenStreamSource::Read()

// This macro instantiates our MgenApp instance.
// NOTE(review): PROTO_INSTANTIATE_APP is not defined in this file; it
// presumably comes from Protokit ("protokit.h" is included at the top) -
// confirm there exactly what it expands to.
PROTO_INSTANTIATE_APP(MgenApp)




// syntax highlighted by Code2HTML, v. 0.9.1