/***************************************************************************/
/*                                                                         */
/* Project:     OpenSLP - OpenSource implementation of Service Location    */
/*              Protocol Version 2                                         */
/*                                                                         */
/* File:        slpd_outgoing.c                                            */
/*                                                                         */
/* Abstract:    Handles "outgoing" network conversations, i.e. requests    */
/*              made by slpd to other agents. (slpd_incoming.c handles     */
/*              requests made by other agents to slpd)                     */
/*                                                                         */
/*-------------------------------------------------------------------------*/
/*                                                                         */
/*     Please submit patches to http://www.openslp.org                     */
/*                                                                         */
/*-------------------------------------------------------------------------*/
/*                                                                         */
/* Copyright (C) 2000 Caldera Systems, Inc                                 */
/* All rights reserved.                                                    */
/*                                                                         */
/* Redistribution and use in source and binary forms, with or without      */
/* modification, are permitted provided that the following conditions are  */
/* met:                                                                    */ 
/*                                                                         */
/*      Redistributions of source code must retain the above copyright     */
/*      notice, this list of conditions and the following disclaimer.      */
/*                                                                         */
/*      Redistributions in binary form must reproduce the above copyright  */
/*      notice, this list of conditions and the following disclaimer in    */
/*      the documentation and/or other materials provided with the         */
/*      distribution.                                                      */
/*                                                                         */
/*      Neither the name of Caldera Systems nor the names of its           */
/*      contributors may be used to endorse or promote products derived    */
/*      from this software without specific prior written permission.      */
/*                                                                         */
/* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS     */
/* `AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT      */
/* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR   */
/* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE CALDERA      */
/* SYSTEMS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, */
/* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT        */
/* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;  LOSS OF USE,  */
/* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON       */
/* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */
/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   */
/* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.    */
/*                                                                         */
/***************************************************************************/


/*=========================================================================*/
/* slpd includes                                                           */
/*=========================================================================*/
#include "slpd_outgoing.h"
#include "slpd_property.h"
#include "slpd_process.h"
#include "slpd_log.h"
#include "slpd_knownda.h"


/*=========================================================================*/
/* common code includes                                                    */
/*=========================================================================*/
#include "slp_message.h"


/*=========================================================================*/
SLPList G_OutgoingSocketList = {0,0,0};
/* Global list of the sockets used for outgoing conversations.  The        */
/* SLPDOutgoing*() functions in this file link sockets into it, service    */
/* them and remove them again.  Starts out zero-initialized.               */
/*=========================================================================*/


/*-------------------------------------------------------------------------*/
void OutgoingDatagramRead(SLPList* socklist, SLPDSocket* sock)
/* Receive one datagram from an outgoing datagram socket and hand it to    */
/* SLPDProcessMessage().                                                   */
/*                                                                         */
/* socklist (IN) list the socket is linked into (unused here)              */
/*                                                                         */
/* sock     (IN/OUT) socket to read; recvbuf and peeraddr are overwritten  */
/*-------------------------------------------------------------------------*/
{
    int                 addrlen = sizeof(struct sockaddr_in);
    int                 received;

    /* At most SLP_MAX_DATAGRAM_SIZE bytes are stored at recvbuf->start */
    received = recvfrom(sock->fd,
                        sock->recvbuf->start,
                        SLP_MAX_DATAGRAM_SIZE,
                        0,
                        (struct sockaddr *) &(sock->peeraddr),
                        &addrlen);
    if ( received <= 0 )
    {
        /* Nothing usable was received */
        return;
    }

    /* Mark how much of the receive buffer holds message data */
    sock->recvbuf->end = sock->recvbuf->start + received;

    /* The value returned by SLPDProcessMessage() is deliberately not */
    /* looked at                                                      */
    SLPDProcessMessage(&(sock->peeraddr),
                       sock->recvbuf,
                       &(sock->sendbuf));
}


