/*  main.c -- overall control and error handling for MPD programs  */



/*  This module is the one that actually defines the globals.  */

#define global
#define initval(v) = v
#include "rts.h"


#ifdef __PARAGON__
#include <nx.h>
parameter vm_parameter;		/* parameter struct to start VM */
extern local_message_type;	/* message type == ptype of current VM */
#endif


#include <stdarg.h>

#define RUNERR(s,n,m) {n, m},
static struct err {
    int errnum;
    char *format;
} errlist [] = {
#include "../runerr.h"
    { 0, 0 }
};



static Bool init_done = FALSE;		/* is the RTS initialized yet? */
static Mutex final_mutex;		/* protect mpd_stop stuff */
static struct proc_st dummy_proc;

static void startup (), finalize (), shutdn ();



/*
 *  Start up an MPD program or one of its virtual machines.
 *
 *  For the original startup, argument interpretation is up to the programmer.
 *  For a new virtual machine, args are:
 *	argv[1]	    a magic string indicating that this is a VM startup
 *	argv[2]	    physical machine number for the new VM
 *	argv[3]	    virtual machine number for the new VM
 *	argv[4]	    network address of mpdx's connection port
 *	argv[5]	    debugging flags
 *	argv[6]	    mpd_num_job_severs
 *	argv[7]	    trace fd, or -1 if no tracing (not applicable for Paragon)
 */
