#include "mgenFlow.h"
#include "mgenMsg.h"
#include "mgen.h"
#include <time.h>  // for gmtime(), struct tm, etc

#ifndef _WIN32_WCE
#include <errno.h>  // for EAGAIN
#endif // !_WIN32_WCE

// Default constructor: initializes notify_flow to NULL.
MgenSink::MgenSink()
 : notify_flow(NULL)
{
}

// Destructor: empty body; no cleanup is performed here.
MgenSink::~MgenSink()
{
}

// Construct a flow identified by "flowId".
//   timerMgr       - stored in timer_mgr, used later to activate timers
//   socketList     - stored in socket_list, used later to look up socket items
//   defaultV6Label - initial value of flow_label
// Logging and checksums start out disabled, no socket or sink is attached,
// and the flow is not yet started.  Both timers are bound to their
// handlers with an interval of 1.0 and a repeat count of -1; neither
// timer is activated here.
MgenFlow::MgenFlow(unsigned int         flowId,
                   ProtoTimerMgr&       timerMgr,
                   MgenSocketList&      socketList,
                   unsigned long        defaultV6Label)
 : log_file(NULL),
   log_binary(false),
   log_flush(false),
   checksum_enable(false),
   flow_id(flowId),
   flow_label(defaultV6Label),
   socket_item(NULL),
   socket(NULL),
   sink(NULL),
   seq_num(0),
   next_event(NULL),
   started(false),
   timer_mgr(timerMgr),
   socket_list(socketList)
{
    // Script event timer
    event_timer.SetListener(this, &MgenFlow::OnEventTimeout);
    event_timer.SetInterval(1.0);
    event_timer.SetRepeat(-1);

    // Message transmission timer
    tx_timer.SetListener(this, &MgenFlow::OnTxTimeout);
    tx_timer.SetInterval(1.0);
    tx_timer.SetRepeat(-1);
}

// Tear down the flow: deactivate whichever timers are still active,
// destroy the event list, and close the socket item if one is held.
MgenFlow::~MgenFlow()
{
    if (event_timer.IsActive())
    {
        event_timer.Deactivate();
    }
    if (tx_timer.IsActive())
    {
        tx_timer.Deactivate();
    }
    event_list.Destroy();
    if (NULL != socket_item)
    {
        socket_item->Close();
    }
}

// Add "theEvent" to this flow's event list.
//
//   mgenStarted == false : the event time is clamped to be non-negative
//                          and the event is simply queued.
//   mgenStarted == true  : an event whose time is earlier than
//                          "currentTime" is treated as immediate (stamped
//                          with currentTime, placed ahead of next_event,
//                          and applied right away); any other event is
//                          queued and event_timer is (re)armed if the new
//                          event is now the earliest pending one.
//
// In every case the event is validated after insertion; an event that
// fails validation is removed from the list again and false is returned.
bool MgenFlow::InsertEvent(MgenEvent* theEvent, bool mgenStarted, double currentTime)
{
    double eventTime = theEvent->GetTime();

    // --- Not yet running: just queue the event ---
    if (!mgenStarted)
    {
        if (!(eventTime > 0.0)) eventTime = 0.0;
        theEvent->SetTime(eventTime);
        event_list.Insert(theEvent);
        if (ValidateEvent(theEvent)) return true;
        event_list.Remove(theEvent);
        return false;
    }

    // --- Running, "immediate" event ---
    if (eventTime < currentTime)
    {
        theEvent->SetTime(currentTime);
        event_list.Precede(next_event, theEvent);
        if (!ValidateEvent(theEvent))
        {
            event_list.Remove(theEvent);
            return false;
        }
        Update(theEvent);
        return true;
    }

    // --- Running, "scheduled" event ---
    event_list.Insert(theEvent);
    if (!ValidateEvent(theEvent))
    {
        event_list.Remove(theEvent);
        return false;
    }

    const bool timerRunning = event_timer.IsActive();
    if (timerRunning && !(eventTime < next_event->GetTime()))
    {
        // An earlier (or equally timed) event is already pending;
        // the timer needs no adjustment.
        return true;
    }

    // The new event becomes the next one to fire
    next_event = theEvent;
    event_timer.SetInterval(eventTime - currentTime);
    if (timerRunning)
        event_timer.Reschedule();
    else
        timer_mgr.ActivateTimer(event_timer);
    return true;
}  // end MgenFlow::InsertEvent()