/*-------------------------------------------------------------------------*/
void OutgoingStreamReconnect(SLPList* socklist, SLPDSocket* sock)
/* Replace the socket's file descriptor with a fresh non-blocking TCP      */
/* socket and start connecting it to sock->peeraddr.  Gives up and marks   */
/* the socket SOCKET_CLOSE once reconns exceeds SLPD_CONFIG_MAX_RECONN.    */
/*                                                                         */
/* socklist (IN) list the socket is linked into (unused here)              */
/*                                                                         */
/* sock     (IN/OUT) socket to reconnect; fd, state and reconns change     */
/*-------------------------------------------------------------------------*/
{
#ifdef _WIN32
    u_long              nbflags;
#else
    int                 nbflags;
#endif
    int                 pending;

    /* A reconnect that is already waiting on a blocked connect() is left */
    /* alone; it is expected to time out by itself eventually             */
    if ( sock->state == STREAM_CONNECT_BLOCK )
    {
        return;
    }

#ifdef DEBUG
    /* Log that reconnect warning */
    SLPDLog("WARNING: Reconnect to agent at %s.  Agent may not be making efficient \n"
            "         use of TCP.\n",
            inet_ntoa(sock->peeraddr.sin_addr));
#endif

    /* Count this attempt and stop retrying when the limit is exceeded */
    sock->reconns += 1;
    if ( sock->reconns > SLPD_CONFIG_MAX_RECONN )
    {
        sock->state = SOCKET_CLOSE;
        SLPDLog("WARNING: Reconnect tries to agent at %s exceeded maximum. It\n"
                "         is possible that the agent is malicious.  Check it out!\n",
                inet_ntoa(sock->peeraddr.sin_addr));
        return;
    }

    /* Throw away the old descriptor so the new stream starts clean */
    CloseSocket(sock->fd);
    sock->fd = socket(PF_INET,SOCK_STREAM,0);
    if ( sock->fd < 0 )
    {
        sock->state = SOCKET_CLOSE;
        return;
    }

    /* Switch the new descriptor to non-blocking IO */
#ifdef _WIN32
    nbflags = 1;
    ioctlsocket(sock->fd, FIONBIO, &nbflags);
#else
    nbflags = fcntl(sock->fd, F_GETFL, 0);
    fcntl(sock->fd,F_SETFL, nbflags | O_NONBLOCK);
#endif

    /* Start the connect.  An immediate success means the whole packet */
    /* can be written, hence STREAM_WRITE_FIRST                        */
    if ( connect(sock->fd,
                 (struct sockaddr *)&(sock->peeraddr),
                 sizeof(struct sockaddr_in)) == 0 )
    {
        sock->state = STREAM_WRITE_FIRST;
        return;
    }

#ifdef _WIN32
    pending = ( WSAEWOULDBLOCK == WSAGetLastError() );
#else
    pending = ( errno == EINPROGRESS );
#endif

    /* NOTE(review): a connect() failure other than "in progress" also  */
    /* ends up in STREAM_WRITE_FIRST, just like a successful connect;   */
    /* presumably the failure resurfaces on the next write - confirm    */
    /* that this is intended                                            */
    sock->state = pending ? STREAM_CONNECT_BLOCK : STREAM_WRITE_FIRST;
}


