/* oper.c -- runtime support of operations */ #include "rts.h" static Pool oper_pool; /* pool of operation descriptors */ static Pool class_pool; /* pool of class descriptors */ static void init_oper (), re_init_oper (), free_oper (), purge (); static void init_class (), re_init_class (); static void kill_inop (); /* make_op (t, f, v, opcp) is used directly for making a local op * and indirectly for making a resource op. * * note that f is a field name so this can't be trivially made a function. * * This must be "called" with CUR_RES->rmutex already possessed. */ #define make_op(t, f, v, opcp) { \ Oper op; \ \ (opcp)->vm = mpd_my_vm; \ op = (Oper) mpd_addpool (oper_pool); \ (opcp)->oper_entry = (Ptr) op; \ (opcp)->seqn = op->seqn; \ \ op->type = (t); \ op->f = (v); \ op->pending = 0; \ \ op->res = CUR_RES; \ op->next = CUR_RES->oper_list; \ CUR_RES->oper_list = op; \ } /* * Add new resource proc operation. Called during resource initialization. */ void mpd_make_proc (opcp, type, ept) Ocap *opcp; enum op_type type; Func ept; { mpd_check_stk (CUR_STACK); LOCK (CUR_RES->rmutex, "mpd_make_proc"); make_op (type, u.code, ept, opcp); UNLOCK (CUR_RES->rmutex, "mpd_make_proc"); } /* * mpd_new_op (locn, clap) -- make a single op for new(op(...)). */ /*ARGSUSED*/ Ocap mpd_new_op (locn, clap) char *locn; Class clap; { Ocap c; LOCK (CUR_RES->rmutex, "mpd_new_op"); LOCK (clap->clmutex, "mpd_new_op"); make_op (DYNAMIC_OP, u.clap, clap, &c); UNLOCK (CUR_RES->rmutex, "mpd_new_op"); UNLOCK (clap->clmutex, "mpd_new_op"); return c; } /* * mpd_make_inops (addr, clap, ndim, type) * * Make a set of new input operations. * Called during initialization, and also later for local ops and new([]ops). * Returns addr. */ Ptr mpd_make_inops (addr, clap, ndim, type) Ptr addr; Class clap; int ndim, type; { Array *a; Ocap *o; int n; mpd_check_stk (CUR_STACK); LOCK (CUR_RES->rmutex, "mpd_make_inops"); LOCK (clap->clmutex, "mpd_make_inops"); if (ndim == 0) { make_op ((enum op_type) type, u.clap, clap, (Ocap *) addr); clap->numops += 1; } else { a = (Array *) addr; o = (Ocap *) ADATA (a); n = mpd_acount (a); clap->numops += n; while (n--) { make_op ((enum op_type) type, u.clap, clap, o); o++; /* don't use o++ in macro call above! */ } } UNLOCK (CUR_RES->rmutex, "mpd_make_inops"); UNLOCK (clap->clmutex, "mpd_make_inops"); return addr; } /* * mpd_make_arraysem (locn, addr, ndim) * * Make an array of sem operations. * Called only during initialization. * (local ops and new([]ops) cannot be sems.) * Returns addr. * Code is similar to mpd_make_inops. */ Ptr mpd_make_arraysem (locn, addr, ndim) char *locn; Ptr addr; int ndim; { Array *a; Sem *s; int n; mpd_check_stk (CUR_STACK); LOCK (CUR_RES->rmutex, "mpd_make_arraysem"); if (ndim == 0) mpd_loc_abort (locn, "cannot make 0 dimension array of sems"); a = (Array *) addr; s = (Sem *) ADATA (a); n = mpd_acount (a); while (n--) { *s++ = (Sem) mpd_make_semop (locn); } UNLOCK (CUR_RES->rmutex, "mpd_make_arraysem"); return addr; } /* * mpd_dest_array (locn, addr) * * Destroys an array of operations and frees its memory. */ void mpd_dest_array (locn, addr) char *locn; Ptr addr; { Array *a = (Array *) addr; Ocap *o = (Ocap *) ADATA (a); int n = mpd_acount (a); while (--n) mpd_dest_op (locn, *o++); } /* * mpd_dest_op (locn, ocap) * * Destroys a single input op which may be remote. */ void mpd_dest_op (locn, o) char *locn; Ocap o; { struct ropc_st pkt; Pach ph; Oper op, rop; op = (Oper) o.oper_entry; if (op == NULL) { /* if not a real capability */ if (o.seqn == NOOP_SEQN) return; /* if noop, nothing to do */ else if (o.seqn == NULL_SEQN) mpd_loc_abort (locn, "null operation capability"); else mpd_loc_abort (locn, "invalid operation capability"); } if (o.vm != mpd_my_vm) { /* if remote */ if (o.vm <= 0 || o.vm > MAX_VM || !mpd_exec_up) mpd_loc_abort (locn, "invalid operation capability"); pkt.oc = o; ph = (Pach) &pkt; ph = mpd_remote (o.vm, REQ_DESTOP, ph, sizeof (pkt)); mpd_free ((Ptr) ph); return; } if (o.seqn != op->seqn) mpd_loc_abort (locn, "op no longer exists"); if (op->type != DYNAMIC_OP) mpd_loc_abort (locn, "not a dynamic op"); /* Kill the op, but leave the class; another may be created later */ LOCK (op->res->rmutex, "mpd_dest_op"); LOCK (op->u.clap->clmutex, "mpd_dest_op"); LOCK (op->omutex, "mpd_dest_op"); op->seqn++; /* invalidate entry */ if ((rop = op->res->oper_list) == op) op->res->oper_list = op->next; else { while (rop->next != op) rop = rop->next; rop->next = op->next; } UNLOCK (op->res->rmutex, "mpd_dest_op"); UNLOCK (op->u.clap->clmutex, "mpd_dest_op"); } /* * mpd_kill_inops (addr, ndim) * * Remove local operations from the operation table. Purge any pending * invocations from the queues. If a killed operation was the last of * its class, free the class as well. */ void mpd_kill_inops (addr, ndim) Ptr addr; int ndim; { Array *a; Ocap *o; Oper op; int n; mpd_check_stk (CUR_STACK); if (ndim == 0) o = (Ocap *) addr; else { a = (Array *) addr; o = (Ocap *) ADATA (a); } op = (Oper) o->oper_entry; LOCK (op->res->rmutex, "mpd_kill_inops"); LOCK (op->u.clap->clmutex, "mpd_kill_inops"); if (ndim == 0) kill_inop (addr); else { n = mpd_acount (a); while (--n) kill_inop ((Ptr) o++); } UNLOCK (op->res->rmutex, "mpd_kill_inops"); UNLOCK (op->u.clap->clmutex, "mpd_kill_inops"); } static void kill_inop (addr) /* kill one op */ Ptr addr; { Ocap *o; Oper op, rop; Class clap; o = (Ocap *) addr; op = (Oper) o->oper_entry; clap = op->u.clap; LOCK (op->omutex, "mpd_kill_inops"); op->seqn++; if ((rop = op->res->oper_list) == op) op->res->oper_list = op->next; else { while (rop->next != op) rop = rop->next; rop->next = op->next; } if (clap->old_in.head != NULL) purge ((Oper) o->oper_entry, &clap->old_in); if (clap->new_in.head != NULL) purge ((Oper) o->oper_entry, &clap->new_in); if (--clap->numops == 0) mpd_delpool (class_pool, (Ptr) clap); op->next = NULL; free_oper (op); UNLOCK (op->omutex, "mpd_kill_inops"); } /* * Kill all resource operations for the named resource. * Purge any pending input invocations. * This must be called only when we possess res->mutex. */ void mpd_kill_resops (res) Rinst res; { Oper op; Class clap; Class clapp; /* deallocate classes that were never used */ clap = res->class_list; while (clap != NULL) { clapp = clap->next; if (clap->numops == 0) { mpd_delpool (class_pool, (Ptr) clap); } clap = clapp; } for (op = res->oper_list; op; op = op->next) { if (op->type == INPUT_OP || op->type == DYNAMIC_OP) { clap = op->u.clap; LOCK (clap->clmutex, "mpd_kill_resops"); LOCK (op->omutex, "mpd_kill_resops"); op->seqn++; if (clap->old_in.head != NULL) purge (op, &clap->old_in); if (clap->new_in.head != NULL) purge (op, &clap->new_in); if (--clap->numops == 0) mpd_delpool (class_pool, (Ptr) clap); UNLOCK (clap->clmutex, "mpd_kill_resops"); UNLOCK (op->omutex, "mpd_kill_resops"); } else if (op->type == SEMA_OP) { LOCK (op->omutex, "mpd_kill_resops"); op->seqn++; mpd_kill_sem (op->u.sema); UNLOCK (op->omutex, "mpd_kill_resops"); } else { /* if (op->type == PROC_OP || op->type == PROC_SEP_OP) */ LOCK (op->omutex, "mpd_kill_resops"); op->seqn++; UNLOCK (op->omutex, "mpd_kill_resops"); } } if (res->oper_list != NULL) free_oper (res->oper_list); } /* * Remove all invocations of the specified operation from an invocation list. * Operations are represented only by index numbers since the machine and * sequence numbers have been checked when the invocation was done. */ static void purge (op, ilist) Oper op; Invq *ilist; { Invb ibp, ribp, last; last = NULL; ibp = (*ilist).head; while (ibp) { ribp = ibp; ibp = ibp->next; if ((Oper) ribp->opc.oper_entry == op) { if (last == NULL) (*ilist).head = ibp; else last->next = ibp; if (ibp != NULL) ibp->last = ribp->last; else (*ilist).tail = last; mpd_rej_inv (ribp); } else last = ribp; } } /* * Returns pointer to the next eligible invocation block for the GC to check in * processing an input statement. Process must have access to the operation * class. If no invocations are available, wait until more arrive. * We do not protect CUR_PROC since we're the only one who can have * a handle on it (at least to manipulate these fields). */ Ptr mpd_get_anyinv (locn, clap) char *locn; Class clap; { Invb ibp; CUR_PROC->locn = locn; /* add locn to CUR_PROC structure */ TRACE ("IN", locn, 0); mpd_check_stk (CUR_STACK); if (CUR_PROC->next_inv == NULL) { if (CUR_PROC->else_leg) return NULL; else mpd_reaccess (clap); } ibp = CUR_PROC->next_inv; CUR_PROC->next_inv = ibp->next; return (Ptr) ibp; } /* * Returns pointer to next eligible invocation of the specified operation. * If none are available, return NULL. */ Ptr mpd_chk_myinv (opc) Ocap opc; { Oper op; Invb ibp; mpd_check_stk (CUR_STACK); op = (Oper) opc.oper_entry; if (op == NULL) /* if noop capability, return NULL */ return NULL; while (ibp = CUR_PROC->next_inv) { CUR_PROC->next_inv = ibp->next; if (op == (Oper) ibp->opc.oper_entry) break; } return (Ptr) ibp; } /* * Receive an invocation. May be remote. */ Ptr mpd_receive (locn, opc, else_present) char *locn; Ocap opc; Bool else_present; { struct ropc_st pkt; Pach ph; Oper op; Class clap; Invb ibp; Sem sp; CUR_PROC->locn = locn; mpd_check_stk (CUR_STACK); TRACE ("IN", locn, 0); op = (Oper) opc.oper_entry; if (op == NULL) { /* if not a real capability */ if (opc.seqn == NOOP_SEQN) /* if noop */ if (else_present) return NULL; /* exec "else" arm */ else { sp = mpd_make_sem (0); P (locn, sp); /* else hang forever */ } else if (opc.seqn == NULL_SEQN) mpd_loc_abort (locn, "null operation capability"); else mpd_loc_abort (locn, "invalid operation capability"); } if (opc.vm != mpd_my_vm) { /* if remote */ if (opc.vm <= 0 || opc.vm > MAX_VM || !mpd_exec_up) mpd_loc_abort (locn, "invalid operation capability"); pkt.oc = opc; pkt.elseflag = else_present; ph = (Pach) &pkt; ph = mpd_remote (opc.vm, REQ_RECEIVE, ph, sizeof (pkt)); if (((Pach) ph)->size > sizeof (pkt)){ /* if bigger, got invocation */ return (Ptr) ph; } else { mpd_free ((Ptr) ph); /* no invocation */ return NULL; /* execute "else" arm */ } } /* The op is local. Be sure it's still valid */ op = (Oper) opc.oper_entry; clap = op->u.clap; if (opc.seqn != op->seqn) mpd_loc_abort (locn, "op no longer exists"); if (op->type != INPUT_OP && op->type != DYNAMIC_OP) mpd_loc_abort (locn, "cannot input from a proc operation"); /* Get and return the next invocation of the desired op. */ mpd_iaccess (clap, else_present); for (;;) { while (ibp = CUR_PROC->next_inv) { CUR_PROC->next_inv = ibp->next; if (opc.oper_entry == ibp->opc.oper_entry) { mpd_rm_iop (locn, (Ptr) clap, ibp); return (Ptr) ibp; } } if (else_present) { mpd_rm_iop (locn, (Ptr) clap, (Invb) 0); return NULL; } else mpd_reaccess (clap); } /*NOTREACHED*/ } /* * Create an operation to act as a semaphore (i.e., a non-exported, * parameterless, operation in its own class. This is an optimization. */ Ptr mpd_make_semop (locn) char *locn; { Oper op; Sem sp; mpd_check_stk (CUR_STACK); op = (Oper) mpd_addpool (oper_pool); LOCK (op->omutex, "mpd_make_semop"); op->type = SEMA_OP; op->u.sema = sp = mpd_make_sem (0); TRACE ("CREATES", locn, sp); op->next = CUR_RES->oper_list; UNLOCK (op->omutex, "mpd_make_semop"); CUR_RES->oper_list = op; /* no protection on CUR_RES needed */ return (Ptr) sp; } /* * Initialize RTS operation table. */ void mpd_init_oper () { oper_pool = mpd_makepool ("operations", sizeof (struct oper_st), mpd_max_operations, init_oper, re_init_oper); mpd_no_ocap.seqn = NOOP_SEQN; /* other fields zero */ mpd_nu_ocap.seqn = NULL_SEQN; /* other fields zero */ } static void init_oper (op) Oper op; { static unsigned short sn = INIT_SEQ_OP; /* called under oper_pool lock */ op->seqn = sn++; op->type = END_OP; INIT_LOCK (op->omutex, "op->omutex"); } /*ARGSUSED*/ /*(under MultiMPD they are)*/ static void re_init_oper (op) Oper op; { RESET_LOCK (op->omutex); } /* * Return a non-empty list of operation table entries to the free list. * The list that this is attached to must be protected a mutex * that the caller holds. */ static void free_oper (op) Oper op; { Oper opp; for (opp = op; opp->next != NULL; opp = opp->next) mpd_delpool (oper_pool, (Ptr) opp); } /* * Initialize the operation class table. */ void mpd_init_class () { class_pool = mpd_makepool ("operation classes", sizeof (struct class_st), mpd_max_classes, init_class, re_init_class); } static void init_class (c) struct class_st *c; { c->numops = 0; INIT_LOCK (c->clmutex, "class->clmutex"); } /*ARGSUSED*/ /*(under MultiMPD they are)*/ static void re_init_class (c) struct class_st *c; { RESET_LOCK (c->clmutex); } /* * Give the GC a new operation class. */ Ptr mpd_make_class () { Class clap; mpd_check_stk (CUR_STACK); clap = (Class) mpd_addpool (class_pool); clap->inuse = FALSE; clap->old_in.head = clap->new_in.head = NULL; clap->old_in.tail = clap->new_in.tail = NULL; clap->old_pr.head = clap->new_pr.head = NULL; clap->old_pr.tail = clap->new_pr.tail = NULL; clap->else_pr = NULL; clap->else_tailpr = NULL; clap->next = CUR_RES->class_list; CUR_RES->class_list = clap; return (Ptr) clap; } /* * Return number of pending invocations for an input operation. * * The count may change, but we assume that it changes atomically, * so we don't lock it. */ int mpd_query_iop (locn, o) char *locn; Ocap o; { int count; Oper op; Pach ph; union { struct ropc_st r; struct num_st n; } pkt; mpd_check_stk (CUR_STACK); op = (Oper) o.oper_entry; if (op == NULL) { /* if not a real capability */ if (o.seqn == NOOP_SEQN) return 0; /* count 0 for noop */ else if (o.seqn == NULL_SEQN) mpd_loc_abort (locn, "null operation capability"); else mpd_loc_abort (locn, "invalid operation capability"); } if (o.vm == mpd_my_vm) { /* if local */ if (op->type == INPUT_OP || op->type == DYNAMIC_OP) return op->pending; else return 0; /* procs are immediate, so 0 pending */ } /* must be remote. */ if (o.vm <= 0 || o.vm > MAX_VM || !mpd_exec_up) mpd_loc_abort (locn, "invalid operation capability"); pkt.r.oc = o; ph = (Pach) &pkt; ph = mpd_remote (o.vm, REQ_COUNT, ph, sizeof (pkt)); count = ((struct num_st *) ph)->num; mpd_free ((Ptr) ph); return count; }