bool MgenFlow::ValidateEvent(const MgenEvent* event)
{
    //  Validate the event by it's position in the list with respect to its neighbor types   
    const MgenEvent* prevEvent = (MgenEvent*)event->Prev();
    MgenEvent::Type prevType = prevEvent ? prevEvent->GetType() : 
                                           MgenEvent::INVALID_TYPE;
    switch (event->GetType())
    {
        case MgenEvent::ON:
            if ((MgenEvent::MOD == prevType) ||
                (MgenEvent::ON  == prevType))
            {
                DMSG(0, "MgenFlow::InsertEvent() inappropriate ON event.\n");
                return false;   
            }
            break;
        case MgenEvent::MOD:
            if ((MgenEvent::OFF == prevType) ||
                (MgenEvent::INVALID_TYPE  == prevType))
            {
                DMSG(0, "MgenFlow::InsertEvent() inappropriate MOD event.\n");
                return false;   
            }
            break;
        case MgenEvent::OFF:
            if ((MgenEvent::OFF == prevType) ||
                (MgenEvent::INVALID_TYPE  == prevType))
            {
                DMSG(0, "MgenFlow::InsertEvent() inappropriate OFF event.\n");
                return false;   
            }
            break;
        case MgenEvent::INVALID_TYPE:
            ASSERT(0);  // this should never occur
            return false;                
    }  // end switch(event->GetType())
    return true;
}  // end MgenFlow::ValidateEvent()

bool MgenFlow::Start(double offsetTime)
{
    // Defer actual start until "offsetTime", but process events
    // so flow state is appropriate for given offset.
    MgenEvent* nextEvent = (MgenEvent*)event_list.Head();
    if (!nextEvent) 
    {
        DMSG(0, "MgenFlow::Start() flow with empty event list!\n");
        return false; 
    }
    while (nextEvent)
    {
        if (nextEvent->GetTime() <= offsetTime)
        {
            Update(nextEvent);
            nextEvent = (MgenEvent*)nextEvent->Next();
        }
        else
        {
            break;   
        }   
    }
    if ((next_event = nextEvent))
    {
        double currentTime = (offsetTime > 0.0) ? offsetTime : 0.0;
        double nextInterval = next_event->GetTime() - currentTime;
        nextInterval = nextInterval > 0.0 ? nextInterval : 0.0;
        event_timer.SetInterval(nextInterval);
        timer_mgr.ActivateTimer(event_timer);
        started = true;
        return true;
    }
    started = true;
    return true;
}  // end MgenFlow::Start()