main (argc, argv)
int argc;
char **argv;
{
    char *s;
    int i;
#ifdef __PARAGON__
    long n;
    int stat;
    pid_t *pids;
#endif

    mpd_argv = argv;
    mpd_num_job_servers = 1;

#ifdef __PARAGON__

    setpgid (0, 0);		/* create new process group */

    /* this code runs on every compute node */
    if (nx_initve (NULL, 0, NULL, &argc, argv) < 0)	/* initialize */
	{ perror ("nx_initve"); exit (1); }
 
    pids = malloc (sizeof (pid_t) * numnodes ());	/* alloc PID array */

    n = nx_nfork (NULL, -1, 0, pids);		/* fork all distributors */
    if (n < 0)
	{ perror ("nx_nxfork"); exit (1); }

    if (n > 0) {		/* if parent */
	while (n-- > 0)
	    wait (&stat);	/* wait until every distributor terminates */
	exit (stat >> 8);	/* exit with status of the last distributor */
    }
#endif

    /* ensure that stderr is unbuffered */
    setbuf (stderr, (char *) NULL);

    mpd_init_multiMPD ();		/* init stuff in file ../mpdmulti.c */

    /*  dummy_proc is used for mpd_abort (), etc, before we're running
     *  an MPD or idle or network thread.
     *
     *  we have just one copy of the dummy proc, and don't free it (this
     *  might goof up the resource pool for proctab entries).  We have its
     *  status ACTIVE, but when mpd_vm_job_server (called from mpd_init_proc)
     *  calls mpd_scheduler, dummy_proc.should_die will be FALSE (if it
     *  weren't this could goof up mpd_scheduler).
     */
    dummy_proc.pname = "[dummy_proc]";
    dummy_proc.res = (Rinst) NULL;
    dummy_proc.status = ACTIVE;
    dummy_proc.should_die = FALSE;
    INIT_LOCK ((&dummy_proc)->stack_mutex, "dummy->stack_mutex");
    CUR_PROC = &dummy_proc;	
    CUR_RES = (Rinst) NULL;

    for (i = 0; i <= LAST_SHARED_FD; i++)
	INIT_LOCK (mpd_fd_lock[i], "mpd_fd_lock[i]");

    INIT_LOCK (final_mutex, "final_mutex");	/* for mpd_stop progeny */


#ifndef __PARAGON__
    if (argc != 8 || strcmp (argv[1], VM_MAGIC) != 0) {
	mpd_init_trace ((char *) NULL);
#else
    if ((mynode () == 0) && (fork () == 0)) {	/* on node 0 fork VM 0 */
	setptype (local_message_type);	
	mpd_init_trace ((char *) NULL);
	if (mpd_trc_flag) {
	    lseek (mpd_trc_fd, (off_t) 0, SEEK_SET);
	    ftruncate (mpd_trc_fd, (off_t) 0);
	    }
#endif

	/* this is the initial startup */
	mpd_argc = argc;
	mpd_my_vm = MAIN_VM;
	mpd_init_debug ((char *) NULL);
	s = getenv (ENV_PARALLEL);
	if (s != NULL && (i = atoi (s)) > 1) {	/* multiprocessing requested */

#ifdef MULTI_MPD
	    mpd_num_job_servers = i + NUM_IO_SERVERS;	/* add I/O servers */
	    if (mpd_num_job_servers > MAX_JOBSERVERS)  {
		char msg[100];

		mpd_num_job_servers = 1;
		sprintf (msg, "%s=%d exceeds allowable maximum of %d",
		    ENV_PARALLEL, i, MAX_JOBSERVERS - NUM_IO_SERVERS);
		mpd_abort (msg);
		/*NOTREACHED*/
		}

#else /* not MULTI_MPD */

	    if (i > 1)	 {
		char msg[100];
		sprintf (msg, "No parallelism configured (%s=%u ignored)",
		    ENV_PARALLEL, i);
		RTS_WARN (msg);
	    }

#endif /* MULTI_MPD */
	}

    } else {

	mpd_argc = 0;		/* hide args from MPD program */

#ifdef __PARAGON__
	/*
	 * The distributor code runs on every node.
	 * For every parameter struct received one VM is forked.
	 * Termination if phys_machine == -1 with the exit code in the 
	 * virt_machine field.
	 * The local_message_type has to exist on every node only once and
	 * is equal to the ptype.
	 */
	for (;;) {			/* parent repeats until termination */
	    local_message_type ++;
	    crecv (0, &vm_parameter, sizeof (vm_parameter));
	    if (vm_parameter.phys_machine == -1)
		exit (vm_parameter.virt_machine);
	    n = fork ();
	    if (n < 0)
	        { perror ("fork"); exit (1); }
	    if (n == 0)
		break;			/* child */
	}
	/*
	 *  Child process: new virtual machine.
	 */
	setptype (local_message_type);	
	mpd_my_machine = vm_parameter.phys_machine;
	mpd_my_vm = vm_parameter.virt_machine;
	sprintf (mpd_my_label, "[vm %d] ", mpd_my_vm);
	mpd_init_debug ((char *) NULL);	
	mpd_init_trace ((char *) NULL);
#else
	/*
	 *  This is a new virtual machine.
	 */
	mpd_my_machine = atoi (argv[2]);
	mpd_my_vm = atoi (argv[3]);
	sprintf (mpd_my_label, "[vm %d] ", mpd_my_vm);
	mpd_init_debug (argv[5]);
	sscanf (argv[6], "%u", &mpd_num_job_servers);
	mpd_init_trace (argv[7]);
#endif
    }

#ifdef MULTI_MPD
    DEBUG (D_GENERAL, "%ld job server(s)", mpd_num_job_servers, 0, 0);
#endif

    mpd_stack_size = MPDALIGN (mpd_stack_size);	/* ensure legal stack size */
    mpd_init_proc (startup);	/* init processes (no return; calls startup) */
    /*NOTREACHED*/
}

/*
 *  initialization continues here in an MPD process context.
 */
static void
startup ()
{
    CRB *crbp;

    CUR_PROC -> pname = "[startup]";

    /* initialize runtime system */
    mpd_init_event ();
    mpd_init_mem ();
    mpd_init_res ();
    mpd_init_oper ();
    mpd_init_class ();
    mpd_init_co ();
    mpd_init_rem ();
    mpd_init_vm ();
    mpd_init_io ();
    mpd_init_misc ();
    mpd_init_random ();

    /* no better place to put this since mpd_init_net() not called like above */
    INIT_LOCK (mpd_exec_up_mutex, "exec_up_mutex");

    DEBUG0 (D_GENERAL, "startup code done initializing RTS");
    if (mpd_my_vm == MAIN_VM) {
	init_done = TRUE;	/* RTS initialized */
	/* create an instance of main resource on main machine */
	crbp = (CRB *) mpd_alc (-sizeof (struct crb_st), 1);
	crbp->crb_size = sizeof (struct crb_st);
	mpd_create_resource ((char *) NULL, 0, mpd_my_vm, crbp, sizeof (Rcap));
    } else {
	/* otherwise, just start up network and wait for instructions */
	mpd_argc = 0;		/* hide args from users */
#ifndef __PARAGON__
	mpd_init_net (mpd_argv[4]);
#else
	mpd_init_net (vm_parameter.mpdx_addr);
#endif
	init_done = TRUE;	/* RTS initialized */
    }

    /* kill this initialization process */
    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 * Abort giving source line information.
 */
void
mpd_loc_abort (locn, msg)
char *locn, *msg;
{
    char buf[200];

    if (!locn)
	mpd_abort (msg);			/* no trace info available */
    mpd_fmt_locn (buf, locn);
    strcat (buf, ":\n   ");
    strcat (buf, msg);
    mpd_abort (buf);
}



/*
 * Translate a "locn" pointer into a text string.
 */
char *
mpd_fmt_locn (buf, locn)
char *buf, *locn;
{
    int lno;

    if (!locn) {
	strcpy (buf, "unknown location");
	return buf;
    }
    lno = 1;
    while (*--locn != '\002')
	lno++;
    sprintf (buf, "file %s, line %d", locn + 1, lno);
    return buf;
}



/*
 *  mpd_runerr (locn, errnum, args...)
 *  Runerr never really returns, but it is declared as returning int
 *  because of the way it is used in the generated code.
 */
int
mpd_runerr (char *locn, int errnum, ...)
{
    va_list ap;
    char c, *f, *o;
    struct err *ep;
    char buf[200];
    Dim *d;
    String *s;

    va_start (ap, errnum);

    for (ep = errlist; ep->errnum != errnum; ep++)
	if (ep->errnum == 0) {
	    sprintf (buf, "unknown runtime error %d", errnum);
	    mpd_loc_abort (locn, buf);
	}

    o = buf;
    f = ep->format;
    while ((c = *f++) != '\0') {
	if (c != '%')
	    *o++ = c;
	else {
	    switch (c = *f++) {
		case 'd':			/* integer */
		    sprintf (o, "%d", va_arg (ap, int));
		    break;
		case 'B':			/* both array bounds */
		    d = va_arg (ap, Dim *);
		    sprintf (o, "[%d:%d]",  d->lb, d->ub);
		    break;
		case 'L':			/* string length */
		    s = va_arg (ap, String *);
		    sprintf (o, "%d", s->length);
		    break;
		case 'M':			/* string maxlength */
		    s = va_arg (ap, String *);
		    sprintf (o, "%d", MAXLENGTH (s));
		    break;
		case 'S':
		    s = va_arg (ap, String *);
		    if (s->length <= 25) {
			DATA (s) [s->length] = '\0';
			sprintf (o, "%s", DATA (s));
		    } else {
			sprintf (o, "%.20s...", DATA (s));
		    }
		    break;
		default:			/* % (or bogus) */
		    sprintf (o, "%c", c);
		    break;
	    }
	    o += strlen (o);
	}
    }
    va_end (ap);
    *o++ = '\0';
    mpd_loc_abort (locn, buf);
    /*NOTREACHED*/
}



/*
 *  Print run-time support error or warning message.
 *
 *  We do this with a single, self-buffered atomic write to fd 2 to prevent
 *  interference between messages from different virtual machines.
 */
void
mpd_message (s, t)
char *s, *t;
{
    char buf[200];
    sprintf (buf, "%sRTS %s: %s\n", mpd_my_label, s, t);

    BEGIN_IO (stdout);
    fflush (stdout);		/* flush any user output first */
    END_IO (stdout);

    BEGIN_IO (stderr);
    write (2, buf, strlen (buf));
    END_IO (stderr);
}



/*
 *  Fatal run time support error.
 *  Print error message and tell mpdx to abort other nodes.
 */
void
mpd_abort (s)
char *s;
{
    DEBUG (D_ALL_FLAGS, "ABORT: %s", s, 0, 0);
    mpd_message ("abort", s);
    mpd_stop (1, 0);
}



/*  Fatal error detected by network routines.  */

void
mpd_net_abort (s)
char *s;
{
    mpd_abort (s);
}



/*  Runtime malfunction -- for "can't happen" situations.  */

void
mpd_malf (s)
char *s;
{
    mpd_message ("malfunction", s);
    mpd_stop (1, 0);
}



/*
 *  The six steps of distributed termination in MPD are (note that
 *  if we only have one VM mpd_exec_up is false):
 *
 *	1)	Some VM calls mpd_stop.  MSG_STOP is sent to MPDX.
 *
 *		(If a node becomes inactive then MSG_IDLE is passed to MPDX.
 *		 This is for distributed deadlock detection.)
 *
 *	2)	MPDX passes the MSG_STOP to the main VM.
 *
 *	3)	The main VM calls mpd_stop.  Finalization is done here.
 *
 *	4)	As part of the main VM's shutdown, MSG_EXIT is sent to MPDX.
 *
 *	5)	MPDX broadcasts MSG_EXIT to all non-main VMs.
 *
 *	6)	At each non-main VM, mpd_net_interface catches the
 *		MSG_EXIT and just exits.
 */



/*  mpd_stop(code,report) -- initiate termination.
 *
 *  mpd_stop does not return, and thus should be called by a
 *  spawned proc if the caller needs to survive.
 *
 *  If called a second time with a nonzero exitcode,
 *  mpd_stop exits immediately.
 */

void
mpd_stop (exitcode, report_blocked)
int exitcode, report_blocked;
{
    static Bool exiting = FALSE;
    static Bool finals_started = FALSE;
    static Bool shutdn_started = FALSE;
    struct num_st pkt;
    Proc pr;

    DEBUG (D_GENERAL, "mpd_stop(%ld,%ld)", exitcode, report_blocked, 0);
    if (exitcode != 0) {
	LOCK (final_mutex, "mpd_stop");
	if (exiting) {
	    UNLOCK (final_mutex, "mpd_stop");
	    EXIT (1);
	} else {
	    exiting = TRUE;
	    UNLOCK (final_mutex, "mpd_stop");
	}
    }

    if (mpd_my_vm != MAIN_VM) {
	/* We're not the main VM; just tell MPDX to tell IT to stop. */
	pkt.num = exitcode;
	mpd_net_send (MPDX_VM, MSG_STOP, &pkt.ph, sizeof (pkt));
	mpd_kill (CUR_PROC, (Rinst) NULL);
    }

    /*  If we might be starting the finalization code, we need to run
     *  disassociated from any resource so that we can't be killed 
     *  during finalization.  */

    if (mpd_my_vm == MAIN_VM && exitcode == 0 && CUR_RES != NULL) {
	pr = mpd_spawn (mpd_stop, CUR_PROC->priority, (Rinst) NULL, FALSE,
	    (long) exitcode, (long) report_blocked, 0L, 0L);
	mpd_activate (pr);
	mpd_kill (CUR_PROC, (Rinst) NULL);
    }

    /*  We see if the final code needs to be fired up if it hasn't
     *  already been fired up.  Do_finals fires up any final code if
     *  the main resource hasn't been destroyed yet.  It then fires up
     *  the final code for the globals, from the bottom up.  */

    LOCK (final_mutex, "mpd_stop");
    if (finals_started) {
	UNLOCK (final_mutex, "mpd_stop");
    } else {
	finals_started = TRUE;
	UNLOCK (final_mutex, "mpd_stop");
	if ((mpd_my_vm == MAIN_VM) && (exitcode == 0))
	    finalize ();			/* checks if needed */
    }

    /*  We can now complete the shutdown if the final code has finished
     *  or if mpd_stop was called for the second time (either via a MPD
     *  stop statement or through deadlock/termination detection).
     *  We only do this if it hasn't been done already. */

    LOCK (final_mutex, "mpd_stop");
    if (shutdn_started) {
	UNLOCK (final_mutex, "mpd_stop");
    } else {	
	shutdn_started = TRUE;
	UNLOCK (final_mutex, "mpd_stop");
	shutdn (exitcode, report_blocked);
    }

    /* kill this process */
    mpd_kill (CUR_PROC, (Rinst) NULL);
    /*NOTREACHED*/
}



/*  finalize() runs the finalization -- the main resource's final code
 *  (if that is still around) and then the globals' final code.
 */

static void
finalize ()
{
    Proc pr;
    Sem waitsem;

    /*  if we are on the main resource and the main resource has final
     *  code, then we execute it if the resource hasn't already been
     *  destroyed and if we weren't invoked because of an mpd_abort call.
     *  We need to grab the queue mutex to protect the interval between
     *  mpd_spawn and mpd_activate -- spawn puts the proc on a res list,
     *  where it could be destroyed...
     */

    LOCK (mpd_main_res_mutex, "finalize");

    if (mpd_main_res.res != NULL
	&& mpd_main_res.seqn == mpd_main_res.res->seqn++) {

	LOCK (mpd_main_res.res->rmutex, "finalize");

	/* protect the spawn-activate region against mpd_destroy */
	LOCK_QUEUE ("finalize");

	DEBUG0 (D_GENERAL | D_RESOURCE, "finalizing main resource");

	pr = mpd_spawn(mpd_rpatt[0].final, CUR_PROC->priority, mpd_main_res.res,
			TRUE, (long) mpd_main_res.res->crb_addr,
			(long) mpd_main_res.res->rv_base, 0L, 0L);

	UNLOCK (mpd_main_res.res->rmutex, "finalize");
	UNLOCK (mpd_main_res_mutex, "finalize");

	pr->wait = waitsem = mpd_make_sem (0);
	pr->pname = "[main final]";			/* GC resets */
	pr->ptype = FINAL_PR;
	mpd_activate (pr);
	UNLOCK_QUEUE ("finalize");
	P ((char *) NULL, waitsem);
    } else {
	UNLOCK (mpd_main_res_mutex, "finalize");
	DEBUG0 (D_GENERAL | D_RESOURCE, "no main resource to finalize");
    }

    DEBUG0 (D_GENERAL, "destroying main VM globals");
    mpd_destroy_globals ();
}


/*  shutdn() initiates the final shutdown for the VM.  */

static void
shutdn (exitcode, report_blocked)
int exitcode, report_blocked;
{
    Bool exec_was_up;
    struct exit_st pkt;

    DEBUG (D_GENERAL, "shutdn(%ld,%ld)", exitcode, report_blocked, 0);

    if (!init_done) {
	/* RTS is not yet initialized (maybe even mpd_exec_up_mutex),
	 * so its time to punt rather than risking a core dump */
	DEBUG0 (D_GENERAL, "immediate exit, RTS not initialized");
	EXIT (1);
    }

    LOCK (mpd_exec_up_mutex, "shutdn");
    exec_was_up = mpd_exec_up;
    if (mpd_exec_up) {
	mpd_exec_up = FALSE;	/* prevent looping if send aborts */
	UNLOCK (mpd_exec_up_mutex, "shutdn");

	if (mpd_my_vm == MAIN_VM) {
	    /* tell mpdx that main final is done; it then tells the others */
	    mpd_mpdx_death_ok = TRUE;
	    pkt.code = exitcode;
	    pkt.report = report_blocked;
	    mpd_net_send (MPDX_VM, MSG_EXIT, &pkt.ph, sizeof (pkt));
	}
    } else
	UNLOCK (mpd_exec_up_mutex, "shutdn");

    if (report_blocked)
	mpd_print_blocked ();

    if (mpd_my_vm == MAIN_VM && exec_was_up) {
#ifdef MULTI_MPD
	/* mpdx may not be *our* child, so wait() is unsafe */
	if (kill (mpd_exec_pid, 0) == 0) {	/* if it's still there */
	    DEBUG0 (D_GENERAL, "sleeping in hope that MPDX will die");
	    sleep (1);
	} else
	    DEBUG0 (D_GENERAL, "MPDX is gone");
#else
	{
	int wstatus;
	DEBUG0 (D_GENERAL, "waiting for MPDX to die");
	wait (&wstatus);
	}
#endif
    }
#ifdef __PARAGON__
    if (mpd_my_vm == MAIN_VM) {
	int i;

	DEBUG (D_GENERAL,"sending exit code %ld to distributors",exitcode,0,0);
	/* terminate waiting distributors */
	vm_parameter.phys_machine = -1;		
	vm_parameter.virt_machine = exitcode;	
	for (i = 0; i < numnodes (); i++) {
	    csend (0, &vm_parameter, sizeof (vm_parameter), i, 0);
	}
    }
#endif
    DEBUG (D_GENERAL, "exit(%ld)", exitcode, 0, 0);
    EXIT (exitcode);
}



/*  Terminate program due to stack overflow.  */

void
mpd_stk_overflow ()
{
    mpd_abort ("stack overflow");
}


/*  Terminate program due to stack underflow.  */

void
mpd_stk_underflow ()
{
    mpd_abort ("stack underflow");
}



/*  Terminate program due to stack corruption.  */

void
mpd_stk_corrupted ()
{
    mpd_abort ("stack corrupted");
}


syntax highlighted by Code2HTML, v. 0.9.1