/* res.c -- resource management */
#include "rts.h"
static void init_res (), re_init_res ();
static void destroy_one_res (); /* destroy a resource */
static void destroy_one_global (); /* destroy a global */
static Pool res_pool; /* pool of resource descriptors */
static Pool succ_pool; /* pool of succ_st nodes */
static Bool suiciding = FALSE; /* nonzero if destroy (myvm ()) in progress */
static Rinst destroy_list; /* linked list of resources to destroy */
/* struct succ_st keeps a linked list of globals and global imports */
struct succ_st {
short rpatid; /* index into mpd_rpatt and global_status */
struct succ_st *next;
};
struct global_st {
short rpatid; /* my index */
Bool created; /* has this global been create yet? */
Rinst res; /* resource descriptor for the global */
Sem glmutex; /* protection for this structure */
/* the following fields are used to help linearize the global import dag,
* as in Knuth vol1 (2ed) page 262. */
int count; /* number of globals that import me */
struct succ_st *top; /* list of globals I import directly */
struct global_st *qlink; /* list of globals imported by no others */
};
static struct global_st *global_status = NULL;
static void add_child ();
void
mpd_create_global (locn, rpatid)
char *locn;
short rpatid;
{
Proc pr;
Rinst res;
Rinst creator_res = CUR_RES;
Sem wait;
int creator_idx = CUR_RES->rpatid; /* NULL res can't create global*/
CUR_PROC->locn = locn; /* add locn to CUR_PROC structure */
if (suiciding)
mpd_abort ("attempting to create while VM is being destroyed");
/*
* Interlock the record for this global.
*/
P ((char *) 0, global_status[rpatid].glmutex);
/*
* If the global is already running, there's nothing more to do.
* We know that the global is fully initialized.
*/
TRACE ("CREATEG", locn, 0);
if (global_status[rpatid].created) {
DEBUG (D_RESOURCE, "global %s duplicate create ignored",
mpd_rpatt[rpatid].name, 0, 0);
V (global_status[rpatid].glmutex);
return;
}
/*
* Record the import of the global rpatid by creator_res.
*/
if (creator_res->is_global) {
global_status[rpatid].count++; /* one more imports global rpatid */
add_child (creator_idx, rpatid); /* add child id to creator's list */
}
/*
* Allocate a resource descriptor.
*/
res = (Rinst) mpd_addpool (res_pool);
res->rpatid = rpatid;
res->is_global = TRUE;
global_status[rpatid].res = res;
/*
* Spawn the body process.
*/
/* protect the spawn-activate region */
LOCK (res->rmutex, "mpd-create_global");
LOCK_QUEUE ("mpd_create_global");
pr = mpd_spawn (mpd_rpatt[rpatid].initial, CUR_PROC->priority,
res, TRUE, 0L, 0L, 0L, 0L);
pr->pname = "[global init]"; /* GC will reset */
DEBUG5 (D_RESOURCE, "global %s init %lX spawned as p%06lX, r%06lX",
mpd_rpatt[rpatid].name, mpd_rpatt[rpatid].initial, pr, res, 0);
pr->wait = wait = mpd_make_sem (0);
pr->ptype = INITIAL_PR;
/*
* Start body process and wait for it to complete or reply.
*/
DEBUG (D_RESOURCE, "starting global %s", mpd_rpatt[rpatid].name, 0, 0);
mpd_activate (pr);
UNLOCK_QUEUE ("mpd_create_global");
UNLOCK (res->rmutex, "mpd-create_global");
P ((char *) 0, wait);
mpd_kill_sem (wait);
/*
* Now, finally, the importer of the global can proceed.
*/
/* We must set this to TRUE only after the global is totally
* initialized. This is because, to avoid possible deadlock,
* we test if this global is already created before we try to
* grab this lock, and if it is already created then we just
* return. If we try to grab the lock we can hang with 1 JS
* and MultiMPD locks; the first caller has the lock and blocks
* until its initial code is done, the initial code blocs for
* I/O, and then another importer tries to grab the lock. Since
* we only have one JS, we're hung (actually, with n JSs and
* n+1 simultaneous global imports you hang).
*/
global_status[rpatid].created = TRUE;
V (global_status[rpatid].glmutex);
}
/*
* Create new instance of a resource (called from generated code).
*
* crbp has the resource parameters filled in. We fill in the rest
* of the crb fields with the parameters. The GC allocates the
* crbp block and the RTS frees it, while the RTS allocates an
* Rcap and the GC frees it (we return a pointer to this capability).
*/
Ptr
mpd_create_resource (locn, rpatid, vm, crbp, rcapsize)
char *locn;
int rpatid;
Vcap vm;
CRB *crbp;
int rcapsize;
{
if (locn)
CUR_PROC->locn = locn; /* remember caller's locn */
mpd_check_stk (CUR_STACK);
TRACE ("CREATER", locn, 0);
/* fill in the rest of the crbp from the parameters */
crbp->rpatid = rpatid;
crbp->vm = vm;
crbp->ph.size = crbp->crb_size;
crbp->rcp = (Rcap *) mpd_alc (-rcapsize, 1);
crbp->rc_size = rcapsize;
mpd_create_res (crbp);
return (Ptr) crbp->rcp;
}
/*
* Create new instance of a resource (called internally).
*
* crbp is assumed to point to an alloc'd block; the block header is altered.
*/
void
mpd_create_res (crbp)
CRB *crbp;
{
Rinst res;
Proc pr;
Sem wait;
Memh mp;
mpd_check_stk (CUR_STACK);
if (suiciding)
mpd_abort ("attempting to create while VM is being destroyed");
if (crbp->vm == NULL_VM)
mpd_abort ("attempting to create resource on null machine");
if (crbp->vm != mpd_my_vm && crbp->vm != NOOP_VM) {
Pach ph;
Ptr src, dest;
/* Send request to remote machine. */
crbp->ph.priority = CUR_PROC->priority;
ph = mpd_remote (crbp->vm, REQ_CREATE, &crbp->ph, crbp->ph.size);
/* Copy back new resource capability. */
if ((dest = (Ptr) crbp->rcp) != NULL) {
src = (Ptr) & ((struct rres_st *) ph)->rc;
while (crbp->rc_size--)
*dest++ = *src++;
}
mpd_free ((Ptr) ph);
return;
}
/*
* Allocate a new resource instance descriptor.
*/
res = (Rinst) mpd_addpool (res_pool);
res->rpatid = crbp->rpatid;
res->crb_addr = crbp;
res->rc_size = crbp->rc_size;
res->rcp = crbp->rcp;
/*
* Make newly created resource owner of CRB.
* We're assuming here that crbp is the address of an alloc'd block.
*/
mp = (Memh) ((Ptr) crbp - MEMH_SZ);
mp->res = res;
insert (mp, res->meml, rnext, rlast);
/*
* Invoke UC resource entry code as a separate process.
*/
/* protect the spawn-activate region */
LOCK (res->rmutex, "mpd_create_res");
LOCK_QUEUE ("mpd_create_res");
pr = mpd_spawn (mpd_rpatt[crbp->rpatid].initial, CUR_PROC->priority,
res, TRUE, (long) crbp, 0L, 0L, 0L);
pr->wait = wait = mpd_make_sem (0);
pr->ptype = INITIAL_PR;
/*
* Start new process; returns when initialization finishes.
*/
mpd_activate (pr);
UNLOCK_QUEUE ("mpd_create_res");
UNLOCK (res->rmutex, "mpd_create_res");
P ((char *) 0, wait);
mpd_kill_sem (wait);
}
/*
* Add a node for child_idx to creator_idx's top list, thus indicating
* the fact that global creator_idx imports global child_idx. This is
* done when we have global_status[creator_idx].glmutex.
*/
static void
add_child (creator_idx, child_idx)
short creator_idx, child_idx;
{
struct succ_st *succ = (struct succ_st *) mpd_addpool (succ_pool);
succ->rpatid = child_idx;
/* add to front of list -- order is not important */
succ->next = global_status[creator_idx].top;
global_status[creator_idx].top = succ;
}
/*
* Allocate memory for resource variables.
* Called by the body code immediately after entry.
*
* Initialize resource ID part of myresource capability.
*
* Remember the main resource so we can destroy it at termination time.
*/
Ptr
mpd_alloc_rv (size)
int size;
{
Rcap *rcp;
mpd_check_stk (CUR_STACK);
LOCK (CUR_RES->rmutex, "mpd_alloc_rv");
CUR_RES->rv_base = mpd_alc (size, 0);
rcp = (Rcap *) CUR_RES->rv_base;
rcp->vm = mpd_my_vm;
rcp->res = CUR_RES;
rcp->seqn = CUR_RES->seqn;
UNLOCK (CUR_RES->rmutex, "mpd_alloc_rv");
LOCK (mpd_main_res_mutex, "mpd_alloc_rv");
if (mpd_main_res.res == 0)
memcpy ((char *) &mpd_main_res, (char *) rcp, sizeof (mpd_main_res));
UNLOCK (mpd_main_res_mutex, "mpd_alloc_rv");
return CUR_RES->rv_base;
}
/*
* Called when resource's initial code has been executed.
* Copy myresource of new resource to capability.
* Remove initial process from list of processes for resource.
* Notify creator.
*/
void
mpd_finished_init ()
{
Ptr src, dest;
mpd_check_stk (CUR_STACK);
LOCK (CUR_RES->rmutex, "mpd_finished_init");
if (! (CUR_RES->status & INIT_REPLY)) {
if ((dest = (Ptr) CUR_RES->rcp) != NULL) {
src = CUR_RES->rv_base;
while (CUR_RES->rc_size--) *dest++ = *src++;
}
V (CUR_PROC->wait);
CUR_PROC->wait = NULL;
}
mpd_kill (CUR_PROC, CUR_RES);
UNLOCK (CUR_RES->rmutex, "mpd_finished_init");
}
/*
* Allocate and return a null or noop resource cap.
*
* size give the length of the rescap.
* pattern is an int array specifying layout of opcaps within the rescap:
* zero or more occurrences of -> ndim, [lb1, lb2, [...]]
* ndim=0 is a simple ocap
* ndim=n is an n-dimensional array; need headers etc.
* ndim=-1 terminates the list
* ocap is the initial value for each opcap in the rescap.
*/
Ptr
mpd_literal_rcap (size, pattern, o)
int size, *pattern;
Ocap *o;
{
Ptr p, q;
int ndim;
p = q = mpd_alc (size, 1);
if (o == &mpd_nu_ocap)
memcpy (p, (Ptr) &mpd_nu_rcap, sizeof (Rcap));
else
memcpy (p, (Ptr) &mpd_no_rcap, sizeof (Rcap));
p += MPDALIGN (sizeof (Rcap));
while ((ndim = *pattern++) >= 0) {
if (ndim == 0) {
* (Ocap *) p = *o;
p += MPDALIGN (sizeof (Ocap));
} else if (ndim > 3) {
mpd_abort ("can't handle 4-D (or more) ops in null/noop rescaps");
} else {
mpd_init_array ((char*) 0, (Array *) p, sizeof(Ocap), (Ptr) o, ndim,
pattern[0], pattern[1], pattern[2], pattern[3],
pattern[4], pattern[5]);
pattern += 2 * ndim;
p += DSIZE (p);
}
}
return q;
}
/*
* Destroy an instance of a resource.
*/
void
mpd_destroy (locn, rcp)
char *locn;
Rcap rcp;
{
Rinst res;
Proc pr;
Bool kill_me = FALSE;
Sem wait;
Sem wake_creator = NULL;
if (locn)
CUR_PROC->locn = locn; /* remember source location */
mpd_check_stk (CUR_STACK);
TRACE ("DESTROYR", locn, 0);
/*
* Check for null or noop resource capability.
*/
if (rcp.res == NULL) {
if (rcp.seqn == NOOP_SEQN)
return;
mpd_abort ("attempting to destroy null resource");
}
if (rcp.vm != mpd_my_vm) {
/* Send destroy request to the remote machine. */
struct rres_st dr;
Pach ph;
dr.rc.vm = rcp.vm;
dr.rc.seqn = rcp.seqn;
dr.rc.res = rcp.res;
dr.ph.priority = CUR_PROC->priority;
ph = mpd_remote (rcp.vm, REQ_DESTROY, (Pach) &dr, sizeof (dr));
/* this ph does not belong to any resource */
mpd_free ((Ptr) ph);
return;
}
res = rcp.res;
DEBUG (D_RESOURCE, "mpd_destroy %s %s (r%06lX) ",
res->is_global ? " global" : "resource",
mpd_rpatt[res->rpatid].name, res);
LOCK (res->rmutex, "mpd_destroy");
/*
* Check the sequence number. Also invalidate it
* so subsequent destroys of same resource fail.
*/
if (rcp.seqn != res->seqn) { /* check to be sure its valid */
char buf[100];
sprintf (buf,
"attempting to destroy resource (%s) that no longer exists",
mpd_rpatt[res->rpatid].name);
UNLOCK (res->rmutex, "mpd_destroy");
mpd_abort (buf);
}
res->seqn += 1; /* invalidate it */
/*
* Execute finalization code if there is any.
*/
if (mpd_rpatt [res->rpatid].final != NULL) {
DEBUG (D_RESOURCE, "%s r%06lX (%s) exec final",
(res->is_global ? "global " : "resource"), res,
mpd_rpatt[res->rpatid].name);
/* we need to hold queue mutex to protect the spawn-activate region */
LOCK_QUEUE ("mpd_destroy");
pr = mpd_spawn (mpd_rpatt[res->rpatid].final, CUR_PROC->priority,
res, TRUE, (long) res->crb_addr, (long) res->rv_base, 0L, 0L);
pr->pname = "[final]";
pr->wait = wait = mpd_make_sem (0);
pr->ptype = FINAL_PR;
mpd_activate (pr);
UNLOCK_QUEUE ("mpd_destroy");
UNLOCK (res->rmutex, "mpd_destroy");
P ((char *) 0, wait);
mpd_kill_sem (wait);
LOCK (res->rmutex, "mpd_destroy");
DEBUG (D_RESOURCE, "%s r%06lX (%s) final done",
(res->is_global ? "global " : "resource"), res,
mpd_rpatt[res->rpatid].name);
} else {
UNLOCK_QUEUE ("mpd_destroy");
}
mpd_kill_resops (res);
/*
* Kill all processes belonging to the resource,
* but defer killing myself and notifying creator
* if the resource has an initial process.
* Note that if mpd_kill finds a READY proc it tries to take it
* off of the ready queue, and if it doesn't find it there then
* it aborts. Thus, if a proc is going to belong to a resource
* in the RTS we need to protect the proc (its status, and
* where it is blocked) with the queue mutex like this:
*
* if (res != NULL) grab res->rmutex
* grab queue mutex
* pr = mpd_spawn (...res, TRUE, ...)
* fill in pr->fields
* mpd_activate (pr)
* give queue mutex
* if (res != NULL) give res->rmutex
*
* Note that the TRUE in the mpd_spawn call means that we already
* hold res->rmutex. Also, must grab rmutex before queue mutex, so...
*/
while (pr = res->procs) {
if (pr->ptype == INITIAL_PR && ! (res->status & INIT_REPLY)) {
RTS_WARN ("body process destroyed");
wake_creator = pr->wait;
}
if (pr == CUR_PROC) {
kill_me = TRUE;
/* pr is first on res list -- slide past it. */
pr = pr->procs;
}
if (pr == NULL)
break; /* no more processes except possibly self */
mpd_kill (pr, res);
/* mpd_kill removes the thread off of the res->procs list. */
}
/*
* Free memory associated with resource and
* return resource instance descriptor to free list.
*/
mpd_res_free (res);
res->status = FREE_SLOT;
UNLOCK (res->rmutex, "mpd_destroy");
/* we add the res to the free list after releasing the rmutex lock. */
mpd_delpool (res_pool, (Ptr) res);
if (wake_creator)
V (wake_creator);
/*
* Commit suicide if the destroyer is part of the
* dead resource.
*/
if (kill_me)
mpd_kill (CUR_PROC, (Rinst) NULL);
}
/*
* Destroy all the resource instances on this machine, *not* including
* any globals.
*/
void
mpd_dest_all ()
{
Rinst r;
Rcap rc;
if (suiciding)
return; /* somebody's already doing this */
suiciding = TRUE;
mpd_eachpool (res_pool, destroy_one_res); /* create hitlist */
while (destroy_list != NULL) {
r = destroy_list;
destroy_list = destroy_list->next;
rc.vm = mpd_my_vm;
rc.res = r;
rc.seqn = r->seqn;
mpd_destroy ((char *) 0, rc);
}
}
/*
* Put a resource (but not a global) on the destroy_list.
*/
static void
destroy_one_res (r)
Rinst r;
{
if (! (r->status & FREE_SLOT) && ! r->is_global) {
r->next = destroy_list;
destroy_list = r;
}
}
/*
* A resource's final code has completed. Notify destroyer.
*/
void
mpd_finished_final ()
{
mpd_check_stk (CUR_STACK);
LOCK (CUR_RES->rmutex, "mpd_finished_final");
if (! (CUR_RES->status & FINAL_REPLY)) {
V (CUR_PROC->wait);
CUR_PROC->wait = NULL;
}
mpd_kill (CUR_PROC, CUR_RES);
}
/*
* Initialize resource management part of MPD RTS.
*/
void
mpd_init_res ()
{
int i;
mpd_nu_rcap.vm = 0; /* create null resource capability */
mpd_nu_rcap.seqn = NULL_SEQN; /* indicate NULL */
mpd_nu_rcap.res = 0; /* rin[0] is never allocated */
mpd_no_rcap.vm = 0; /* create noop resource capability */
mpd_no_rcap.seqn = NOOP_SEQN; /* indicate NOOP */
mpd_no_rcap.res = 0; /* rin[0] is never allocated */
/*
* Initialize free list of resource instance descriptors.
*/
res_pool = mpd_makepool ("active resources", sizeof (struct rin_st),
mpd_max_resources, init_res, re_init_res);
INIT_LOCK (mpd_main_res_mutex, "main_res_mutex");
/* we really need a mpd_num_rpats or something from mpdl... */
global_status =
(struct global_st *) mpd_alloc (mpd_num_rpats*sizeof(struct global_st));
for (i = 0; i < mpd_num_rpats; i++) {
global_status[i].rpatid = i;
global_status[i].created = FALSE;
global_status[i].glmutex = mpd_make_sem (1);
global_status[i].count = 0;
global_status[i].top = NULL;
global_status[i].qlink = NULL;
}
succ_pool = mpd_makepool ("successor structs", sizeof (struct succ_st),
(int) MAX_INT, (Func) 0, (Func) 0);
}
/*
* initialize a resource descriptor for the first time
*/
static void
init_res (r)
Rinst r;
{
static unsigned short sn = INIT_SEQ_RES;
/* called under res_pool lock */
r->seqn = sn++;
r->status = FREE_SLOT;
INIT_LOCK (r->rmutex, "r->rmutex");
}
/*
* initialize a resource descriptor
*/
static void
re_init_res (r)
Rinst r;
{
r->status = 0;
r->is_global = FALSE;
RESET_LOCK (r->rmutex);
r->crb_addr = NULL;
r->rc_size = 0;
r->oper_list = NULL;
r->class_list = NULL;
r->procs = NULL;
r->meml = NULL;
r->rcp = NULL;
}
/*
* destroy the globals in this vm from the top of the hierarchy
* to the bottom. See Knuth, vol1 (2ed) p. 258-262. At runtime
* (global create time) we have constructed the information in
* figure 8 (in global_status). We now make a queue of globals
* that have no predecessors (globals that import them), and
* then just keep going until this queue is empty, following
* steps T6 and T7 of figure 9.
*/
void
mpd_destroy_globals ()
{
int i;
struct global_st *q = NULL;
struct succ_st *succ;
Rinst r;
Rcap rc;
/* construct the queue */
for (i = 0; i < mpd_num_rpats; i++)
if (global_status[i].created && global_status[i].count == 0) {
global_status[i].qlink = q;
q = &global_status[i];
}
/* now process guys from the queue one at a time (and in so doing
* creating more for the queue */
for (; q != NULL; q = q->qlink) {
r = global_status[q->rpatid].res;
rc.vm = mpd_my_vm;
rc.res = r;
rc.seqn = r->seqn;
mpd_destroy ((char *) 0, rc);
/*
* for each guy on q's top list, since we have destroyed
* q->rpatid we decrement this guy's count of predecessors.
* if that count is q, we add him to q.
*/
for (succ = q->top; succ != NULL; succ = succ->next) {
if (--global_status[succ->rpatid].count == 0) {
global_status[succ->rpatid].qlink = q->qlink;
q->qlink = &global_status[succ->rpatid];
}
}
}
}
syntax highlighted by Code2HTML, v. 0.9.1