bool MgenFlow::Update(const MgenEvent* event)
{
    switch (event->GetType())
    {
        case MgenEvent::ON:
        {   
            ASSERT(!tx_timer.IsActive());
            protocol = event->GetProtocol();
            src_port = (event->OptionIsSet(MgenEvent::SRC)) ? event->GetSrcPort() : 0;

            switch (protocol)
            {
                case MgenBaseEvent::UDP:
                    socket_item = socket_list.GetItem(ProtoSocket::UDP, src_port);
                    if (!socket_item)
                    {
                        DMSG(0, "MgenFlow::Update() Error: unable to get socket item.\n");
                        return false; 
                    }
                    // This increments socketItem reference_count, opens socket as needed
                    // note that socket must be opened as IPv4 or IPv6 type
                    if (!socket_item->Open(event->GetDstAddr().GetType(), true)) 
                    {
                        DMSG(0, "MgenFlow::Update() Error: socket open error.\n");
                        return false;
                    }
                    socket = &socket_item->GetSocket();
                    break;
                
                case MgenBaseEvent::TCP:
                    DMSG(0, "MgenFlow::Update() TCP not quite yet supported (almost).\n");
                    return false;
                    
                case MgenBaseEvent::SINK:
                    if (!sink)
                    {
                        DMSG(0, "MgenFlow::Update() SINK requires MgenSink attachment.\n");
                        return false;
                    }
                    break;
                    
                default:
                    DMSG(0, "MgenFlow::Update() invalid protocol type.\n");
                    return false;
            }
            if (host_addr.IsValid() && socket) host_addr.SetPort(socket->GetPort());
            // Note the lack of "break" here is _intentional_
        }
        case MgenEvent::MOD:
        {
            // Generic MGEN flow options
            if (event->OptionIsSet(MgenEvent::DST))
                dst_addr = event->GetDstAddr();
            if (event->OptionIsSet(MgenEvent::PATTERN))
                pattern = event->GetPattern();
            if (event->OptionIsSet(MgenEvent::SEQUENCE)) 
                seq_num = event->GetSequence();
            if (event->OptionIsSet(MgenEvent::LABEL))
                flow_label = event->GetFlowLabel();
            
            // Socket-specific MGEN flow options
            if (socket_item)
            {
                if (event->OptionIsSet(MgenEvent::TXBUFFER))
                {
                    if (!socket_item->SetTxBufferSize(event->GetTxBuffer()))
                        DMSG(0, "MgenFlow::Update() error setting socket tx buffer\n");    
                }
                if (event->OptionIsSet(MgenEvent::TOS))
                {
                    if (!socket_item->SetTOS(event->GetTOS()))
                        DMSG(0, "MgenFlow::Update() error setting socket TOS value\n"); 
                }
                if (event->OptionIsSet(MgenEvent::TTL))
                {
                    //TRACE("setting socket ttl: %d\n", event->GetTTL());
                    if (!socket_item->SetTTL(event->GetTTL()))
                        DMSG(0, "MgenFlow::Update() error setting socket multicast interface\n");    
                }
                if (event->OptionIsSet(MgenEvent::INTERFACE))
                {
                    if (!socket_item->SetMulticastInterface(event->GetInterface()))
                        DMSG(0, "MgenFlow::Update() error setting socket multicast interface\n");    
                }
            }
            
            // Schedule tx_timer as needed
            double nextInterval = pattern.GetPktInterval();
            if (nextInterval < 0.0)  // 0.0 pkt/sec transmission rate
            {
                if (tx_timer.IsActive()) 
                    tx_timer.Deactivate();
                else if (sink && sink->IsNotifying())
                    sink->StopNotifying();
            }
            else if (nextInterval > 0.0)
            {
                if (sink && sink->IsNotifying())
                    sink->StopNotifying();
                if (tx_timer.IsActive())
                {
                    double elapsedTime = last_interval - tx_timer.GetTimeRemaining();
                    if (nextInterval < elapsedTime)
                        tx_timer.SetInterval(0.0);
                    else
                        tx_timer.SetInterval(nextInterval - elapsedTime);
                    last_interval = nextInterval;
                }
                else
                {
                    tx_timer.SetInterval(0.0);
                    timer_mgr.ActivateTimer(tx_timer);
                    last_interval = 0.0;
                }
            }
            else
            {
                if (tx_timer.IsActive()) tx_timer.Deactivate();
                if (MgenBaseEvent::SINK == protocol)
                {
                    if (sink)
                    {
                        if (!sink->IsNotifying()) 
                            sink->StartNotifying(this);
                        tx_timer.SetInterval(0.0);
                        last_interval = 0.0;
                    }
                    else
                    {
                        DMSG(0, "MgenFlow::Update() error: SINK requires MgenSink attachment!\n");
                        return false;
                    }
                }
                else
                {
                    DMSG(0, "MgenFlow::Update() error: Undefined packet rate (-1.0) for non-SINK flow!\n"); 
                    return false;
                }
            }
            break;
        }
            
        case MgenEvent::OFF:
        {
            if (sink && sink->IsNotifying())
                sink->StopNotifying();
            if (tx_timer.IsActive()) 
                tx_timer.Deactivate();
            if (socket_item) 
            {
                socket_item->Close();
                socket_item = NULL;
                socket = NULL;
            }
            break;
        }
            
        case MgenEvent::INVALID_TYPE:
            ASSERT(0);  // this should never occur
            return false;
    }  // end switch(event->GetType())
    return true;
}  // end MgenFlow::Update()


