/*  invoke.c -- invocation and proc termination routines  */

#include "rts.h"

static void op_spawn ();
static void semarray ();



/*
 *  mpd_invoke (locn, ibp) -- invoke a proc or input operation, by call or send.
 *
 *  locn  source location of the invocation; when non-null it is recorded in
 *        the current process, and the current process is recorded in the
 *        block as both invoker and forwarder.
 *  ibp   invocation block; its type selects call, send, or co-invocation.
 *
 *  Returns NULL for a send to a noop capability (the block is freed),
 *  the result of mpd_remote () for an operation on another vm, and otherwise
 *  the invocation block.  For a call serviced by another process, that is
 *  the block handed back through ibp->ibpret once the wait semaphore is
 *  signalled.
 */
Ptr
mpd_invoke (locn, ibp)
char *locn;
Invb ibp;
{
    Oper target;                /* operation named by the capability */
    Pach request;               /* outgoing block for a remote invocation */
    Sem done_sem;               /* signalled when a call has been answered */
    Invb returned;              /* block handed back through ibp->ibpret */
    int to_input;               /* target is an input or dynamic operation */

    if (locn)
        CUR_PROC->locn = locn;  /* remember source location */

    mpd_check_stk (CUR_STACK);
    DEBUG (D_INVOKE, "%-12.12s invoke%ld %08lX",
        CUR_PROC->pname, ibp->type, ibp);

    /* the invocation runs at the invoker's priority */
    ibp->ph.priority = CUR_PROC->priority;

    switch (ibp->type) {
        case CALL_IN:
        case COCALL_IN:
        case REM_COCALL_IN:
            TRACE ("CALL", locn, 0);
            ibp->replied = FALSE;       /* a reply is still owed */
            ibp->discard = FALSE;       /* caller wants the block back */
            break;
        case COSEND_IN:
            mpd_co_send (ibp);          /* copy block */
            ibp->type = SEND_IN;        /* from here on, an ordinary send */
            /*FALLTHROUGH*/
        case SEND_IN:
            TRACE ("SEND", locn, 0);
            ibp->replied = TRUE;        /* no reply will ever be needed */
            ibp->discard = TRUE;        /* block is freed when op is done */
            break;
        default:
            mpd_malf ("unexpected ibp type in invoke");
    }

    ibp->forwarded = FALSE;             /* a fresh invocation, not a forward */
    ibp->wait = NULL;                   /* null so that it can be tested */
    ibp->ibpret = NULL;                 /* filled in below when needed */
    if (locn) {
        ibp->invoker = CUR_PROC;
        ibp->forwarder = CUR_PROC;
    }

    /*
     *  Null or noop capability: there is no operation to run.
     */
    if (ibp->opc.oper_entry == 0) {
        if (ibp->opc.seqn != NOOP_SEQN) {
            mpd_loc_abort (locn, "invalid operation capability");
        } else {
            switch (ibp->type) {
                case SEND_IN:
                    mpd_free ((Ptr) ibp);
                    return NULL;
                case COCALL_IN:
                    /* go through the usual co-call bookkeeping */
                    mpd_co_call (ibp);
                    mpd_co_call_done (ibp);
                    break;
                default:
                    break;
            }
            return (Ptr) ibp;
        }
    }

    /*
     *  Operation lives on another virtual machine: ship the request there.
     */
    if (ibp->opc.vm != mpd_my_vm) {
        if (ibp->opc.vm <= 0 || ibp->opc.vm > MAX_VM || !mpd_exec_up)
            mpd_loc_abort (locn, "invalid operation capability");

        if (ibp->type == COCALL_IN) {
            mpd_co_call (ibp);
            ibp->type = REM_COCALL_IN;
        }
        request = (Pach) ibp;
        ibp = (Invb) mpd_remote (ibp->opc.vm, REQ_INVOKE, request,
            request->size);
        mpd_free ((Ptr) request);
        return (Ptr) ibp;
    }

    /*
     *  Local operation: make sure the capability is not stale.
     */
    target = (Oper) ibp->opc.oper_entry;
    if (ibp->opc.seqn != target->seqn)
        mpd_loc_abort (locn,
            "attempting to invoke operation that no longer exists");

    /* anything that is not an input/dynamic op is PROC_OP or PROC_SEP_OP */
    to_input = (target->type == INPUT_OP || target->type == DYNAMIC_OP);

    switch (ibp->type) {

        case CALL_IN:
        case REM_COCALL_IN:
            if (!to_input && target->type != PROC_SEP_OP) {
                /* proc needs no separate context: call it directly */
                enum in_type saved_itype = CUR_PROC->itype;

                CUR_PROC->itype = CALL_IN;
                (*target->u.code) (target->res->crb_addr,
                    target->res->rv_base, ibp, RTS_CALL);
                CUR_PROC->itype = saved_itype;
                break;
            }
            /* otherwise hand the block over and block until answered */
            done_sem = ibp->wait = mpd_make_sem (0);
            ibp->ibpret = &returned;
            if (to_input)
                mpd_invk_iop (ibp, target->u.clap);
            else
                op_spawn (ibp, target, CALL_IN);
            P ((char *) NULL, done_sem);
            mpd_kill_sem (done_sem);
            ibp = returned;
            break;

        case SEND_IN:
            if (to_input)
                mpd_invk_iop (ibp, target->u.clap);
            else
                op_spawn (ibp, target, SEND_IN);
            break;

        case COCALL_IN:
            mpd_co_call (ibp);
            if (to_input)
                mpd_invk_iop (ibp, target->u.clap);
            else
                op_spawn (ibp, target, COCALL_IN);
            break;
    }

    return (Ptr) ibp;
}



/*
 *  Forward the responsibility for a reply (and an argument list) to another op.
*/ Ptr mpd_forward (locn, obp, ibp) char *locn; Invb obp; /* old invocation block */ Invb ibp; /* new invocation block */ { Oper op; mpd_check_stk (CUR_STACK); DEBUG (D_INVOKE, "%-12.12s forward %08lX -> %08lX", CUR_PROC->pname, obp, ibp); ibp->ph = obp->ph; /* copy packet header */ ibp->ph.priority = CUR_PROC->priority; /* but use curr priority */ ibp->type = obp->type; ibp->forwarded = TRUE; ibp->forwarder = CUR_PROC; ibp->invoker = obp->invoker; ibp->co = obp->co; ibp->wait = obp->wait; ibp->ibpret = obp->ibpret; ibp->replied = obp->replied; /* transfer responsibility for reply */ ibp->discard = obp->discard; /* and for disposal */ obp->replied = TRUE; obp->discard = TRUE; obp->ph.rem = NULL; obp->ibpret = NULL; TRACE ("FORWARD", locn, obp->invoker); DEBUG (D_INVOKE, "%-12.12s %08lX forward %08lX", CUR_PROC->pname, obp, ibp); if (ibp->opc.oper_entry == 0) { /* * Forward to null or noop capability */ if (ibp->opc.seqn == NOOP_SEQN) { if (ibp->ibpret) *ibp->ibpret = ibp; if (!ibp->replied) V (ibp->wait); if (ibp->discard) mpd_free ((Ptr) ibp); return NULL; } else mpd_loc_abort (locn, "attempting to forward to null operation"); } if (ibp->opc.vm != mpd_my_vm) { /* * Forward to remote vm */ if (ibp->opc.vm <= 0 || ibp->opc.vm > MAX_VM || !mpd_exec_up) mpd_loc_abort (locn, "invalid operation capability"); mpd_remote (ibp->opc.vm, REQ_INVOKE, (Pach) ibp, ibp->ph.size); mpd_free ((Ptr) ibp); } else { /* * Forward locally */ op = (Oper) ibp->opc.oper_entry; if (ibp->opc.seqn != op->seqn) mpd_loc_abort (locn, "attempting to forward to operation that no longer exists"); if (op->type == INPUT_OP || op->type == DYNAMIC_OP) mpd_invk_iop (ibp, op->u.clap); /* to input op */ else op_spawn (ibp, op, ibp->type); /* to proc */ } return NULL; } /* * mpd_init_semop(locn, dest, src, ndim) - initialize non-optimized semaphores. * * dest is the address of an opcap or array of opcaps. * src is the address of an integer or array of integers. * ndim is the number of dimensions. 
*/ void mpd_init_semop (locn, dest, src, ndim) char *locn; Ptr dest, src; int ndim; { Invb ibp; int n; if (ndim > 0) { semarray (locn, (Array *) dest, (Array *) src); return; } n = * (Int *) src; /* get initial value */ if (n < 0) /* must be nonnegative */ mpd_runerr (locn, E_SEMV, n); while (--n >= 0) { /* construct invocation block */ ibp = (Invb) mpd_alc (sizeof (struct invb_st), 1); ibp->ph.size = sizeof (*ibp); ibp->type = SEND_IN; ibp->opc = * (Ocap *) dest; mpd_invoke (locn, ibp); /* issue the send */ } } /* * semarray (locn, dstp, srcp) -- initialize array and advance pointers. */ static void semarray (locn, dstp, srcp) char *locn; Array *dstp, *srcp; { Ocap *d = (Ocap *) ADATA (dstp); Int *s = (Int *) ADATA (srcp); int i; for (i = 0; i < dstp->ndim; i++) if (UB (dstp, i) - LB (dstp, i) != UB (srcp, i) - LB (srcp, i)) mpd_runerr (locn, E_ASIZ, ((Dim*)(d+1))[i], ((Dim*)(s+1))[i]); for (i = mpd_acount (dstp); i--; ) mpd_init_semop (locn, (Ptr) d++, (Ptr) s++, 0); } /* * Create a new process to service a proc operation invocation. */ static void op_spawn (ibp, op, type) Invb ibp; Oper op; enum in_type type; { Proc pr; /* We must grab the rmutex first, since we have to grab the queue * mutex. We need this since mpd_spawn puts the new proc on * its resource list. However, a mpd_destroy could try to kill * the new thread before mpd_activate below puts it on the * ready queue. This would be a problem, since mpd_kill would not * find it there and would abort. */ if (op->res != (Rinst) NULL) LOCK (op->res->rmutex, "op_spawn"); LOCK_QUEUE ("op_spawn"); pr = mpd_spawn (op->u.code, ibp->ph.priority, op->res, TRUE, (long) op->res->crb_addr, (long) op->res->rv_base, (long) ibp, (long) RTS_CALL); pr->ptype = PROC_PR; pr->itype = type; pr->pname = "[baby proc]"; /* GC will reset */ mpd_activate (pr); UNLOCK_QUEUE ("op_spawn"); if (op->res != (Rinst) NULL) UNLOCK (op->res->rmutex, "op_spawn"); } /* * Send an early reply to the invoker of an operation. 
* Copy invocation block so invoker and invokee do not share * the same argument area. Return pointer to new copy. * Also does replies for initial/final code (indicated by null * ibp). Copy resource capability when replying in initial. */ Invb mpd_reply (locn, ibp) char *locn; Invb ibp; { Ptr src, dest; Oper op; Invb new_ibp; mpd_check_stk (CUR_STACK); if (ibp == NULL) { /* * The reply is in initialization or finalization code. * Act like mpd_finished_init () or mpd_finished_final (). */ DEBUG (D_INVOKE, "%-12.12s reply %08lX", CUR_PROC->pname, ibp, 0); if (CUR_PROC->ptype == INITIAL_PR) { LOCK (CUR_RES->rmutex, "mpd_reply"); if (CUR_RES->status & INIT_REPLY) { UNLOCK (CUR_RES->rmutex, "mpd_reply"); RTS_WARN ("ignoring extra reply in body"); return NULL; } if ((dest = (Ptr) CUR_RES->rcp) != NULL) { src = CUR_RES->rv_base; while (CUR_RES->rc_size--) *dest++ = *src++; } CUR_RES->status |= INIT_REPLY; UNLOCK (CUR_RES->rmutex, "mpd_reply"); V (CUR_PROC->wait); return NULL; } else if (CUR_PROC->ptype == FINAL_PR) { RTS_WARN ("reply in final ignored"); return NULL; } else mpd_malf ("no ibp for reply"); } /* * It's a normal reply from a proc. */ TRACE ("REPLY", locn, ibp->invoker); /* * If no reply is desired (send invocation | already replied) just return. */ if (ibp->replied) { DEBUG (D_INVOKE, "%-12.12s reply %08lX ignored", CUR_PROC->pname, ibp, 0); return ibp; } /* * Create a new ibp and copy the old invocation to the new. * Make it look like a send invocation now that we've replied. * This will get it automatically freed later. */ ibp->replied = TRUE; new_ibp = mpd_dup_invb (ibp); new_ibp->type = SEND_IN; /* make it look like a send */ new_ibp->discard = TRUE; new_ibp->wait = NULL; new_ibp->ibpret = NULL; new_ibp->invoker = CUR_PROC; /* there is no need to lock this op, since we just read something * that will not change. 
*/ DEBUG (D_INVOKE, "%-12.12s reply %08lX -> %08lX", CUR_PROC->pname, ibp, new_ibp); op = (Oper) ibp->opc.oper_entry; switch (op->type) { case INPUT_OP: case DYNAMIC_OP: case PROC_SEP_OP: switch (ibp->type) { case CALL_IN: case REM_COCALL_IN: if (ibp->ibpret) *ibp->ibpret = ibp; V (ibp->wait); break; case COCALL_IN: mpd_co_call_done (ibp); break; case SEND_IN: mpd_malf ("reply found a waiting send"); } break; case PROC_OP: mpd_malf ("unexpected reply in supposedly safe proc"); default: mpd_malf ("invalid operation type in reply"); }; return new_ibp; } /* * An input operation has finished. Clean up. */ void mpd_finished_input (locn, ibp) char *locn; Invb ibp; { mpd_check_stk (CUR_STACK); DEBUG (D_INVOKE, "%-12.12s fin_inp %08lX", CUR_PROC->pname, ibp, 0); TRACE ("NI", locn, ibp->invoker); switch (ibp->type) { case CALL_IN: case REM_COCALL_IN: if (ibp->ibpret) *ibp->ibpret = ibp; if (! ibp->replied) V (ibp->wait); if (ibp->discard) mpd_free ((Ptr) ibp); return; case SEND_IN: mpd_free ((Ptr) ibp); return; case COCALL_IN: if (! ibp->replied) mpd_co_call_done (ibp); return; } } /* * A proc operation has finished. Release the invocation block if appropriate. * If the proc was called, notify the invoker. Commit suicide. */ void mpd_finished_proc (ibp) Invb ibp; { Oper op; mpd_check_stk (CUR_STACK); DEBUG (D_INVOKE, " fin_prc %08lX", ibp, 0, 0); switch (CUR_PROC->itype) { case CALL_IN: case REM_COCALL_IN: op = (Oper) ibp->opc.oper_entry; if (op->type != PROC_SEP_OP && !ibp->forwarded) return; /* was run in same process */ if (ibp->ibpret) *ibp->ibpret = ibp; if (! ibp->replied) V (ibp->wait); /* reply now if didn't before */ if (ibp->discard) mpd_free ((Ptr) ibp); /* free if unwanted */ break; case SEND_IN: mpd_free ((Ptr) ibp); break; case COCALL_IN: if (! ibp->replied) mpd_co_call_done (ibp); break; } mpd_kill (CUR_PROC, (Rinst) NULL); } /* * Reject an invocation because the operation was killed * before the invocation was accepted. 
* Must be called when we possess the res mutex (this is true * with all present RTS calls). */ void mpd_rej_inv (ibp) Invb ibp; { Oper op; op = (Oper) ibp->opc.oper_entry; if (op->type == PROC_OP || op->type == PROC_SEP_OP) mpd_malf ("rejecting a proc op"); switch (ibp->type) { case CALL_IN: case REM_COCALL_IN: /* indicate rejection in status field */ if (ibp->ibpret) *ibp->ibpret = ibp; V (ibp->wait); return; case SEND_IN: /* we already possess the res rmutex */ mpd_locked_free ((Ptr) ibp); return; case COCALL_IN: /* indicate rejection in status field */ mpd_co_call_done (ibp); return; } } /* * Duplicate an invocation block and return the address of the copy. */ Invb mpd_dup_invb (ibp) Invb ibp; { Invb new; int n; n = ibp->ph.size; new = (Invb) mpd_alc (n, 1); memcpy ((Ptr) new, (Ptr) ibp, n); return new; }