/* res.c -- resource management */ #include "rts.h" static void init_res (), re_init_res (); static void destroy_one_res (); /* destroy a resource */ static void destroy_one_global (); /* destroy a global */ static Pool res_pool; /* pool of resource descriptors */ static Pool succ_pool; /* pool of succ_st nodes */ static Bool suiciding = FALSE; /* nonzero if destroy (myvm ()) in progress */ static Rinst destroy_list; /* linked list of resources to destroy */ /* struct succ_st keeps a linked list of globals and global imports */ struct succ_st { short rpatid; /* index into mpd_rpatt and global_status */ struct succ_st *next; }; struct global_st { short rpatid; /* my index */ Bool created; /* has this global been create yet? */ Rinst res; /* resource descriptor for the global */ Sem glmutex; /* protection for this structure */ /* the following fields are used to help linearize the global import dag, * as in Knuth vol1 (2ed) page 262. */ int count; /* number of globals that import me */ struct succ_st *top; /* list of globals I import directly */ struct global_st *qlink; /* list of globals imported by no others */ }; static struct global_st *global_status = NULL; static void add_child (); void mpd_create_global (locn, rpatid) char *locn; short rpatid; { Proc pr; Rinst res; Rinst creator_res = CUR_RES; Sem wait; int creator_idx = CUR_RES->rpatid; /* NULL res can't create global*/ CUR_PROC->locn = locn; /* add locn to CUR_PROC structure */ if (suiciding) mpd_abort ("attempting to create while VM is being destroyed"); /* * Interlock the record for this global. */ P ((char *) 0, global_status[rpatid].glmutex); /* * If the global is already running, there's nothing more to do. * We know that the global is fully initialized. */ TRACE ("CREATEG", locn, 0); if (global_status[rpatid].created) { DEBUG (D_RESOURCE, "global %s duplicate create ignored", mpd_rpatt[rpatid].name, 0, 0); V (global_status[rpatid].glmutex); return; } /* * Record the import of the global rpatid by creator_res. */ if (creator_res->is_global) { global_status[rpatid].count++; /* one more imports global rpatid */ add_child (creator_idx, rpatid); /* add child id to creator's list */ } /* * Allocate a resource descriptor. */ res = (Rinst) mpd_addpool (res_pool); res->rpatid = rpatid; res->is_global = TRUE; global_status[rpatid].res = res; /* * Spawn the body process. */ /* protect the spawn-activate region */ LOCK (res->rmutex, "mpd-create_global"); LOCK_QUEUE ("mpd_create_global"); pr = mpd_spawn (mpd_rpatt[rpatid].initial, CUR_PROC->priority, res, TRUE, 0L, 0L, 0L, 0L); pr->pname = "[global init]"; /* GC will reset */ DEBUG5 (D_RESOURCE, "global %s init %lX spawned as p%06lX, r%06lX", mpd_rpatt[rpatid].name, mpd_rpatt[rpatid].initial, pr, res, 0); pr->wait = wait = mpd_make_sem (0); pr->ptype = INITIAL_PR; /* * Start body process and wait for it to complete or reply. */ DEBUG (D_RESOURCE, "starting global %s", mpd_rpatt[rpatid].name, 0, 0); mpd_activate (pr); UNLOCK_QUEUE ("mpd_create_global"); UNLOCK (res->rmutex, "mpd-create_global"); P ((char *) 0, wait); mpd_kill_sem (wait); /* * Now, finally, the importer of the global can proceed. */ /* We must set this to TRUE only after the global is totally * initialized. This is because, to avoid possible deadlock, * we test if this global is already created before we try to * grab this lock, and if it is already created then we just * return. If we try to grab the lock we can hang with 1 JS * and MultiMPD locks; the first caller has the lock and blocks * until its initial code is done, the initial code blocs for * I/O, and then another importer tries to grab the lock. Since * we only have one JS, we're hung (actually, with n JSs and * n+1 simultaneous global imports you hang). */ global_status[rpatid].created = TRUE; V (global_status[rpatid].glmutex); } /* * Create new instance of a resource (called from generated code). * * crbp has the resource parameters filled in. We fill in the rest * of the crb fields with the parameters. The GC allocates the * crbp block and the RTS frees it, while the RTS allocates an * Rcap and the GC frees it (we return a pointer to this capability). */ Ptr mpd_create_resource (locn, rpatid, vm, crbp, rcapsize) char *locn; int rpatid; Vcap vm; CRB *crbp; int rcapsize; { if (locn) CUR_PROC->locn = locn; /* remember caller's locn */ mpd_check_stk (CUR_STACK); TRACE ("CREATER", locn, 0); /* fill in the rest of the crbp from the parameters */ crbp->rpatid = rpatid; crbp->vm = vm; crbp->ph.size = crbp->crb_size; crbp->rcp = (Rcap *) mpd_alc (-rcapsize, 1); crbp->rc_size = rcapsize; mpd_create_res (crbp); return (Ptr) crbp->rcp; } /* * Create new instance of a resource (called internally). * * crbp is assumed to point to an alloc'd block; the block header is altered. */ void mpd_create_res (crbp) CRB *crbp; { Rinst res; Proc pr; Sem wait; Memh mp; mpd_check_stk (CUR_STACK); if (suiciding) mpd_abort ("attempting to create while VM is being destroyed"); if (crbp->vm == NULL_VM) mpd_abort ("attempting to create resource on null machine"); if (crbp->vm != mpd_my_vm && crbp->vm != NOOP_VM) { Pach ph; Ptr src, dest; /* Send request to remote machine. */ crbp->ph.priority = CUR_PROC->priority; ph = mpd_remote (crbp->vm, REQ_CREATE, &crbp->ph, crbp->ph.size); /* Copy back new resource capability. */ if ((dest = (Ptr) crbp->rcp) != NULL) { src = (Ptr) & ((struct rres_st *) ph)->rc; while (crbp->rc_size--) *dest++ = *src++; } mpd_free ((Ptr) ph); return; } /* * Allocate a new resource instance descriptor. */ res = (Rinst) mpd_addpool (res_pool); res->rpatid = crbp->rpatid; res->crb_addr = crbp; res->rc_size = crbp->rc_size; res->rcp = crbp->rcp; /* * Make newly created resource owner of CRB. * We're assuming here that crbp is the address of an alloc'd block. */ mp = (Memh) ((Ptr) crbp - MEMH_SZ); mp->res = res; insert (mp, res->meml, rnext, rlast); /* * Invoke UC resource entry code as a separate process. */ /* protect the spawn-activate region */ LOCK (res->rmutex, "mpd_create_res"); LOCK_QUEUE ("mpd_create_res"); pr = mpd_spawn (mpd_rpatt[crbp->rpatid].initial, CUR_PROC->priority, res, TRUE, (long) crbp, 0L, 0L, 0L); pr->wait = wait = mpd_make_sem (0); pr->ptype = INITIAL_PR; /* * Start new process; returns when initialization finishes. */ mpd_activate (pr); UNLOCK_QUEUE ("mpd_create_res"); UNLOCK (res->rmutex, "mpd_create_res"); P ((char *) 0, wait); mpd_kill_sem (wait); } /* * Add a node for child_idx to creator_idx's top list, thus indicating * the fact that global creator_idx imports global child_idx. This is * done when we have global_status[creator_idx].glmutex. */ static void add_child (creator_idx, child_idx) short creator_idx, child_idx; { struct succ_st *succ = (struct succ_st *) mpd_addpool (succ_pool); succ->rpatid = child_idx; /* add to front of list -- order is not important */ succ->next = global_status[creator_idx].top; global_status[creator_idx].top = succ; } /* * Allocate memory for resource variables. * Called by the body code immediately after entry. * * Initialize resource ID part of myresource capability. * * Remember the main resource so we can destroy it at termination time. */ Ptr mpd_alloc_rv (size) int size; { Rcap *rcp; mpd_check_stk (CUR_STACK); LOCK (CUR_RES->rmutex, "mpd_alloc_rv"); CUR_RES->rv_base = mpd_alc (size, 0); rcp = (Rcap *) CUR_RES->rv_base; rcp->vm = mpd_my_vm; rcp->res = CUR_RES; rcp->seqn = CUR_RES->seqn; UNLOCK (CUR_RES->rmutex, "mpd_alloc_rv"); LOCK (mpd_main_res_mutex, "mpd_alloc_rv"); if (mpd_main_res.res == 0) memcpy ((char *) &mpd_main_res, (char *) rcp, sizeof (mpd_main_res)); UNLOCK (mpd_main_res_mutex, "mpd_alloc_rv"); return CUR_RES->rv_base; } /* * Called when resource's initial code has been executed. * Copy myresource of new resource to capability. * Remove initial process from list of processes for resource. * Notify creator. */ void mpd_finished_init () { Ptr src, dest; mpd_check_stk (CUR_STACK); LOCK (CUR_RES->rmutex, "mpd_finished_init"); if (! (CUR_RES->status & INIT_REPLY)) { if ((dest = (Ptr) CUR_RES->rcp) != NULL) { src = CUR_RES->rv_base; while (CUR_RES->rc_size--) *dest++ = *src++; } V (CUR_PROC->wait); CUR_PROC->wait = NULL; } mpd_kill (CUR_PROC, CUR_RES); UNLOCK (CUR_RES->rmutex, "mpd_finished_init"); } /* * Allocate and return a null or noop resource cap. * * size give the length of the rescap. * pattern is an int array specifying layout of opcaps within the rescap: * zero or more occurrences of -> ndim, [lb1, lb2, [...]] * ndim=0 is a simple ocap * ndim=n is an n-dimensional array; need headers etc. * ndim=-1 terminates the list * ocap is the initial value for each opcap in the rescap. */ Ptr mpd_literal_rcap (size, pattern, o) int size, *pattern; Ocap *o; { Ptr p, q; int ndim; p = q = mpd_alc (size, 1); if (o == &mpd_nu_ocap) memcpy (p, (Ptr) &mpd_nu_rcap, sizeof (Rcap)); else memcpy (p, (Ptr) &mpd_no_rcap, sizeof (Rcap)); p += MPDALIGN (sizeof (Rcap)); while ((ndim = *pattern++) >= 0) { if (ndim == 0) { * (Ocap *) p = *o; p += MPDALIGN (sizeof (Ocap)); } else if (ndim > 3) { mpd_abort ("can't handle 4-D (or more) ops in null/noop rescaps"); } else { mpd_init_array ((char*) 0, (Array *) p, sizeof(Ocap), (Ptr) o, ndim, pattern[0], pattern[1], pattern[2], pattern[3], pattern[4], pattern[5]); pattern += 2 * ndim; p += DSIZE (p); } } return q; } /* * Destroy an instance of a resource. */ void mpd_destroy (locn, rcp) char *locn; Rcap rcp; { Rinst res; Proc pr; Bool kill_me = FALSE; Sem wait; Sem wake_creator = NULL; if (locn) CUR_PROC->locn = locn; /* remember source location */ mpd_check_stk (CUR_STACK); TRACE ("DESTROYR", locn, 0); /* * Check for null or noop resource capability. */ if (rcp.res == NULL) { if (rcp.seqn == NOOP_SEQN) return; mpd_abort ("attempting to destroy null resource"); } if (rcp.vm != mpd_my_vm) { /* Send destroy request to the remote machine. */ struct rres_st dr; Pach ph; dr.rc.vm = rcp.vm; dr.rc.seqn = rcp.seqn; dr.rc.res = rcp.res; dr.ph.priority = CUR_PROC->priority; ph = mpd_remote (rcp.vm, REQ_DESTROY, (Pach) &dr, sizeof (dr)); /* this ph does not belong to any resource */ mpd_free ((Ptr) ph); return; } res = rcp.res; DEBUG (D_RESOURCE, "mpd_destroy %s %s (r%06lX) ", res->is_global ? " global" : "resource", mpd_rpatt[res->rpatid].name, res); LOCK (res->rmutex, "mpd_destroy"); /* * Check the sequence number. Also invalidate it * so subsequent destroys of same resource fail. */ if (rcp.seqn != res->seqn) { /* check to be sure its valid */ char buf[100]; sprintf (buf, "attempting to destroy resource (%s) that no longer exists", mpd_rpatt[res->rpatid].name); UNLOCK (res->rmutex, "mpd_destroy"); mpd_abort (buf); } res->seqn += 1; /* invalidate it */ /* * Execute finalization code if there is any. */ if (mpd_rpatt [res->rpatid].final != NULL) { DEBUG (D_RESOURCE, "%s r%06lX (%s) exec final", (res->is_global ? "global " : "resource"), res, mpd_rpatt[res->rpatid].name); /* we need to hold queue mutex to protect the spawn-activate region */ LOCK_QUEUE ("mpd_destroy"); pr = mpd_spawn (mpd_rpatt[res->rpatid].final, CUR_PROC->priority, res, TRUE, (long) res->crb_addr, (long) res->rv_base, 0L, 0L); pr->pname = "[final]"; pr->wait = wait = mpd_make_sem (0); pr->ptype = FINAL_PR; mpd_activate (pr); UNLOCK_QUEUE ("mpd_destroy"); UNLOCK (res->rmutex, "mpd_destroy"); P ((char *) 0, wait); mpd_kill_sem (wait); LOCK (res->rmutex, "mpd_destroy"); DEBUG (D_RESOURCE, "%s r%06lX (%s) final done", (res->is_global ? "global " : "resource"), res, mpd_rpatt[res->rpatid].name); } else { UNLOCK_QUEUE ("mpd_destroy"); } mpd_kill_resops (res); /* * Kill all processes belonging to the resource, * but defer killing myself and notifying creator * if the resource has an initial process. * Note that if mpd_kill finds a READY proc it tries to take it * off of the ready queue, and if it doesn't find it there then * it aborts. Thus, if a proc is going to belong to a resource * in the RTS we need to protect the proc (its status, and * where it is blocked) with the queue mutex like this: * * if (res != NULL) grab res->rmutex * grab queue mutex * pr = mpd_spawn (...res, TRUE, ...) * fill in pr->fields * mpd_activate (pr) * give queue mutex * if (res != NULL) give res->rmutex * * Note that the TRUE in the mpd_spawn call means that we already * hold res->rmutex. Also, must grab rmutex before queue mutex, so... */ while (pr = res->procs) { if (pr->ptype == INITIAL_PR && ! (res->status & INIT_REPLY)) { RTS_WARN ("body process destroyed"); wake_creator = pr->wait; } if (pr == CUR_PROC) { kill_me = TRUE; /* pr is first on res list -- slide past it. */ pr = pr->procs; } if (pr == NULL) break; /* no more processes except possibly self */ mpd_kill (pr, res); /* mpd_kill removes the thread off of the res->procs list. */ } /* * Free memory associated with resource and * return resource instance descriptor to free list. */ mpd_res_free (res); res->status = FREE_SLOT; UNLOCK (res->rmutex, "mpd_destroy"); /* we add the res to the free list after releasing the rmutex lock. */ mpd_delpool (res_pool, (Ptr) res); if (wake_creator) V (wake_creator); /* * Commit suicide if the destroyer is part of the * dead resource. */ if (kill_me) mpd_kill (CUR_PROC, (Rinst) NULL); } /* * Destroy all the resource instances on this machine, *not* including * any globals. */ void mpd_dest_all () { Rinst r; Rcap rc; if (suiciding) return; /* somebody's already doing this */ suiciding = TRUE; mpd_eachpool (res_pool, destroy_one_res); /* create hitlist */ while (destroy_list != NULL) { r = destroy_list; destroy_list = destroy_list->next; rc.vm = mpd_my_vm; rc.res = r; rc.seqn = r->seqn; mpd_destroy ((char *) 0, rc); } } /* * Put a resource (but not a global) on the destroy_list. */ static void destroy_one_res (r) Rinst r; { if (! (r->status & FREE_SLOT) && ! r->is_global) { r->next = destroy_list; destroy_list = r; } } /* * A resource's final code has completed. Notify destroyer. */ void mpd_finished_final () { mpd_check_stk (CUR_STACK); LOCK (CUR_RES->rmutex, "mpd_finished_final"); if (! (CUR_RES->status & FINAL_REPLY)) { V (CUR_PROC->wait); CUR_PROC->wait = NULL; } mpd_kill (CUR_PROC, CUR_RES); } /* * Initialize resource management part of MPD RTS. */ void mpd_init_res () { int i; mpd_nu_rcap.vm = 0; /* create null resource capability */ mpd_nu_rcap.seqn = NULL_SEQN; /* indicate NULL */ mpd_nu_rcap.res = 0; /* rin[0] is never allocated */ mpd_no_rcap.vm = 0; /* create noop resource capability */ mpd_no_rcap.seqn = NOOP_SEQN; /* indicate NOOP */ mpd_no_rcap.res = 0; /* rin[0] is never allocated */ /* * Initialize free list of resource instance descriptors. */ res_pool = mpd_makepool ("active resources", sizeof (struct rin_st), mpd_max_resources, init_res, re_init_res); INIT_LOCK (mpd_main_res_mutex, "main_res_mutex"); /* we really need a mpd_num_rpats or something from mpdl... */ global_status = (struct global_st *) mpd_alloc (mpd_num_rpats*sizeof(struct global_st)); for (i = 0; i < mpd_num_rpats; i++) { global_status[i].rpatid = i; global_status[i].created = FALSE; global_status[i].glmutex = mpd_make_sem (1); global_status[i].count = 0; global_status[i].top = NULL; global_status[i].qlink = NULL; } succ_pool = mpd_makepool ("successor structs", sizeof (struct succ_st), (int) MAX_INT, (Func) 0, (Func) 0); } /* * initialize a resource descriptor for the first time */ static void init_res (r) Rinst r; { static unsigned short sn = INIT_SEQ_RES; /* called under res_pool lock */ r->seqn = sn++; r->status = FREE_SLOT; INIT_LOCK (r->rmutex, "r->rmutex"); } /* * initialize a resource descriptor */ static void re_init_res (r) Rinst r; { r->status = 0; r->is_global = FALSE; RESET_LOCK (r->rmutex); r->crb_addr = NULL; r->rc_size = 0; r->oper_list = NULL; r->class_list = NULL; r->procs = NULL; r->meml = NULL; r->rcp = NULL; } /* * destroy the globals in this vm from the top of the hierarchy * to the bottom. See Knuth, vol1 (2ed) p. 258-262. At runtime * (global create time) we have constructed the information in * figure 8 (in global_status). We now make a queue of globals * that have no predecessors (globals that import them), and * then just keep going until this queue is empty, following * steps T6 and T7 of figure 9. */ void mpd_destroy_globals () { int i; struct global_st *q = NULL; struct succ_st *succ; Rinst r; Rcap rc; /* construct the queue */ for (i = 0; i < mpd_num_rpats; i++) if (global_status[i].created && global_status[i].count == 0) { global_status[i].qlink = q; q = &global_status[i]; } /* now process guys from the queue one at a time (and in so doing * creating more for the queue */ for (; q != NULL; q = q->qlink) { r = global_status[q->rpatid].res; rc.vm = mpd_my_vm; rc.res = r; rc.seqn = r->seqn; mpd_destroy ((char *) 0, rc); /* * for each guy on q's top list, since we have destroyed * q->rpatid we decrement this guy's count of predecessors. * if that count is q, we add him to q. */ for (succ = q->top; succ != NULL; succ = succ->next) { if (--global_status[succ->rpatid].count == 0) { global_status[succ->rpatid].qlink = q->qlink; q->qlink = &global_status[succ->rpatid]; } } } }