// tx_timer handler: build, send and (optionally) log one MGEN message,
// then decide how transmission continues.
//
// The message is stamped with the flow id, the next sequence number, the
// destination address, the current system time, the host address (when
// valid) and GPS information (marked invalid unless HAVE_GPS supplies a
// position).  It is sent via the UDP socket or the attached sink according
// to "protocol".  When the send fails the sequence number is rolled back;
// on success the message is logged if a log file is set.
//
// Afterwards pattern.GetPktInterval() selects what happens next:
//    > 0.0  : tx_timer gets that interval (and is activated if idle);
//             returns true
//    < 0.0  : tx_timer and sink notification are stopped; returns false
//    == 0.0 : tx_timer is stopped; a SINK flow asks its sink to start
//             notifying, any other protocol is logged as an error;
//             returns false
bool MgenFlow::OnTxTimeout(ProtoTimer& /*theTimer*/)
{
    MgenMsg theMsg;
    unsigned short len = pattern.GetPktSize();
    theMsg.SetMsgLen(len);
    theMsg.SetFlowId(flow_id);
    theMsg.SetSeqNum(seq_num++);
    struct timeval currentTime;
    ProtoSystemTime(currentTime);
    theMsg.SetDstAddr(dst_addr);
    theMsg.SetTxTime(currentTime);
    if (host_addr.IsValid())
        theMsg.SetHostAddr(host_addr); 
        
    
    // GPS info (defaults mark the position as invalid)
    theMsg.SetGPSStatus(MgenMsg::INVALID_GPS); 
    theMsg.SetGPSLatitude(999);
    theMsg.SetGPSLongitude(999); 
    theMsg.SetGPSAltitude(-999);
#ifdef HAVE_GPS
    if (get_position)
    {
        GPSPosition pos;
        get_position(get_position_data, pos);
        if (pos.xyvalid)
        {
            theMsg.SetGPSLatitude(pos.y);
            theMsg.SetGPSLongitude(pos.x);
            if (pos.zvalid)
                theMsg.SetGPSAltitude((long)(pos.z+0.5));
            if (pos.stale)
                theMsg.SetGPSStatus(MgenMsg::STALE);
            else
                theMsg.SetGPSStatus(MgenMsg::CURRENT);
        }
    }
    if (payload_handle)
    {
         // First byte read from the payload memory is the payload length;
         // the payload itself is taken starting at offset 1.
         unsigned char payloadLen = 0;
         GPSGetMemory(payload_handle, 0, (char*)&payloadLen, 1);
         if (payloadLen)
             theMsg.SetPayload(GPSGetMemoryPtr(payload_handle, 1), payloadLen);
    }
#endif // HAVE_GPS
        
    char txBuffer[MgenMsg::MAX_SIZE];
    len = theMsg.Pack(txBuffer, checksum_enable);
#ifdef HAVE_IPV6
    if (ProtoAddress::IPv6 == dst_addr.GetType())
    {
        if (socket && !socket->SetFlowLabel(flow_label))
            DMSG(0, "MgenFlow::OnTxTimeout cannot set flow label\n");
    }
#endif //HAVE_IPV6
    // Send message, checking for error
    // (log only on success)
    bool success = false;
    switch (protocol)
    {
        case MgenBaseEvent::UDP:
            if (!socket->SendTo(txBuffer,len, dst_addr))
            {        
#ifndef _WIN32_WCE
                // EAGAIN is reported at debug level 1 instead of 0
                if (EAGAIN != errno)
                    DMSG(0, "MgenFlow::OnTxTimeout() socket_item->SendTo() error: %s\n", GetErrorString());   
                else
#endif // !_WIN32_WCE
                    DMSG(1, "MgenFlow::OnTxTimeout() socket_item->SendTo() error: %s\n", GetErrorString());   
            }
            else
            {
                success = true;
            }
            break;
        case MgenBaseEvent::SINK:
            // Guard against a detached sink (the zero-interval branch below
            // already allows for sink being NULL); a missing sink is
            // treated as a failed send.
            if (sink)
                success = sink->SendMgenMessage(txBuffer, len, dst_addr);
            break;
        default:
            ASSERT(0);  // should never occur
            break;
    }
    if (!success)
    {
#ifdef SIMULATE
        DMSG(0, "MgenFlow::OnTxTimeout() error sending message.\n");
#endif // SIMULATE
        // message was not sent, so sequence number is decremented back one
        seq_num--;
    }
    else if (log_file)
    {
        theMsg.LogSendEvent(log_file, log_binary, txBuffer, log_flush);
    }
    
    double nextInterval = pattern.GetPktInterval();
    if (nextInterval > 0.0)       // normal scheduled transmission event
    {
        tx_timer.SetInterval(nextInterval);
        if (sink && sink->IsNotifying()) sink->StopNotifying();
        if (!tx_timer.IsActive())
        {
            timer_mgr.ActivateTimer(tx_timer);
            last_interval = 0.0;
        }
        else
        {
            last_interval = nextInterval;
        }
        return true;
    }
    else if (nextInterval < 0.0)  // Flow pattern rate is 0.0 message/sec
    {
        if(tx_timer.IsActive()) tx_timer.Deactivate();
        last_interval = 0.0;
        if (sink && sink->IsNotifying())
            sink->StopNotifying();
        return false;
    }
    else  // (nextInterval == 0.0) // Flow pattern rate is undefined (SINK only for now)
    {
        if (tx_timer.IsActive()) tx_timer.Deactivate();
        if (MgenBaseEvent::SINK == protocol)
        {
            if (sink)
            {
                // Trigger async output notification
                if (!sink->IsNotifying()) 
                {
                    sink->StartNotifying(this);
                    last_interval = 0.0;
                }
            }
            else
            {
                DMSG(0, "MgenFlow::OnTxTimeout() error: SINK requires MgenSink attachment!\n");
            }
        }
        else
        {
            DMSG(0, "MgenFlow::OnTxTimeout() error: Undefined packet rate (-1.0) for non-SINK flow!\n"); 
        }
        return false;
    }
}   // end MgenFlow::OnTxTimeout()