/*-------------------------------------------------------------------------*/
void OutgoingStreamRead(SLPList* socklist, SLPDSocket* sock)
/* Read a reply on an outgoing stream socket.  In STREAM_READ_FIRST the    */
/* message header is peeked to size recvbuf; in STREAM_READ the message    */
/* is received and, once complete, given to SLPDProcessMessage().          */
/*                                                                         */
/* socklist (IN) list the socket is linked into (passed on to reconnect)   */
/*                                                                         */
/* sock     (IN/OUT) socket to read from; its state is advanced            */
/*-------------------------------------------------------------------------*/
{
    int     bytesread, recvlen;
    char    peek[16];
    int     peeraddrlen = sizeof(struct sockaddr_in);

    if ( sock->state == STREAM_READ_FIRST )
    {
        /*---------------------------------------------------*/
        /* take a peek at the packet to get size information */
        /*---------------------------------------------------*/
        bytesread = recvfrom(sock->fd,
                             peek,
                             16,
                             MSG_PEEK,
                             (struct sockaddr *)&(sock->peeraddr),
                             &peeraddrlen);
        /* At least 5 bytes are needed: the first byte must be 2         */
        /* (presumably the SLP version) and the 24 bit message length is */
        /* taken from offset 2                                           */
        if ( bytesread >= 5 && *peek == 2 )
        {
            recvlen = AsUINT24(peek + 2);
            /* one byte is minimum */
            if (recvlen <= 0)
                recvlen = 1;
            /* allocate the recvbuf big enough for the whole message */
            sock->recvbuf = SLPBufferRealloc(sock->recvbuf, recvlen);
            if ( sock->recvbuf )
            {
                sock->state = STREAM_READ;
            }
            else
            {
                SLPDLog("INTERNAL_ERROR - out of memory!\n");
                sock->state = SOCKET_CLOSE;
            }
        }
        else if ( bytesread == -1 )
        {
#ifdef _WIN32
            if ( WSAEWOULDBLOCK != WSAGetLastError() )
#else
            if ( errno != EWOULDBLOCK )
#endif
            {
                /* Error occured or connection was closed. Try to reconnect */
                /* Socket will be closed if connect times out               */
                OutgoingStreamReconnect(socklist,sock);
            }
        }
        else
        {
            /* Too few bytes, or the first byte is not 2: give up */
            sock->state = SOCKET_CLOSE;
        }
    }

    if ( sock->state == STREAM_READ )
    {
        /*------------------------------*/
        /* recv the rest of the message */
        /*------------------------------*/
        bytesread = recv(sock->fd,
                         sock->recvbuf->curpos,
                         sock->recvbuf->end - sock->recvbuf->curpos,
                         0);
        if ( bytesread > 0 )
        {
            /* reset age because of activity */
            sock->age = 0;

            /* move buffer pointers */
            sock->recvbuf->curpos += bytesread;

            /* check to see if everything was read */
            if ( sock->recvbuf->curpos == sock->recvbuf->end )
            {
                switch ( SLPDProcessMessage(&(sock->peeraddr),
                                            sock->recvbuf,
                                            &(sock->sendbuf)) )
                {
                case SLP_ERROR_DA_BUSY_NOW:
                    /* Keep sendbuf; SLPDOutgoingAge() moves the socket */
                    /* back to STREAM_WRITE_FIRST later                 */
                    sock->state = STREAM_WRITE_WAIT;
                    break;
                case SLP_ERROR_PARSE_ERROR:
                case SLP_ERROR_VER_NOT_SUPPORTED:
                    sock->state = SOCKET_CLOSE;
                    break;
                default:
                    /* End of outgoing message exchange. Unlink   */
                    /* send buf from to do list and free it       */
                    SLPBufferFree(sock->sendbuf);
                    sock->sendbuf = NULL;
                    sock->state = STREAM_WRITE_FIRST;
                    /* clear the reconnection count since we actually
                     * transmitted a successful message exchange
                     */
                    sock->reconns = 0;
                    break;
                }
            }
        }
        else if ( bytesread == 0 )
        {
            /* recv() returns 0 when the peer closed the connection and */
            /* does not set errno in that case, so errno must not be    */
            /* consulted here (a stale EWOULDBLOCK would hide the       */
            /* close).  Try to reconnect right away                     */
            OutgoingStreamReconnect(socklist,sock);
        }
        else
        {
#ifdef _WIN32
            if ( WSAEWOULDBLOCK != WSAGetLastError() )
#else
            if ( errno != EWOULDBLOCK )
#endif
            {
                /* A real error occured. Try to reconnect.    */
                /* Socket will be closed if connect times out */
                OutgoingStreamReconnect(socklist,sock);
            }
        }
    }
}


