/*  net.c -- network interface routines  */

#include "rts.h"
#include <fcntl.h>
#include <string.h>

/* forward declaration: start_mpdx is defined below, after its caller */
static void start_mpdx ();

/* size in bytes of the buffer mpd_init_net uses for the mpdx address */
#define BUFSIZE 100



/*
 *  Initialize network, if this hasn't already been done before.
 *
 *  mpdx_addr is the socket address of mpdx if it's already running.
 *  If it is null, mpdx is started via start_mpdx and its address is
 *  obtained from there.  Returns at once if mpd_exec_up is already set.
 *
 *  An mpdx_addr that does not fit in the local address buffer is treated
 *  as fatal rather than being copied past the end of the buffer.
 *
 *  This routine must never be called with a mutex >= mpd_queue_mutex.
 */
void
mpd_init_net (mpdx_addr)
Ptr mpdx_addr;
{
    struct saddr_st as;
    Proc pr;
    char abuf[BUFSIZE];
    int i;

#ifdef __svr4__
#ifdef MULTI_MPD
    if (mpd_num_job_servers > 1)
	mpd_abort ("virtual machines not implemented under Solaris MultiMPD");
#endif
#endif

    LOCK (mpd_exec_up_mutex, "mpd_init_net");
    if (mpd_exec_up) {
	UNLOCK (mpd_exec_up_mutex, "mpd_init_net");
	return;				/* network is already inited */
    }

    /* this socket stuff needs to be run as the IO server */
    BEGIN_IO (NULL);

    if (!mpdx_addr) {			/* is mpdx running? */
	for (i = 0; i < BUFSIZE; i++)
	    abuf[i] = '\0';
	start_mpdx (abuf, BUFSIZE);	/* no, start it */
    } else if (strlen (mpdx_addr) >= sizeof (abuf)) {
	/*
	 *  The address would overflow abuf.  Same shutdown pattern as in
	 *  start_mpdx: release mpd_exec_up_mutex before calling mpd_abort.
	 */
	mpd_exec_up = TRUE; /* stop anyone else from starting, while we stop */
	UNLOCK (mpd_exec_up_mutex, "mpd_init_net");
	mpd_abort ("mpdx address too long");
    } else
	strcpy (abuf, mpdx_addr);	/* length checked above */

    mpd_net_start (as.addr);		/* start socket routines */
    DEBUG (D_SOCKET, "mpd_init_net has as.addr %s", as.addr, 0, 0);
    mpd_net_connect (MPDX_VM, abuf);	/* connect to mpdx */
    mpd_exec_up = TRUE;			/* set flag that the network's up */
    UNLOCK (mpd_exec_up_mutex, "mpd_init_net");
    END_IO (NULL);

    /* add detail to err messages because now there's more than one of us */
    sprintf (mpd_my_label, "[vm %d] ", mpd_my_vm);

    /* tell mpdx we are here */
    /* mpd_net_send calls BEGIN_IO itself */
    as.ph.priority = 0;		/* mpdx.c doesn't have a real CUR_PROC */
    mpd_net_send (MPDX_VM, MSG_HELLO, &as.ph, sizeof (as));

    /* start network interface process */
    LOCK_QUEUE ("mpd_init_net");
    pr = mpd_spawn (mpd_net_interface, MAX_INT, (Rinst)NULL, FALSE,0L,0L,0L,0L);
    pr->pname = "[network interface]";
    mpd_activate (pr);
    UNLOCK_QUEUE ("mpd_init_net");
}



/*
 *  Fork and start mpdx.  Copy the address of the mpdx socket into abuf,
 *  which has room for abufsize bytes; the result is always NUL-terminated.
 *
 *  Since this is only called from mpd_init_net, and when mpd_exec_up_mutex
 *  is held, we need to release it before calling mpd_abort, since
 *  mpd_abort's progeny grab it.  Every failure path therefore sets
 *  mpd_exec_up, unlocks the mutex, and only then aborts.
 */