bool MgenFlow::IsActive() const
{
    if (!started) return false;
    if (next_event)
    {
        if (MgenEvent::ON != next_event->GetType())
            return true;
        else
            return false;
    }
    else
    {
        const MgenEvent* lastEvent = (const MgenEvent*)event_list.Tail();
        if (lastEvent && (MgenEvent::OFF != lastEvent->GetType()))
            return true;
        else
            return false;   
    }    
}  // end MgenFlow::IsActive();

double MgenFlow::GetCurrentOffset() const
{
   double currentOffset = -1.0;
   if (started) 
   {
       if (next_event)
       {
           currentOffset = next_event->GetTime() - 
                           event_timer.GetTimeRemaining();
       }
       else
       {
           const MgenEvent* event = (const MgenEvent*)event_list.Tail();
           currentOffset =  event ? event->GetTime() : -1.0;
       }  
   }
   return currentOffset;
}  // end MgenFlow::CurrentTime()

// event_timer handler: apply the pending event, then advance to the one
// after it.  When another event follows, event_timer's interval is set to
// the (non-negative) gap between the two event times and true is
// returned; otherwise event_timer is deactivated and false is returned.
bool MgenFlow::OnEventTimeout(ProtoTimer& /*theTimer*/)
{
    ASSERT(next_event);

    // Apply the event that just came due
    Update(next_event);

    // Step to its successor, remembering the time we fired at
    const double firedAt = next_event->GetTime();
    next_event = (MgenEvent*)next_event->Next();
    if (!next_event)
    {
        event_timer.Deactivate();
        return false;
    }

    double delay = next_event->GetTime() - firedAt;
    if (!(delay > 0.0)) delay = 0.0;
    event_timer.SetInterval(delay);
    return true;
}  // end MgenFlow::OnEventTimeout()

//////////////////////////////////////////////////////////////////
// MgenFlowList implementation
// Construct an empty list: head and tail both start out NULL.
MgenFlowList::MgenFlowList()
 : head(NULL), tail(NULL)
{
}

// Destructor: calls Destroy(), which deletes every flow still in the list.
MgenFlowList::~MgenFlowList()
{
    Destroy();
}

void MgenFlowList::Destroy()
{
    MgenFlow* next = head;
    while (next)
    {
        MgenFlow* current = next;
        next = next->next;
        delete current;   
    }
    head = tail = (MgenFlow*)NULL;
}  // end MgenFlowList::Destroy()

// Link "theFlow" in as the new tail of the list.
void MgenFlowList::Append(MgenFlow* theFlow)
{
    theFlow->prev = tail;
    theFlow->next = NULL;
    if (tail)
    {
        tail->next = theFlow;
    }
    else
    {
        // List was empty, so the new flow is also the head
        head = theFlow;
    }
    tail = theFlow;
}  // end MgenFlowList::Append()