/*-------------------------------------------------------------------------*/
void OutgoingStreamWrite(SLPList* socklist, SLPDSocket* sock)
/* Write pending data on an outgoing stream socket.  In STREAM_WRITE_FIRST */
/* the next buffer is taken from sock->sendlist (unless sendbuf is already */
/* set); once the whole buffer is sent the state becomes STREAM_READ_FIRST.*/
/*                                                                         */
/* socklist (IN) list the socket is linked into (passed on to reconnect)   */
/*                                                                         */
/* sock     (IN/OUT) socket to write to; its state is advanced             */
/*-------------------------------------------------------------------------*/
{
    int        byteswritten;
    int        flags = 0;

#if defined(MSG_DONTWAIT)
    flags = MSG_DONTWAIT;
#endif

    if ( sock->state == STREAM_WRITE_FIRST )
    {
        /* set sendbuf to the first item in the send list if it is not set */
        if(sock->sendbuf == NULL)
        {
            sock->sendbuf = (SLPBuffer)sock->sendlist.head;
            if ( sock->sendbuf == NULL )
            {
                /* there is nothing in the to do list */
                sock->state = STREAM_CONNECT_IDLE;
                return;
            }
            /* Unlink the send buffer we are sending from the send list */
            SLPListUnlink(&(sock->sendlist),(SLPListItem*)(sock->sendbuf));
        }

        /* make sure that the start and curpos pointers are the same */
        sock->sendbuf->curpos = sock->sendbuf->start;
        sock->state = STREAM_WRITE;
    }

    if ( sock->sendbuf->end - sock->sendbuf->start > 0 )
    {
        /* Send only what is still outstanding.  After a partial write  */
        /* curpos is past start, so passing the full buffer length here */
        /* would make send() read beyond the end of the buffer and      */
        /* could push curpos past end                                   */
        byteswritten = send(sock->fd,
                            sock->sendbuf->curpos,
                            sock->sendbuf->end - sock->sendbuf->curpos,
                            flags);
        if ( byteswritten > 0 )
        {
            /* reset age because of activity */
            sock->age = 0;

            /* move buffer pointers */
            sock->sendbuf->curpos += byteswritten;

            /* check to see if everything was written */
            if ( sock->sendbuf->curpos == sock->sendbuf->end )
            {
                /* Message is completely sent. Set state to read the reply */
                sock->state = STREAM_READ_FIRST;
            }
        }
        else
        {
#ifdef _WIN32
            if ( WSAEWOULDBLOCK != WSAGetLastError() )
#else
            if ( errno != EWOULDBLOCK )
#endif
            {
                /* Error occured or connection was closed. Try to reconnect */
                /* Socket will be closed if connect times out               */
                OutgoingStreamReconnect(socklist,sock);
            }
        }
    }
    else
    {
        /* nothing to write */
#ifdef DEBUG
        SLPDLog("yikes, an empty socket is being written!\n");
#endif
        sock->state = SOCKET_CLOSE;
    }
}

/*=========================================================================*/
SLPDSocket* SLPDOutgoingConnect(struct in_addr* addr)
/* Find or create a socket on the outgoing list for the given peer.  A     */
/* listed socket is reused when its peer address matches and its state is  */
/* STREAM_CONNECT_IDLE or greater than SOCKET_PENDING_IO; otherwise a new  */
/* socket comes from SLPDSocketCreateConnected() and is linked at the tail.*/
/*                                                                         */
/* addr (IN) the address of the peer a connection is desired for           */
/*                                                                         */
/* returns: pointer to the socket, or null if no socket could be created   */
/*=========================================================================*/
{
    SLPDSocket* cur;

    for ( cur = (SLPDSocket*)G_OutgoingSocketList.head;
          cur != 0;
          cur = (SLPDSocket*)cur->listitem.next )
    {
        /* Skip sockets whose state does not qualify for reuse */
        if ( cur->state != STREAM_CONNECT_IDLE &&
             cur->state <= SOCKET_PENDING_IO )
        {
            continue;
        }

        if ( cur->peeraddr.sin_addr.s_addr == addr->s_addr )
        {
            /* Existing socket for this peer */
            return cur;
        }
    }

    /* No reusable socket found: create one and remember it on the list */
    cur = SLPDSocketCreateConnected(addr);
    if ( cur != 0 )
    {
        SLPListLinkTail(&(G_OutgoingSocketList),(SLPListItem*)cur);
    }

    return cur;
}