static void
start_mpdx (abuf, abufsize)
char abuf[];
int abufsize;
{

#ifndef __PARAGON__
    char *path, numbuf[50];
    char trcbuf[12];		/* room for any int printed with %d */
    int fd[2];
    int js_to_create;
    int nread;

    if (! (path = getenv (ENV_MPDX)))
	path = mpd_exec_path;	/* use path set by mpdl if no env variable */
    DEBUG (D_MPDX_ACT, "mpdx path: %s", path, 0, 0);

    if (mpd_trc_flag)
	sprintf (trcbuf, "%d", mpd_trc_fd);
    else
	strcpy (trcbuf, "-1");

    js_to_create = mpd_num_job_servers;
    /* don't need IO server if 1 JS;  see main.c on how this is set */
    if (js_to_create > 1)
	js_to_create -= NUM_IO_SERVERS;
    /* cast so the argument type matches the %u conversion */
    sprintf (numbuf, "%u", (unsigned) js_to_create);

    BEGIN_IO (NULL);
    fflush (stdout);
    fcntl (STDOUT, F_SETFL, O_APPEND);		/* coordinate VM writes */
    fcntl (STDERR, F_SETFL, O_APPEND);
    END_IO (NULL);

    if (pipe (fd) != 0)	{	/* make pipe for initial message from mpdx */
	/* pipe failed */
	mpd_exec_up = TRUE; /* stop anyone else from starting, while we stop */
	UNLOCK (mpd_exec_up_mutex, "start_mpdx");
	mpd_abort ("can't open pipe for mpdx");
    }
    if ((mpd_exec_pid = vfork ()) < 0) {
	/* fork failed */
	mpd_exec_up = TRUE; /* stop anyone else from starting, while we stop */
	UNLOCK (mpd_exec_up_mutex, "start_mpdx");
	mpd_abort ("can't vfork to start mpdx");
    }
    if (mpd_exec_pid == 0) {
	/* we're the child - execute mpdx */
	dup2 (fd[1], 0);  /* make pipe output fd 0 for mpdx, replacing stdin */
	execl (path, path,
	    VM_MAGIC, PROTO_VER, mpd_argv[0], numbuf, trcbuf, (char *) NULL);
	/* execl failed if that returned */
	/*
	 *  NOTE(review): we are still a vfork child here; POSIX only
	 *  sanctions exec or _exit at this point.  Left unchanged because
	 *  the shutdown sequence depends on mpd_abort -- confirm.
	 */
	perror (path);
	mpd_exec_up = TRUE; /* stop anyone else from starting, while we stop */
	UNLOCK (mpd_exec_up_mutex, "start_mpdx");
	mpd_abort ("can't execute mpdx");
    }

    /*
     *	The parent continues execution of the MPD program.
     *	Read back the mpdx socket address over the pipe we just made.
     *	(The pipe is for synchronization as well as communication!)
     */
    close (fd[1]);

    /* leave one byte for the terminator; a buffer that small reads nothing */
    nread = 0;
    if (abufsize > 1)
	nread = read (fd[0], abuf, abufsize - 1); /* read mpdx socket address */

    if (nread <= 0) {
	/* read failed (-1) or mpdx closed the pipe without replying (0) */
	mpd_exec_up = TRUE;  /* stop anyone else from starting, while we stop */
	UNLOCK (mpd_exec_up_mutex, "start_mpdx");
	mpd_abort ("no reply from mpdx startup");
    } else
	abuf[nread] = '\0';	/* abuf is used as a C string from here on */

    close (fd[0]);				/* no longer need the pipe */
    DEBUG (D_SOCKET, "start_mpdx read %s", abuf, 0, 0);

#else  /* if Paragon */

    /* 
     *  Fork and start mpdx on node 0 of the active compute partition.
     *  Therefore the address of mpdx is known in advance.
     *  The ptype is 1.
     */ 
    char *path, numbuf[50];
    long node_0 = 0;
    long pid_0;
    char * argv[8];
    int js_to_create;

    argv[0] = "mpdx";
    argv[1] = VM_MAGIC;
    argv[2] = PROTO_VER;
    argv[3] = mpd_argv[0];

    if (! (path = getenv (ENV_MPDX)))
	path = mpd_exec_path;	/* use path set by mpdl if no env variable */
    DEBUG (D_MPDX_ACT, "mpdx path: %s", path, 0, 0);

    js_to_create = mpd_num_job_servers;	
    /* don't need IO server if 1 JS;  see main.c on how this is set */
    if (js_to_create > 1)
	js_to_create -= NUM_IO_SERVERS;
    /* cast so the argument type matches the %u conversion */
    sprintf (numbuf, "%u", (unsigned) js_to_create);

    argv[4] = numbuf;
    argv[5] = "";		/* trcbuf doesn't matter; every VM checks env */
    argv[6] = getenv (ENV_DEBUG); 
    argv[7] = NULL;
    if (nx_loadve (&node_0, 1, 1, &pid_0, path, argv, NULL) < 0) {
	mpd_exec_up = TRUE; /* stop anyone else from starting, while we stop */
	UNLOCK (mpd_exec_up_mutex, "start_mpdx");
	mpd_abort ("can't execute mpdx");
    }

    /*
     *	The parent continues execution of the MPD program.
     */
    sprintf(abuf, "%d.%d", 0, 1);  /* set "node.ptype" as on other platforms */
    DEBUG (D_SOCKET, "start_mpdx as %s", abuf, 0, 0);
#endif

}



