/* mpdx.c -- MPD execution manager * * Mpdx is initiated by an MPD program when it first has a need. Params are: * argv[1] a magic string to validate the call * argv[2] protocol version number for sanity check with caller * argv[3] caller's path (caller's argv[0]) * argv[4] the number of job servers to create on remote vms * argv[5] file descriptor number for tracing, or -1 if none * * The address of mpdx's listener socket is output to stdout, on the * assumption that it is a pipe to initiating process (main machine). * * Mpdx runs as a single process with no parallelism. * It just loops responding to input messages. */ #include "../paths.h" #include "rts.h" #include #include #include #ifdef __PARAGON__ #include extern local_message_type; /* message type == ptype of current VM */ #endif char version[] = VERSION; /* MPD version number */ /* physical machine data -- default entry is head of list */ struct pmdata { int num; /* physical machine number */ char *hostname; /* name of host on which to create machine */ char *exepath; /* path to executable program on that host */ struct pmdata *next; /* next data node */ }; struct pmdata physm; /* virtual machine data */ typedef enum { STARTING, WORKING, DYING, GONE } VMstate; struct vmdata { /* virtual machine data */ int phys; /* physical machine number */ int pid; /* process id */ VMstate state; /* current state */ char addr[SOCK_ADDR_SIZE]; /* socket address */ int notify; /* machine to notify on birth or death */ Remd rem; /* request message pointer for acking CREVM */ int nmsgs [1 + MAX_VM]; /* last report of messages in & out */ }; struct vmdata *vm [1 + MAX_VM]; /* miscellaneous globals */ char mpd_net_exe_path[MAX_PATH]; /* network path of exe file */ Vcap mpd_my_vm = MPDX_VM; /* VM number for mpd_net_send */ int nvm = 0; /* number of VMs started */ int ndied = 0; /* number that have died */ int exiting = 0; /* shutdown in progress? 
*/ char *trc_arg; /* trace argument */ int trc_fd; /* trace fd */ char my_addr[SOCK_ADDR_SIZE]; /* address of mpdx socket */ struct idle_st mpd_msg_counts; /* messages in & out, updated by socket.c */ /* global scratch area used for all packet I/O */ union { struct pach_st header; struct saddr_st sock; struct num_st npkt; struct exit_st xpkt; struct locn_st locn; struct idle_st idle; } packet; #define PH (&packet.header) #define ORIGIN (packet.header.origin) /* function declarations */ extern char *netpath (); extern void mpd_init_debug (); extern void mpd_net_start (); extern void mpd_net_more (), mpd_net_send (); extern enum ms_type mpd_net_recv (); struct pmdata *lookup (); struct vmdata *alcvm (); char *alloc (), *salloc (); void callme (), locvm (), crevm (), findvm (), hello (); void destvm (), ackdest (), exitmsg (), eof (), stopmsg (), idlemsg (); void exe (), mort (), setloc (); char *getname (); static char jsbuf[10]; /* main program */ main (argc, argv) int argc; char *argv[]; { int i; char *p; struct vmdata *v; char cwd[MAX_PATH], mapfile[MAX_PATH]; char message[100]; /* ensure that stderr is unbuffered */ setbuf (stderr, (char *) NULL); /* init debugging */ #ifndef __PARAGON__ mpd_init_debug ((char *) NULL); #else mpd_init_debug (argv[6]); /* passed via argv on Paragon */ local_message_type = 1; /* local_message_type of MPDX is 1 */ #endif DEBUG5 (D_GENERAL | D_MPDX_ACT, "mpdx %s %s %s %s (pid %ld)", (argc > 1) ? argv[1] : "--", (argc > 2) ? argv[2] : "--", (argc > 3) ? argv[3] : "--", (argc > 4) ? 
argv[4] : "--", getpid ()); /* check for valid call */ if (argc < 6 || strcmp (argv[1], VM_MAGIC) != 0) mpd_net_abort ("invalid call to MPDX"); strcpy (jsbuf, argv[4]); /* save jobserver arg for exe () */ #ifndef __PARAGON__ trc_arg = argv[5]; /* save trace arg */ sscanf (trc_arg, "%d", &trc_fd); #endif if (strcmp (argv[2], PROTO_VER) != 0) { DEBUG (D_SOCKET, "protocol argv[2] %s, but PROTO_VER %s", argv[2], PROTO_VER, 0); mpd_net_abort ("protocol version mismatch; rerun mpdl to fix"); } /* save path of executable */ physm.exepath = salloc (argv[3]); #ifndef __PARAGON__ /* all binaries are already spawned on Paragon */ /* build network path of executable */ if (p = getenv (ENV_NETMAP)) strcpy (mapfile, p); else sprintf (mapfile, "%s/%s", MPDLIB, "mpdmap"); getcwd (cwd, MAX_PATH); if (!netpath (argv[3], cwd, mapfile, mpd_net_exe_path)) mpd_net_abort ("can't build network path for executable"); DEBUG (D_MPDX_ACT, "netpath is: %s", mpd_net_exe_path, 0, 0); #endif /* enter caller in vm table */ v = alcvm (); v->pid = getppid (); v->state = STARTING; /* close all files except stdin, stdout, stderr, and trace file */ for (i = 3; i < NOFILE; i++) if (i != trc_fd) close (i); /* start network I/O and send address to caller */ mpd_net_start (my_addr); #ifndef __PARAGON__ /* Paragon has no pipe to VM 0 */ write (0, my_addr, strlen (my_addr)); /* fd 0 by agreement with net.c */ #endif /* connect stdin to /dev/null; then will have * 0. stdin /dev/null * 1. stdout as inherited from caller * 2. stderr as inherited from caller * 3. socket for listening * 4+ available for connections to VMs */ close (0); if (open ("/dev/null", O_RDONLY) < 0) mpd_net_abort ("can't open /dev/null"); #ifndef __PARAGON__ /* set up interrupt routine to catch deaths of children */ /* not used on Paragon: they're not MPDX's children */ signal (SIGCHLD, mort); #endif /* now just loop, waiting for things to do... 
*/ for (;;) { mpd_net_recv (PH); /* read packet header */ if (PH->size > sizeof (packet)) mpd_net_abort ("incoming packet too big"); mpd_net_more (PH); /* read the rest */ switch (PH->type) { case MSG_SEOF: eof (); break; case MSG_HELLO: hello (); break; case MSG_EXIT: exitmsg (); break; case MSG_STOP: stopmsg (); break; case MSG_IDLE: idlemsg (); break; case REQ_CALLME: callme (); break; case REQ_CREVM: crevm (); break; case REQ_FINDVM: findvm (); break; case REQ_DESTVM: destvm (); break; case ACK_DESTVM: ackdest (); break; case REQ_LOCVM: locvm (); break; default: sprintf (message, "unexpected packet type %d", PH->type); mpd_net_abort (message); } } } /* locvm () - specify location for virtual machine */ void locvm () { char *xfile = packet.locn.text + strlen (packet.locn.text) + 1; DEBUG (D_MPDX_IN,"LOCATE %ld %s %s",packet.locn.num,packet.locn.text,xfile); setloc (packet.locn.num, packet.locn.text, xfile); mpd_net_send (ORIGIN, ACK_LOCVM, PH, PACH_SZ); } /* callme () - pass a "call me" message from one VM to another */ void callme () { int dest = packet.npkt.num; DEBUG (D_MPDX_IN, "CALLME %ld from %ld", dest, ORIGIN, 0); packet.npkt.num = ORIGIN; mpd_net_send (dest, REQ_CALLME, PH, sizeof (packet.npkt)); } /* crevm () - create virtual machine. */ void crevm () { int pid, pm; struct pmdata *p; struct vmdata *v; parameter vm_parameter; /* for creating VM on Paragon */ char message[100]; v = alcvm (); pm = packet.npkt.num; DEBUG (D_MPDX_IN, "CREVM %ld on %ld", nvm, pm, 0); #ifndef __PARAGON__ if ((pid = vfork ()) < 0) mpd_net_abort ("can't vfork for new vm"); if (pid == 0) exe (pm, nvm); /* in the child, execute a.out */ v->pid = pid; #else /* * Create new VM by sending a message to the node's distributor. 
*/ vm_parameter.phys_machine = packet.npkt.num; vm_parameter.virt_machine = nvm; strcpy (vm_parameter.mpdx_addr, my_addr); p = lookup (pm); if (p->hostname) { if (sscanf (p->hostname, "%d", &pm) != 1) { sprintf (message, "invalid VM location: %s", p->hostname); mpd_net_abort (message); } } if (pm < 0 || pm >= numnodes ()) { sprintf (message, "invalid VM location: %d", pm); mpd_net_abort (message); } csend (0, &vm_parameter, sizeof (vm_parameter), pm, 0); #endif v->state = STARTING; v->notify = ORIGIN; /* save info for acking when HELLO comes back */ v->rem = PH->rem; } #ifndef __PARAGON__ /* exe(pm,vn) - exec MPD program to be virtual machine vn on phys machine pm */ void exe (pm, vn) int pm, vn; { struct pmdata *p; char pmbuf[10], vmbuf[10], dbbuf[10], magicbuf [sizeof (VM_MAGIC) + 2]; char *path, *h, *t; sprintf (pmbuf, "%d", pm); sprintf (vmbuf, "%d", vn); sprintf (dbbuf, "%X", mpd_dbg_flags); sprintf (magicbuf, "'%s'", VM_MAGIC); p = lookup (pm); if (p->exepath && *p->exepath) /* get path to exe file */ path = p->exepath; /* use explicit path if one given */ else path = mpd_net_exe_path; /* else use network path */ if (pm == 0 && !p->hostname) { /* exec locally */ DEBUG (D_MPDX_ACT, "[%ld] exec %s args...", vn, path, 0); execl (path, path, VM_MAGIC, pmbuf, vmbuf, my_addr, dbbuf, jsbuf, trc_arg, (char *) NULL); perror (path); } else { /* exec remotely via rsh */ if (!p->hostname) { if (! 
(h = getname (pm))) /* get hostname */ { fprintf (stderr, "mpdx: unknown machine %d\n", pm); exit(1); } p->hostname = salloc (h); /* save for next time */ } DEBUG (D_MPDX_ACT, "[%ld] rsh %s -n exec %s args...", vn, p->hostname, path); if (trc_fd == STDOUT || trc_fd == STDERR) t = trc_arg; else t = "-1"; execl (RSHPATH, RSHPATH, p->hostname, "-n", #ifdef __linux__ "--", /* due to broken Linux getopt */ #endif "exec", path, magicbuf, pmbuf, vmbuf, my_addr, dbbuf, jsbuf, t, (char *) NULL); perror (RSHPATH); } mpd_net_abort ("can't execute program"); } #endif /* findvm () - find virtual machine. */ void findvm () { int n; char message[100]; n = packet.npkt.num; DEBUG (D_MPDX_IN, "FINDM %ld from %ld", n, ORIGIN, 0); switch (vm[n]->state) { case STARTING: sprintf (message,"can't connect to vm %d -- not yet initialized",n); mpd_net_abort (message); case WORKING: memcpy (packet.sock.addr, vm[n]->addr, SOCK_ADDR_SIZE); mpd_net_send (ORIGIN, ACK_FINDVM, PH, sizeof (packet.sock)); break; case DYING: case GONE: sprintf (message,"can't connect to vm %d -- already terminated",n); mpd_net_abort (message); } } /* eof () - process EOF pseudo-message indicating a vm has died */ void eof () { char message[100]; struct vmdata *v = vm[ORIGIN]; DEBUG (D_MPDX_IN, "EOF from %ld", ORIGIN, 0, 0); if (v->state != GONE) { sprintf (message, "lost connection to virtual machine %d", ORIGIN); mpd_net_abort (message); } if (++ndied == nvm) /* exit if all alone */ { DEBUG (D_MPDX_ACT, "exiting because no VMs left", 0, 0, 0); exit(1); } } /* hello () - process HELLO message * * register the new virtual machine, and pass back acknowledgement to its * creator (if any). 
*/ void hello () { struct vmdata *v = vm[ORIGIN]; DEBUG (D_MPDX_IN, "HELLO %ld at %s", ORIGIN, packet.sock.addr, 0); if (v->state != STARTING) mpd_net_abort ("unexpected HELLO"); strncpy (v->addr, packet.sock.addr, SOCK_ADDR_SIZE); v->state = WORKING; if (v->notify) { PH->rem = v->rem; packet.npkt.num = ORIGIN; mpd_net_send (v->notify, ACK_CREVM, PH, sizeof (packet.npkt)); v->notify = 0; } } /* destvm () - handle REQ_DESTVM message * * make a note that a machine is being destroyed, and pass it the message. */ void destvm () { int n; char message[100]; n = packet.npkt.num; DEBUG (D_MPDX_IN, "DESTVM %ld from %ld", n, ORIGIN, 0); if (vm[n]->state != WORKING) { sprintf (message, "can't destroy VM %d -- it's not now running", n); mpd_net_abort (message); } vm[n]->state = DYING; vm[n]->notify = ORIGIN; vm[n]->rem = PH->rem; mpd_net_send (n, REQ_DESTVM, PH, PACH_SZ); } /* ackdest () - handle ACK_DESTVM message * * mark the vm as gone and notify the original destroyer. * the vm will kill itself after sending a final idle message. */ void ackdest () { int n; n = ORIGIN; DEBUG (D_MPDX_IN, "GOODBYE from %ld", n, 0, 0); vm[n]->state = GONE; mpd_net_send (vm[n]->notify, ACK_DESTVM, PH, PACH_SZ); } /* exitmsg () - process EXIT message * * pass the EXIT message to all other virtual machines. * give them a chance to die, then kill stragglers. */ void exitmsg () { int i; DEBUG (D_MPDX_IN, "EXIT %ld from %ld", packet.npkt.num, ORIGIN, 0); vm[ORIGIN]->state = GONE; /* note that sender has died */ ++ndied; exiting = 1; /* flag shutdown in progress */ /* * Relay this message from the main VM to all others. Note that the * packet already contains exit status and report-blocked-processes flag. */ for (i = nvm; i > 0; i--) if ((vm[i]->state != GONE) && (i != MAIN_VM)) mpd_net_send (i, MSG_EXIT, PH, sizeof (packet.xpkt)); /* give everybody a chance to die quietly; then kill 'em. 
*/ if (ndied < nvm) sleep (5); DEBUG0 (D_MPDX_ACT, "exitmsg calling mpd_net_abort"); mpd_net_abort ((char *) NULL); } /* stopmsg () - process implicit or explicit stop */ void stopmsg () { int code = packet.npkt.num; /* copy before we overwrite the packet */ DEBUG (D_MPDX_IN, "STOP %ld from %ld", packet.npkt.num, ORIGIN, 0); packet.xpkt.code = code; packet.xpkt.report = 0; /* don't report for a stop */ mpd_net_send (MAIN_VM, MSG_QUIT, PH, sizeof (packet.xpkt)); } /* idlemsg () - process idle notification from one vm * * Idle messages are sent when a VM can make no further process. If all VMs * have sent idle messages, and the sends and receives for each VM (including * those to and from mpdx) are equal, then no messages are in transit and the * system is globally deadlocked. * * Each VM (and mpdx) reports sends on a per-VM basis, and in place of sends to * itself reports the *negative* of the total receive count. Thus it is only * necessary to add all of the slots for each destination and check for zero. * * We don't actually check that each VM has sent an idle message, but if one * of them hasn't the first iteration (for VM 0 = mpdx) will be out of balance * and we'll return quickly. 
*/ void idlemsg () { int i, j, n; /* save the counts for future reference */ memcpy ((Ptr) vm[ORIGIN]->nmsgs, (Ptr) packet.idle.nmsgs, sizeof (vm[ORIGIN]->nmsgs)); /* check if the ledger is in balance for each VM */ for (i = 0; i <= nvm; i++) { /* for each destination: */ if (i > 0 && vm[i]->state == GONE) continue; /* a destroyed VM is idle */ n = mpd_msg_counts.nmsgs[i]; /* start with mpdx count */ for (j = 1; j <= nvm; j++) n += vm[j]->nmsgs[i]; /* add counts of vms */ if (n != 0 || (i > 0 && vm[i]->state == STARTING)) { /* we're not really idle yet */ DEBUG(D_TERM,"IDLE from %ld failed: net %ld for VM %ld",ORIGIN,n,i); return; } } /* we have global deadlock; tell the main VM to finalize */ DEBUG (D_GENERAL | D_MPDX_IN | D_TERM, "IDLE from vm %d found global deadlock", ORIGIN, 0, 0); packet.xpkt.code = 0; /* exit code 0 for deadlock */ packet.xpkt.report = 1; /* report for deadlock */ mpd_net_send (MAIN_VM, MSG_QUIT, PH, sizeof (packet.xpkt)); } #ifndef __PARAGON__ /* mort (sig) - interrupt routine called when a child dies * * Deaths during mpdx shutdown are merely counted. If all VMs are gone we will * exit here, but note that we *aren't* notified by interrupt of VM 1's death * because it's our parent, not our child. * * Deaths during VM startup mean failure of REQ_CREVM which must be acked. * * Other deaths are ignored and will be caught by EOF processing after * the input pipe is flushed. */ /*ARGSUSED*/ void mort (sig) { int i, n, s, code; char buf[10]; n = wait (&s); for (i = 1; i <= nvm; i++) if (vm[i]->pid == n) break; if (i > nvm) mpd_net_abort ("unknown pid returned by wait ())"); sig = s & 0x7F; code = s >> 8; if (sig != 0 && sig != SIGINT && sig != SIGQUIT && sig != SIGTERM) { sprintf (buf, "vm %d", i); psignal ((unsigned int) sig, buf); } else DEBUG (D_MPDX_ACT,"vm %ld exited with signal %ld, code %ld",i,sig,code); /* Re-enable the signal. This is needed under Sys V and derivatives. 
*/ signal (SIGCHLD, mort); if (!exiting && vm[i]->state != STARTING) return; /* ignore, handle when EOF seen */ vm[i]->state = GONE; /* show vm as down */ if (++ndied == nvm) /* exit if all alone */ { DEBUG (D_MPDX_ACT, "exiting because no VMs left", 0, 0, 0); exit(1); } if (exiting) return; /* no further action if shutting down */ /* if we get here we need to NAK a VM startup */ PH->rem = vm[i]->rem; /* init caller's reply address */ packet.npkt.num = NULL_VM; /* indicate failure */ mpd_net_send (vm[i]->notify, ACK_CREVM, PH, sizeof (packet.npkt)); } #endif /* setloc (n, host, path) - set or change location for physical machine */ void setloc (n, host, path) int n; char *host, *path; { struct pmdata *p; p = lookup (n); if (host && *host) { DEBUG (D_MPDX_ACT, "HOSTNAME for %ld: %s", n, host, 0); if (p->hostname) free (p->hostname); /* don't need UNMALLOC for mpdx */ p->hostname = salloc (host); } if (path && *path) { DEBUG (D_MPDX_ACT, "EXE_PATH for %ld: %s", n, path, 0); if (p->exepath) free (p->exepath); /* don't need UNMALLOC for mpdx */ p->exepath = salloc (path); } } /* getname (n) - get hostname for physical machine n */ char * getname (n) int n; { int d[4]; unsigned char a[4]; struct hostent *he; sscanf (my_addr, "%d.%d.%d.%d", d, d + 1, d + 2, d + 3); a[3] = n ? n : d[3]; n >>= 8; a[2] = n ? n : d[2]; n >>= 8; a[1] = n ? n : d[1]; n >>= 8; a[0] = n ? n : d[0]; he = gethostbyaddr ((char *) a, sizeof (a), AF_INET); if (he) return (char *) he->h_name; else return NULL; } /* lookup (n) - find (create if necessary) entry for physical machine n. 
*/ struct pmdata * lookup (n) int n; { struct pmdata *p; static struct pmdata z; for (p = &physm; p; p = p->next) if (p->num == n) return p; p = (struct pmdata *) alloc (sizeof (struct pmdata)); *p = z; p->num = n; p->next = physm.next; physm.next = p; return p; } /* alcvm () - allocate new virtual machine entry and return pointer */ struct vmdata * alcvm () { struct vmdata *v; static struct vmdata zeroes; if (++nvm > MAX_VM) mpd_net_abort ("too many virtual machines"); vm[nvm] = v = (struct vmdata *) alloc (sizeof (struct vmdata)); *v = zeroes; /* initialize to all zeroes */ return v; } /* alloc (n) - allocate n bytes, with success guaranteed */ char * alloc (n) int n; { char *s; s = malloc ((unsigned int) n); if (!s) mpd_net_abort ("MPDX out of memory"); return s; } /* salloc (s) - allocate and initialize string, with success guaranteed */ char * salloc (s) char *s; { return strcpy (alloc (strlen (s) + 1), s); } /* mpd_iowait (query, results, inout) - wait for input from a set of files */ void mpd_iowait (query, results, inout) fd_set *query, *results; enum io_type inout; { int n; if (inout != INPUT) mpd_net_abort ("mpdx iowait not on input"); do { DEBUG (D_SELECT, "select: %08lX", query->fds_bits[0], 0, 0); *results = *query; n = select (FD_SETSIZE, results, (fd_set *) 0, (fd_set *) 0, (struct timeval *) 0); DEBUG (D_SELECT, "selectd %08lX n=%ld", results->fds_bits[0], n, 0); } while (n < 0 && errno == EINTR); if (n < 0) { perror ("select"); mpd_net_abort ("select failure"); } } /* mpd_net_abort (message) - kill all other processes and exit. * Message is optional. Used for network errors, mpdx errors, and normal exit. * * note: we use SIGINT, not SIGKILL, because SIGKILL won't kill the * far end of an rsh. 
*/
void
mpd_net_abort (message)
char *message;
{
    int i;

    DEBUG0 (D_MPDX_ACT, "mpd_net_abort here");
    if (message)                        /* print message if given */
        fprintf (stderr, "mpdx: %s\n", message);

#ifndef __PARAGON__     /* not MPDX's children on Paragon */

    /*
     *  Kill all machines.  We can be called while the table is being
     *  extended (table full, or out of memory), so never look past the
     *  end of vm[] or through a slot that has not been filled in.
     */
    for (i = 1; i <= nvm && i <= MAX_VM; i++) {
        if (vm[i] == NULL)
            continue;
        if (vm[i]->state != GONE) {
            DEBUG (D_MPDX_ACT, "kill %ld [%ld]", vm[i]->pid, i, 0);
            kill (vm[i]->pid, SIGINT);
        } else {
            DEBUG (D_MPDX_ACT, "no kill %ld [%ld] (already gone)",
                vm[i]->pid, i, 0);
        }
    }
    if (!message) {
        DEBUG (D_MPDX_ACT, "mpdx exiting", 0, 0, 0);
    }
    exit (1);                           /* and quit */

#else                   /* Paragon */

    if (message) {
        /* abnormal termination; be sure to kill all processes */
        DEBUG (D_MPDX_ACT, "mpdx killing process group", 0, 0, 0);
        kill (0, SIGINT);               /* kill all processes in group */
        /* callers assume we never return, even if SIGINT doesn't end us */
        exit (1);
    } else {
        /* normal termination */
        exit (0);
    }

#endif
}