/*=========================================================================*/
void SLPDOutgoingDatagramWrite(SLPDSocket* sock)
/* Send the socket's sendbuf as one datagram to sock->peeraddr.  On        */
/* success the socket is linked at the head of the outgoing list so that   */
/* replies get processed; on failure the socket is freed.                  */
/*                                                                         */
/* sock (IN) the socket to send on; ownership passes to this function      */
/*=========================================================================*/
{
    if ( sendto(sock->fd,
                sock->sendbuf->start,
                sock->sendbuf->end - sock->sendbuf->start,
                0,
                (struct sockaddr *) &(sock->peeraddr),
                sizeof(struct sockaddr_in)) < 0 )
    {
        /* The datagram could not be sent, so the socket is of no use */
#ifdef DEBUG
        SLPDLog("ERROR: Data could not send() in SLPDOutgoingDatagramWrite()");
#endif
        SLPDSocketFree(sock);
        return;
    }

    /* Keep the socket on the outgoing list so replies will be processed */
    SLPListLinkHead(&G_OutgoingSocketList,(SLPListItem*)(sock));
}


/*=========================================================================*/
void SLPDOutgoingHandler(int* fdcount,
                         fd_set* readfds,
                         fd_set* writefds)

/* Service every outgoing socket whose descriptor is marked in the given   */
/* fd_sets.  A socket marked readable is only read, even if it is also     */
/* marked writable.  The walk stops early once *fdcount reaches zero.      */
/*                                                                         */
/* fdcount  (IN/OUT) number of marked descriptors; decremented for each    */
/*                   marked socket found on the outgoing list              */
/*                                                                         */
/* readfds  (IN) file descriptors with pending read IO                     */
/*                                                                         */
/* writefds (IN) file descriptors with pending write IO                    */
/*=========================================================================*/
{
    SLPDSocket* cur;

    for ( cur = (SLPDSocket*)G_OutgoingSocketList.head;
          cur && *fdcount;
          cur = (SLPDSocket*)cur->listitem.next )
    {
        if ( FD_ISSET(cur->fd,readfds) )
        {
            if ( cur->state == DATAGRAM_MULTICAST ||
                 cur->state == DATAGRAM_BROADCAST ||
                 cur->state == DATAGRAM_UNICAST )
            {
                OutgoingDatagramRead(&G_OutgoingSocketList,cur);
            }
            else if ( cur->state == STREAM_READ ||
                      cur->state == STREAM_READ_FIRST )
            {
                OutgoingStreamRead(&G_OutgoingSocketList,cur);
            }
            /* Any other state is left untouched (no SOCKET_LISTEN */
            /* sockets should exist on this list)                  */

            *fdcount -= 1;
        }
        else if ( FD_ISSET(cur->fd,writefds) )
        {
            if ( cur->state == STREAM_CONNECT_BLOCK )
            {
                /* The blocked connect became writable: restart the age */
                /* and go on to write the whole packet right below      */
                cur->age = 0;
                cur->state = STREAM_WRITE_FIRST;
            }

            if ( cur->state == STREAM_WRITE ||
                 cur->state == STREAM_WRITE_FIRST )
            {
                OutgoingStreamWrite(&G_OutgoingSocketList,cur);
            }

            *fdcount -= 1;
        }
    }
}