/*
 *  Network interface process.
 *  Read messages from other machines when they come in.
 *  In order to ensure fairness, this process should be
 *  run at the same priority as user processes.
 *
 *  Loops forever: receives a packet header, grows the buffer if the
 *  packet is larger than PBUF_SZ, reads the rest of the packet, and
 *  dispatches on the message type.  For requests and acknowledgements
 *  the packet buffer is passed on to whoever handles the message, and
 *  ph is cleared so that the next iteration allocates a fresh buffer.
 *  For MSG_SEOF and MSG_QUIT the buffer is kept and reused.
 */
void
mpd_net_interface ()
{
    Ptr pbuf;
    Pach ph, nph;
    enum ms_type t;
    Proc pr;
    int task_pri, exitcode, report;
    int mpdx_fatal;

    ph = 0;
    for (;;) {
	/* allocate space for packet header if we need a new one */
	if (!ph)  {
	    pbuf = mpd_alc (-PBUF_SZ, 1);
	    ph = (Pach) pbuf;
	}

	/* get the header of the next available message */
	t = mpd_net_recv (ph);

	/* if the whole packet is too big for the buffer,
	 * allocate a new one and copy the header. */
	if (ph->size > PBUF_SZ) {
	    nph = (Pach) mpd_alc (-ph->size, 1);
	    *nph = *ph;
	    mpd_free ((Ptr) ph);
	    ph = nph;
	}

	/* read the rest of the packet */
	mpd_net_more (ph);

	/* spawned handlers run at the priority carried in the packet */
	task_pri = ph->priority;

	/* process the message according to its type */
	switch (t) {

	    case MSG_SEOF:
		/* ignore deaths of others, except for mpdx, unless
		 * it is OK for MPDX to die */
		mpdx_fatal = FALSE;
		LOCK (mpd_exec_up_mutex, "mpd_net_interface");
		if (ph->origin == MPDX_VM) {
		    mpd_exec_up = FALSE;
		    mpdx_fatal = !mpd_mpdx_death_ok;
		}
		/*
		 * Release the mutex on every path.  Previously it stayed
		 * locked when mpdx died and that death was acceptable.
		 * It must also be released before mpd_abort is called.
		 */
		UNLOCK (mpd_exec_up_mutex, "mpd_net_interface");
		if (mpdx_fatal)
		    mpd_abort ("mpdx died!");
		break;

	    case MSG_QUIT:
		/* MPDX got a MSG_STOP or enough MSG_IDLEs to tell
		 * the main VM to finalize and shutdown. */
		if (mpd_my_vm != MAIN_VM)
		    mpd_malf ("MSG_QUIT not main vm");	/* protocol error */
		pr = mpd_spawn(mpd_stop, CUR_PROC->priority, (Rinst)NULL, FALSE,
		    (long) ((struct exit_st *) ph)->code,
		    (long) ((struct exit_st *) ph)->report, 0L, 0L);
		mpd_activate (pr);
		break;

	    case MSG_EXIT:
		if (mpd_my_vm == MAIN_VM)
		    mpd_malf ("MSG_EXIT for main vm");	/* protocol error */
		exitcode = ((struct exit_st *) ph) -> code;
		report = ((struct exit_st *) ph) -> report;
		/* casts make the arguments match the %ld conversions */
		DEBUG(D_GENERAL,"exit(%ld,%ld) per MSG_EXIT",
		    (long) exitcode, (long) report, 0);
		if (report)
		    mpd_print_blocked ();
		EXIT (exitcode);
		/*NOTREACHED*/
		break;

	    /*
	     * Requests: each one is serviced by a newly spawned process
	     * that receives the packet as its first argument.
	     */

	    case REQ_CALLME:
		pr = mpd_spawn (mpd_rmt_callme, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[callme]";
		mpd_activate (pr);
		ph = 0;
		break;

	    case REQ_CREATE:
		pr = mpd_spawn (mpd_rmt_create, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[create]";
		mpd_activate (pr);
		ph = 0;
		break;

	    case REQ_COUNT:
		pr = mpd_spawn (mpd_rmt_query, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[query]";
		mpd_activate (pr);
		ph = 0;
		break;

	    case REQ_DESTOP:
		pr = mpd_spawn (mpd_rmt_destop, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[destop]";
		mpd_activate (pr);
		ph = 0;
		break;
		
	    case REQ_DESTROY:
		pr = mpd_spawn (mpd_rmt_destroy, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[destroy]";
		mpd_activate (pr);
		ph = 0;
		break;

	    case REQ_DESTVM:
		pr = mpd_spawn (mpd_rmt_destvm, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[destvm]";
		mpd_activate (pr);
		ph = 0;
		break;
		
	    case REQ_INVOKE:
		pr = mpd_spawn (mpd_rmt_invk, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[invoke]";
		mpd_activate (pr);
		ph = 0;
		break;

	    case REQ_RECEIVE:
		pr = mpd_spawn (mpd_rmt_rcv, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[receive]";
		mpd_activate (pr);
		ph = 0;
		break;

	    case MSG_RCVCALL:
		pr = mpd_spawn (mpd_rcv_call, task_pri, (Rinst) NULL, FALSE,
			(long) ph, 0L, 0L, 0L);
		pr->pname = "[rcv_call]";
		mpd_activate (pr);
		ph = 0;
		break;
		
	    /*
	     * Acknowledgements: store the packet as the reply in the
	     * record named by ph->rem and V its wait semaphore.
	     */

	    case ACK_LOCVM:
	    case ACK_CREVM:
	    case ACK_FINDVM:
	    case ACK_COUNT:
	    case ACK_CREATE:
	    case ACK_DESTOP:
	    case ACK_DESTROY:
	    case ACK_DESTVM:
	    case ACK_CALLME:
	    case ACK_RECEIVE:
		ph->rem->reply = ph;
		V (ph->rem->wait);
		ph = 0;
		break;

	    case ACK_INVOKE:
		/* co-call replies go to mpd_co_call_done instead */
		if (((Invb) ph)->type == REM_COCALL_IN)
		    mpd_co_call_done ((Invb) ph);
		else {
		    ph->rem->reply = ph;
		    V (ph->rem->wait);
		    }
		ph = 0;
		break;

	    default:
		mpd_abort ("unknown incoming message type");
		break;
	}
    }
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */