/* oper.c -- runtime support of operations */
#include "rts.h"
/* Pools backing this file's tables; created by mpd_init_oper and
 * mpd_init_class respectively. */
static Pool oper_pool; /* pool of operation descriptors */
static Pool class_pool; /* pool of class descriptors */
/* Forward declarations of the file-local helpers defined further down
 * (old-style declarations, so no parameter lists are given). */
static void init_oper (), re_init_oper (), free_oper (), purge ();
static void init_class (), re_init_class ();
static void kill_inop ();
/*
 * make_op (t, f, v, opcp) -- allocate one operation table entry, fill in
 * the capability *opcp for it, and link it onto CUR_RES->oper_list.
 *
 * Used directly for making a local op and indirectly for making a
 * resource op.
 *
 *	t	operation type stored in the new entry
 *	f	name of the entry field that receives v (e.g. u.code, u.clap);
 *		because f is a field name this cannot trivially be a function
 *	v	value stored into field f
 *	opcp	pointer to the Ocap that receives vm, oper_entry and seqn
 *
 * opcp is evaluated exactly once, and the macro expands to a single
 * statement (do ... while (0)), so it must be followed by a semicolon.
 * The trailing-underscore locals are named to avoid capturing identifiers
 * that appear in the arguments.
 *
 * This must be "called" with CUR_RES->rmutex already possessed.
 */
#define make_op(t, f, v, opcp) do { \
	Ocap *mk_cap_ = (opcp); \
	Oper mk_op_ = (Oper) mpd_addpool (oper_pool); \
	\
	mk_cap_->vm = mpd_my_vm; \
	mk_cap_->oper_entry = (Ptr) mk_op_; \
	mk_cap_->seqn = mk_op_->seqn; \
	\
	mk_op_->type = (t); \
	mk_op_->f = (v); \
	mk_op_->pending = 0; \
	\
	mk_op_->res = CUR_RES; \
	mk_op_->next = CUR_RES->oper_list; \
	CUR_RES->oper_list = mk_op_; \
    } while (0)
/*
 * mpd_make_proc (opcp, type, ept) -- add a new resource proc operation.
 * Called during resource initialization.
 *
 *	opcp	capability to fill in; make_op stores the VM number, the
 *		operation table entry and its sequence number there
 *	type	operation type recorded in the new entry
 *	ept	value stored in the entry's u.code field
 *
 * CUR_RES->rmutex is held around make_op, as that macro requires.
 */
void
mpd_make_proc (opcp, type, ept)
Ocap *opcp;
enum op_type type;
Func ept;
{
mpd_check_stk (CUR_STACK);
LOCK (CUR_RES->rmutex, "mpd_make_proc");
make_op (type, u.code, ept, opcp);
UNLOCK (CUR_RES->rmutex, "mpd_make_proc");
}
/*
 * mpd_new_op (locn, clap) -- make a single op for new(op(...)).
 *
 * Creates one DYNAMIC_OP entry whose u.clap field is clap and returns
 * the capability for it by value.  locn is not used here.
 * Both the current resource's mutex and the class mutex are held while
 * the entry is built.
 */
/*ARGSUSED*/
Ocap
mpd_new_op (locn, clap)
char *locn;
Class clap;
{
    Ocap fresh_cap;

    LOCK (CUR_RES->rmutex, "mpd_new_op");
    LOCK (clap->clmutex, "mpd_new_op");

    make_op (DYNAMIC_OP, u.clap, clap, &fresh_cap);

    UNLOCK (CUR_RES->rmutex, "mpd_new_op");
    UNLOCK (clap->clmutex, "mpd_new_op");

    return fresh_cap;
}
/*
* mpd_make_inops (addr, clap, ndim, type)
*
* Make a set of new input operations.
* Called during initialization, and also later for local ops and new([]ops).
* Returns addr.
*/
Ptr
mpd_make_inops (addr, clap, ndim, type)
Ptr addr;
Class clap;
int ndim, type;
{
Array *a;
Ocap *o;
int n;
mpd_check_stk (CUR_STACK);
LOCK (CUR_RES->rmutex, "mpd_make_inops");
LOCK (clap->clmutex, "mpd_make_inops");
if (ndim == 0) {
make_op ((enum op_type) type, u.clap, clap, (Ocap *) addr);
clap->numops += 1;
} else {
a = (Array *) addr;
o = (Ocap *) ADATA (a);
n = mpd_acount (a);
clap->numops += n;
while (n--) {
make_op ((enum op_type) type, u.clap, clap, o);
o++; /* don't use o++ in macro call above! */
}
}
UNLOCK (CUR_RES->rmutex, "mpd_make_inops");
UNLOCK (clap->clmutex, "mpd_make_inops");
return addr;
}
/*
 * mpd_make_arraysem (locn, addr, ndim)
 *
 * Make an array of sem operations: every element of the Array at addr
 * is set to the semaphore returned by mpd_make_semop.
 * Called only during initialization.
 * (local ops and new([]ops) cannot be sems.)
 * Aborts at locn when ndim is zero.
 * Returns addr.
 */
Ptr
mpd_make_arraysem (locn, addr, ndim)
char *locn;
Ptr addr;
int ndim;
{
    Array *arr;
    Sem *slot;
    int left;

    mpd_check_stk (CUR_STACK);
    LOCK (CUR_RES->rmutex, "mpd_make_arraysem");

    if (ndim == 0)
	mpd_loc_abort (locn, "cannot make 0 dimension array of sems");

    arr = (Array *) addr;
    slot = (Sem *) ADATA (arr);
    for (left = mpd_acount (arr); left != 0; left--) {
	*slot = (Sem) mpd_make_semop (locn);
	slot++;
    }

    UNLOCK (CUR_RES->rmutex, "mpd_make_arraysem");
    return addr;
}
/*
 * mpd_dest_array (locn, addr)
 *
 * Destroys every operation in an array of operations by handing each
 * element's capability to mpd_dest_op.
 *
 *	locn	source location passed on for error reporting
 *	addr	the Array whose data area holds the Ocaps
 *
 * All mpd_acount (a) elements are visited, including the last one;
 * an empty array is a no-op.
 */
void
mpd_dest_array (locn, addr)
char *locn;
Ptr addr;
{
    Array *a = (Array *) addr;
    Ocap *o = (Ocap *) ADATA (a);
    int n = mpd_acount (a);

    /* post-decrement: run exactly n times, and not at all when n <= 0 */
    while (n-- > 0)
	mpd_dest_op (locn, *o++);
}
/*
* mpd_dest_op (locn, ocap)
*
* Destroys a single input op which may be remote.
*/
void
mpd_dest_op (locn, o)
char *locn;
Ocap o;
{
struct ropc_st pkt;
Pach ph;
Oper op, rop;
op = (Oper) o.oper_entry;
if (op == NULL) { /* if not a real capability */
if (o.seqn == NOOP_SEQN)
return; /* if noop, nothing to do */
else if (o.seqn == NULL_SEQN)
mpd_loc_abort (locn, "null operation capability");
else
mpd_loc_abort (locn, "invalid operation capability");
}
if (o.vm != mpd_my_vm) { /* if remote */
if (o.vm <= 0 || o.vm > MAX_VM || !mpd_exec_up)
mpd_loc_abort (locn, "invalid operation capability");
pkt.oc = o;
ph = (Pach) &pkt;
ph = mpd_remote (o.vm, REQ_DESTOP, ph, sizeof (pkt));
mpd_free ((Ptr) ph);
return;
}
if (o.seqn != op->seqn)
mpd_loc_abort (locn, "op no longer exists");
if (op->type != DYNAMIC_OP)
mpd_loc_abort (locn, "not a dynamic op");
/* Kill the op, but leave the class; another may be created later */
LOCK (op->res->rmutex, "mpd_dest_op");
LOCK (op->u.clap->clmutex, "mpd_dest_op");
LOCK (op->omutex, "mpd_dest_op");
op->seqn++; /* invalidate entry */
if ((rop = op->res->oper_list) == op)
op->res->oper_list = op->next;
else {
while (rop->next != op) rop = rop->next;
rop->next = op->next;
}
UNLOCK (op->res->rmutex, "mpd_dest_op");
UNLOCK (op->u.clap->clmutex, "mpd_dest_op");
}
/*
 * mpd_kill_inops (addr, ndim)
 *
 * Remove local operations from the operation table.  Purge any pending
 * invocations from the queues.  If a killed operation was the last of
 * its class, free the class as well (done by kill_inop).
 *
 *	addr	a single Ocap when ndim is zero, otherwise an Array of Ocaps
 *	ndim	number of dimensions; zero means "not an array"
 *
 * The resource and class mutexes are taken from the first operation and
 * held across all the kills.  Every element of an array is killed; an
 * empty array returns immediately because it has no first operation to
 * take the mutexes from.
 */
void
mpd_kill_inops (addr, ndim)
Ptr addr;
int ndim;
{
    Array *a;
    Ocap *o;
    Oper op;
    int n;

    mpd_check_stk (CUR_STACK);

    if (ndim == 0) {
	o = (Ocap *) addr;
	n = 1;
    } else {
	a = (Array *) addr;
	o = (Ocap *) ADATA (a);
	n = mpd_acount (a);
	if (n <= 0)
	    return;			/* nothing to kill */
    }

    op = (Oper) o->oper_entry;
    LOCK (op->res->rmutex, "mpd_kill_inops");
    LOCK (op->u.clap->clmutex, "mpd_kill_inops");

    /* post-decrement: exactly n kills, the last element included */
    while (n-- > 0)
	kill_inop ((Ptr) o++);

    /*
     * NOTE(review): op was handed to free_oper by kill_inop above; its
     * res and u.clap fields are still read here to find the mutexes,
     * exactly as before.  Confirm the pool keeps freed entries intact.
     */
    UNLOCK (op->res->rmutex, "mpd_kill_inops");
    UNLOCK (op->u.clap->clmutex, "mpd_kill_inops");
}
static void
kill_inop (addr) /* kill one op */
Ptr addr;
{
Ocap *o;
Oper op, rop;
Class clap;
o = (Ocap *) addr;
op = (Oper) o->oper_entry;
clap = op->u.clap;
LOCK (op->omutex, "mpd_kill_inops");
op->seqn++;
if ((rop = op->res->oper_list) == op)
op->res->oper_list = op->next;
else {
while (rop->next != op) rop = rop->next;
rop->next = op->next;
}
if (clap->old_in.head != NULL)
purge ((Oper) o->oper_entry, &clap->old_in);
if (clap->new_in.head != NULL)
purge ((Oper) o->oper_entry, &clap->new_in);
if (--clap->numops == 0)
mpd_delpool (class_pool, (Ptr) clap);
op->next = NULL;
free_oper (op);
UNLOCK (op->omutex, "mpd_kill_inops");
}
/*
* Kill all resource operations for the named resource.
* Purge any pending input invocations.
* This must be called only when we possess res->mutex.
*/
void
mpd_kill_resops (res)
Rinst res;
{
Oper op;
Class clap;
Class clapp;
/* deallocate classes that were never used */
clap = res->class_list;
while (clap != NULL) {
clapp = clap->next;
if (clap->numops == 0) {
mpd_delpool (class_pool, (Ptr) clap);
}
clap = clapp;
}
for (op = res->oper_list; op; op = op->next) {
if (op->type == INPUT_OP || op->type == DYNAMIC_OP) {
clap = op->u.clap;
LOCK (clap->clmutex, "mpd_kill_resops");
LOCK (op->omutex, "mpd_kill_resops");
op->seqn++;
if (clap->old_in.head != NULL)
purge (op, &clap->old_in);
if (clap->new_in.head != NULL)
purge (op, &clap->new_in);
if (--clap->numops == 0)
mpd_delpool (class_pool, (Ptr) clap);
UNLOCK (clap->clmutex, "mpd_kill_resops");
UNLOCK (op->omutex, "mpd_kill_resops");
} else if (op->type == SEMA_OP) {
LOCK (op->omutex, "mpd_kill_resops");
op->seqn++;
mpd_kill_sem (op->u.sema);
UNLOCK (op->omutex, "mpd_kill_resops");
} else { /* if (op->type == PROC_OP || op->type == PROC_SEP_OP) */
LOCK (op->omutex, "mpd_kill_resops");
op->seqn++;
UNLOCK (op->omutex, "mpd_kill_resops");
}
}
if (res->oper_list != NULL)
free_oper (res->oper_list);
}
/*
* Remove all invocations of the specified operation from an invocation list.
* Operations are represented only by index numbers since the machine and
* sequence numbers have been checked when the invocation was done.
*/
static void
purge (op, ilist)
Oper op;
Invq *ilist;
{
Invb ibp, ribp, last;
last = NULL;
ibp = (*ilist).head;
while (ibp) {
ribp = ibp;
ibp = ibp->next;
if ((Oper) ribp->opc.oper_entry == op) {
if (last == NULL)
(*ilist).head = ibp;
else
last->next = ibp;
if (ibp != NULL)
ibp->last = ribp->last;
else
(*ilist).tail = last;
mpd_rej_inv (ribp);
}
else
last = ribp;
}
}
/*
* Returns pointer to the next eligible invocation block for the GC to check in
* processing an input statement. Process must have access to the operation
* class. If no invocations are available, wait until more arrive.
* We do not protect CUR_PROC since we're the only one who can have
* a handle on it (at least to manipulate these fields).
*/
Ptr
mpd_get_anyinv (locn, clap)
char *locn;
Class clap;
{
Invb ibp;
CUR_PROC->locn = locn; /* add locn to CUR_PROC structure */
TRACE ("IN", locn, 0);
mpd_check_stk (CUR_STACK);
if (CUR_PROC->next_inv == NULL) {
if (CUR_PROC->else_leg)
return NULL;
else
mpd_reaccess (clap);
}
ibp = CUR_PROC->next_inv;
CUR_PROC->next_inv = ibp->next;
return (Ptr) ibp;
}
/*
* Returns pointer to next eligible invocation of the specified operation.
* If none are available, return NULL.
*/
Ptr
mpd_chk_myinv (opc)
Ocap opc;
{
Oper op;
Invb ibp;
mpd_check_stk (CUR_STACK);
op = (Oper) opc.oper_entry;
if (op == NULL) /* if noop capability, return NULL */
return NULL;
while (ibp = CUR_PROC->next_inv) {
CUR_PROC->next_inv = ibp->next;
if (op == (Oper) ibp->opc.oper_entry)
break;
}
return (Ptr) ibp;
}
/*
* Receive an invocation. May be remote.
*/
Ptr
mpd_receive (locn, opc, else_present)
char *locn;
Ocap opc;
Bool else_present;
{
struct ropc_st pkt;
Pach ph;
Oper op;
Class clap;
Invb ibp;
Sem sp;
CUR_PROC->locn = locn;
mpd_check_stk (CUR_STACK);
TRACE ("IN", locn, 0);
op = (Oper) opc.oper_entry;
if (op == NULL) { /* if not a real capability */
if (opc.seqn == NOOP_SEQN) /* if noop */
if (else_present)
return NULL; /* exec "else" arm */
else {
sp = mpd_make_sem (0);
P (locn, sp); /* else hang forever */
}
else if (opc.seqn == NULL_SEQN)
mpd_loc_abort (locn, "null operation capability");
else
mpd_loc_abort (locn, "invalid operation capability");
}
if (opc.vm != mpd_my_vm) { /* if remote */
if (opc.vm <= 0 || opc.vm > MAX_VM || !mpd_exec_up)
mpd_loc_abort (locn, "invalid operation capability");
pkt.oc = opc;
pkt.elseflag = else_present;
ph = (Pach) &pkt;
ph = mpd_remote (opc.vm, REQ_RECEIVE, ph, sizeof (pkt));
if (((Pach) ph)->size > sizeof (pkt)){ /* if bigger, got invocation */
return (Ptr) ph;
} else {
mpd_free ((Ptr) ph); /* no invocation */
return NULL; /* execute "else" arm */
}
}
/* The op is local. Be sure it's still valid */
op = (Oper) opc.oper_entry;
clap = op->u.clap;
if (opc.seqn != op->seqn)
mpd_loc_abort (locn, "op no longer exists");
if (op->type != INPUT_OP && op->type != DYNAMIC_OP)
mpd_loc_abort (locn, "cannot input from a proc operation");
/* Get and return the next invocation of the desired op. */
mpd_iaccess (clap, else_present);
for (;;) {
while (ibp = CUR_PROC->next_inv) {
CUR_PROC->next_inv = ibp->next;
if (opc.oper_entry == ibp->opc.oper_entry) {
mpd_rm_iop (locn, (Ptr) clap, ibp);
return (Ptr) ibp;
}
}
if (else_present) {
mpd_rm_iop (locn, (Ptr) clap, (Invb) 0);
return NULL;
} else
mpd_reaccess (clap);
}
/*NOTREACHED*/
}
/*
* Create an operation to act as a semaphore (i.e., a non-exported,
* parameterless, operation in its own class. This is an optimization.
*/
Ptr
mpd_make_semop (locn)
char *locn;
{
Oper op;
Sem sp;
mpd_check_stk (CUR_STACK);
op = (Oper) mpd_addpool (oper_pool);
LOCK (op->omutex, "mpd_make_semop");
op->type = SEMA_OP;
op->u.sema = sp = mpd_make_sem (0);
TRACE ("CREATES", locn, sp);
op->next = CUR_RES->oper_list;
UNLOCK (op->omutex, "mpd_make_semop");
CUR_RES->oper_list = op; /* no protection on CUR_RES needed */
return (Ptr) sp;
}
/*
 * Initialize RTS operation table: create the pool of operation
 * descriptors and set up the noop and null capabilities.
 */
void
mpd_init_oper ()
{
/* elements are struct oper_st, at most mpd_max_operations of them;
 * init_oper and re_init_oper are handed to the pool
 * (presumably run on first use and on reuse -- confirm in the pool code) */
oper_pool = mpd_makepool ("operations", sizeof (struct oper_st),
mpd_max_operations, init_oper, re_init_oper);
/* these two capabilities are told apart by sequence number alone */
mpd_no_ocap.seqn = NOOP_SEQN; /* other fields zero */
mpd_nu_ocap.seqn = NULL_SEQN; /* other fields zero */
}
/*
 * init_oper (op) -- initializer passed to mpd_makepool for oper_pool.
 *
 * Gives the entry the next sequence number from a counter that starts
 * at INIT_SEQ_OP, marks its type END_OP, and initializes its mutex.
 */
static void
init_oper (op)
Oper op;
{
    static unsigned short next_seqn = INIT_SEQ_OP;

    /* called under oper_pool lock */
    op->seqn = next_seqn;
    next_seqn++;
    op->type = END_OP;
    INIT_LOCK (op->omutex, "op->omutex");
}
/*
 * re_init_oper (op) -- re-initializer passed to mpd_makepool for oper_pool.
 * Only resets the entry's mutex; the sequence number and the other
 * fields are left as they were.
 */