void MgenFlowList::SetLogFile(FILE* filePtr, bool binary, bool flush)
{   
    MgenFlow* current = head;
    while (current)
    {
        current->SetLogFile(filePtr, binary, flush);
        current = current->next;
    }
}  // end MgenFlowList::SetLogFile()

void MgenFlowList::SetHostAddress(const ProtoAddress& hostAddr)
{   
    MgenFlow* current = head;
    while (current)
    {
        current->SetHostAddress(hostAddr);
        current = current->next;
    }
}  // end MgenFlowList::SetHostAddress()

void MgenFlowList::ClearHostAddress()
{   
    MgenFlow* current = head;
    while (current)
    {
        current->ClearHostAddress();
        current = current->next;
    }
}  // end MgenFlowList::ClearHostAddress()

void MgenFlowList::EnableChecksums()
{   
    MgenFlow* current = head;
    while (current)
    {
        current->EnableChecksums();
        current = current->next;
    }
}  // end MgenFlowList::SetLogFile()

// Hand "theSink" to every flow in the list.
void MgenFlowList::SetSink(MgenSink* theSink)
{
    for (MgenFlow* flow = head; flow; flow = flow->next)
    {
        flow->SetSink(theSink);
    }
}  // end MgenFlowList::SetSink()

//Set the default IPv6 flow label.
#ifdef HAVE_IPV6
// Call SetLabel(label) on every flow in the list.
void MgenFlowList::SetDefaultLabel(UINT32 label)
{
    for (MgenFlow* flow = head; flow; flow = flow->next)
    {
        flow->SetLabel(label);
    }
}  // MgenFlowList::SetDefaultLabel()
#endif //HAVE_IPV6

// Look up a flow by id, searching from the tail of the list toward the
// head.  Returns the first match found in that order, or NULL if no flow
// has the requested id.
MgenFlow* MgenFlowList::FindFlowById(unsigned int flowId)
{
    for (MgenFlow* candidate = tail; candidate; candidate = candidate->prev)
    {
        if (candidate->flow_id == flowId)
        {
            return candidate;
        }
    }
    return (MgenFlow*)NULL;
}  // end MgenFlowList::FindFlowById()

bool MgenFlowList::Start(double offsetTime)
{
    bool result = false;
    MgenFlow* next = head;
    while (next)
    {
        result |= next->Start(offsetTime);
        next = next->next;
    }
    return result;
}  // end MgenFlowList::Start()

double MgenFlowList::GetCurrentOffset() const
{
    double currentOffset = -1.0;
    MgenFlow* next = head;
    while (next)
    {
        if (next->event_timer.IsActive())
        {
            return next->GetCurrentOffset();
        }
        else
        {
            double nextOffset =  next->GetCurrentOffset();
            if (nextOffset > currentOffset) 
                currentOffset = nextOffset;  
        }   
        next = next->next;
    }
    return currentOffset;
}  // end MgenFlowList::GetCurrentOffset()

// Write one "<offset> MOD <flowId> SEQUENCE <seqNum>" line to "file" for
// every flow that is either active or still has an event pending.
// The offset is the flow's current offset when it is active, otherwise
// the time of its pending event.
//
// Returns false if "file" is NULL or if any line could not be written;
// true otherwise.  A write failure does not stop the remaining flows
// from being attempted.
bool MgenFlowList::SaveFlowSequences(FILE* file) const
{
    if (NULL == file) return false;

    bool result = true;
    MgenFlow* next = head;
    while (next)
    {
        const bool active = next->IsActive();
        if (active || (NULL != next->next_event))
        {
            double offset;
            if (active)
            {
                offset = next->GetCurrentOffset();
            }
            else
            {
                offset = next->next_event->GetTime();
            }
            // "%lu" requires an unsigned long argument; cast explicitly so
            // the varargs call is correct whatever the members' declared
            // integer widths are.
            if (fprintf(file, "%f MOD %lu SEQUENCE %lu\n",
                              offset,
                              (unsigned long)next->flow_id,
                              (unsigned long)next->seq_num) < 0)
            {
                result = false;
            }
        }
        next = next->next;
    }
    return result;
}  // end MgenFlowList::SaveFlowSequences()


// syntax highlighted by Code2HTML, v. 0.9.1