/*=========================================================================*/
void SLPDOutgoingAge(time_t seconds)
/*=========================================================================*/
{
    SLPDSocket* del  = 0;
    SLPDSocket* sock = (SLPDSocket*)G_OutgoingSocketList.head;

    while ( sock )
    {
        switch ( sock->state )
        {
        case DATAGRAM_MULTICAST:
        case DATAGRAM_BROADCAST:
        case DATAGRAM_UNICAST:
            if ( sock->age > G_SlpdProperty.unicastMaximumWait / 1000 )
            {
                del = sock;
            }
            sock->age = sock->age + seconds;
            break;

        case STREAM_READ_FIRST:
        case STREAM_WRITE_FIRST:
            sock->age = 0;
            break;
        
        case STREAM_CONNECT_BLOCK:
        case STREAM_READ:
        case STREAM_WRITE:
            if ( G_OutgoingSocketList.count > SLPD_COMFORT_SOCKETS )
            {
                /* Accelerate ageing cause we are low on sockets */
                if ( sock->age > SLPD_CONFIG_BUSY_CLOSE_CONN )
                {
                    /* Remove peer from KnownDAs since it might be dead */
                    SLPDKnownDARemove(&(sock->peeraddr.sin_addr));
                    del = sock;
                }
            }
            else
            {
                if ( sock->age > SLPD_CONFIG_CLOSE_CONN )
                {
                    /* Remove peer from KnownDAs since it might be dead */
                    SLPDKnownDARemove(&(sock->peeraddr.sin_addr));
                    del = sock;
                }
            }
            sock->age = sock->age + seconds;
            break;

        case STREAM_CONNECT_IDLE:
            if ( G_OutgoingSocketList.count > SLPD_COMFORT_SOCKETS )
            {
                /* Accelerate ageing cause we are low on sockets */
                if ( sock->age > SLPD_CONFIG_BUSY_CLOSE_CONN )
                {
                    del = sock;
                }
            }
            else
            {
                if ( sock->age > SLPD_CONFIG_CLOSE_CONN )
                {
                    del = sock;
                }
            }
            sock->age = sock->age + seconds;
            break;

        case STREAM_WRITE_WAIT:
            /* this when we are talking to a busy DA */
            sock->age = 0;
            sock->state = STREAM_WRITE_FIRST;
            break;

        default:
            /* don't age the other sockets at all */
            break;
        }

        sock = (SLPDSocket*)sock->listitem.next;

        if ( del )
        {
            SLPDSocketFree((SLPDSocket*)SLPListUnlink(&G_OutgoingSocketList,(SLPListItem*)del));
            del = 0;
        }
    }                                                 
}

/*=========================================================================*/
int SLPDOutgoingInit()
/* Initialize outgoing socket list to have appropriate sockets for all     */
/* network interfaces                                                      */
/*                                                                         */
/* list     (IN/OUT) pointer to a socket list to be filled with sockets    */
/*                                                                         */
/* Returns  Zero on success non-zero on error                              */
/*=========================================================================*/
{
    /*------------------------------------------------------------*/
    /* First, remove all of the sockets that might be in the list */
    /*------------------------------------------------------------*/
    while ( G_OutgoingSocketList.count )
    {
        SLPDSocketFree((SLPDSocket*)SLPListUnlink(&G_OutgoingSocketList,(SLPListItem*)G_OutgoingSocketList.head));
    }

    return 0;
}


/*=========================================================================*/
int SLPDOutgoingDeinit(int graceful)
/* Deinitialize incoming socket list to have appropriate sockets for all   */
/* network interfaces                                                      */
/*                                                                         */
/* graceful (IN) Do not close sockets with pending writes                  */
/*                                                                         */
/* Returns  Zero on success non-zero when pending writes remain            */
/*=========================================================================*/
{
    SLPDSocket* del  = 0;
    SLPDSocket* sock = (SLPDSocket*)G_OutgoingSocketList.head;

    while ( sock )
    {
        /* graceful only closes sockets without pending I/O */
        if ( graceful == 0 )
        {
            del = sock;
        }
        else if ( sock->state < SOCKET_PENDING_IO )
        {
            del = sock;
        }

        sock = (SLPDSocket*)sock->listitem.next;

        if ( del )
        {
            SLPDSocketFree((SLPDSocket*)SLPListUnlink(&G_OutgoingSocketList,(SLPListItem*)del));
            del = 0;
        }
    }

    return G_OutgoingSocketList.count;
}



#ifdef DEBUG
/*=========================================================================*/
void SLPDOutgoingSocketDump()
/* Debug-only stub, compiled only when DEBUG is defined.  Its body is      */
/* empty, so calling it does nothing.  Presumably a placeholder for a dump */
/* of the outgoing socket list.                                            */
/*=========================================================================*/
{

}
#endif



/* syntax highlighted by Code2HTML, v. 0.9.1 */