/*ARGSUSED*/ /*(under MultiMPD they are)*/
static void
re_init_oper (op)
Oper op;
{
RESET_LOCK (op->omutex);
}
/*
* Return a non-empty list of operation table entries to the free list.
* The list that this is attached to must be protected a mutex
* that the caller holds.
*/
static void
free_oper (op)
Oper op;
{
Oper opp;
for (opp = op; opp->next != NULL; opp = opp->next)
mpd_delpool (oper_pool, (Ptr) opp);
}
/*
 * Initialize the operation class table: create the pool of class
 * descriptors (struct class_st, at most mpd_max_classes of them) with
 * init_class and re_init_class as its initializer callbacks.
 */
void
mpd_init_class ()
{
class_pool = mpd_makepool ("operation classes", sizeof (struct class_st),
mpd_max_classes, init_class, re_init_class);
}
/*
 * init_class (c) -- initializer passed to mpd_makepool for class_pool.
 * Starts the class with no operations and initializes its mutex.
 */
static void
init_class (c)
struct class_st *c;
{
c->numops = 0;
INIT_LOCK (c->clmutex, "class->clmutex");
}
/*
 * re_init_class (c) -- re-initializer passed to mpd_makepool for class_pool.
 * Only resets the class mutex; numops is not touched here.
 */
/*ARGSUSED*/ /*(under MultiMPD they are)*/
static void
re_init_class (c)
struct class_st *c;
{
RESET_LOCK (c->clmutex);
}
/*
* Give the GC a new operation class.
*/
Ptr
mpd_make_class ()
{
Class clap;
mpd_check_stk (CUR_STACK);
clap = (Class) mpd_addpool (class_pool);
clap->inuse = FALSE;
clap->old_in.head = clap->new_in.head = NULL;
clap->old_in.tail = clap->new_in.tail = NULL;
clap->old_pr.head = clap->new_pr.head = NULL;
clap->old_pr.tail = clap->new_pr.tail = NULL;
clap->else_pr = NULL;
clap->else_tailpr = NULL;
clap->next = CUR_RES->class_list;
CUR_RES->class_list = clap;
return (Ptr) clap;
}
/*
* Return number of pending invocations for an input operation.
*
* The count may change, but we assume that it changes atomically,
* so we don't lock it.
*/
int
mpd_query_iop (locn, o)
char *locn;
Ocap o;
{
int count;
Oper op;
Pach ph;
union {
struct ropc_st r;
struct num_st n;
} pkt;
mpd_check_stk (CUR_STACK);
op = (Oper) o.oper_entry;
if (op == NULL) { /* if not a real capability */
if (o.seqn == NOOP_SEQN)
return 0; /* count 0 for noop */
else if (o.seqn == NULL_SEQN)
mpd_loc_abort (locn, "null operation capability");
else
mpd_loc_abort (locn, "invalid operation capability");
}
if (o.vm == mpd_my_vm) { /* if local */
if (op->type == INPUT_OP || op->type == DYNAMIC_OP)
return op->pending;
else
return 0; /* procs are immediate, so 0 pending */
}
/* must be remote. */
if (o.vm <= 0 || o.vm > MAX_VM || !mpd_exec_up)
mpd_loc_abort (locn, "invalid operation capability");
pkt.r.oc = o;
ph = (Pach) &pkt;
ph = mpd_remote (o.vm, REQ_COUNT, ph, sizeof (pkt));
count = ((struct num_st *) ph)->num;
mpd_free ((Ptr) ph);
return count;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */