/*  invoke.c -- invocation and proc termination routines  */

#include "rts.h"

static void op_spawn ();
static void semarray ();



/*
 *  Invoke a proc or input operation by either call or send.
 */
Ptr
mpd_invoke (locn, ibp)
char *locn;
Invb ibp;
{
    Oper op;
    Pach ph;
    Sem ibpsem;
    Invb newibp;

    if (locn)
	CUR_PROC->locn = locn;		/* remember source location */
    mpd_check_stk (CUR_STACK);
    DEBUG (D_INVOKE, "%-12.12s invoke%ld %08lX", CUR_PROC->pname,ibp->type,ibp);

    ibp->ph.priority = CUR_PROC->priority;  /* copy invokers's priority */

    switch (ibp->type) {

	case CALL_IN:
	case COCALL_IN:
	case REM_COCALL_IN:
	    TRACE ("CALL", locn, 0);
	    ibp->replied = FALSE;	/* have not replied, will need to */
	    ibp->discard = FALSE;	/* caller needs the ibp back */
	    break;

	case COSEND_IN:
	    mpd_co_send (ibp);		/* copy block */
	    ibp->type = SEND_IN;	/* convert to SEND */
	    /*FALLTHROUGH*/
	case SEND_IN:
	    TRACE ("SEND", locn, 0);
	    ibp->replied = TRUE;	/* that is, needs no further reply */
	    ibp->discard = TRUE;	/* discard ibp when op is done */
	    break;

	default:
	    mpd_malf ("unexpected ibp type in invoke");
    }

    ibp->forwarded = FALSE;		/* this was not forwarded */
    ibp->wait = NULL;			/* ensure null so can be tested */
    ibp->ibpret = NULL;			/* will be set if needed */
    if (locn) {
	ibp->invoker = CUR_PROC;	/* save the invoker's proc id */
	ibp->forwarder = CUR_PROC;	/* save the forwarder's proc id */
    }

    /*
     *	Check for null or noop capability.
     */
    if (ibp->opc.oper_entry == 0) {
	if (ibp->opc.seqn == NOOP_SEQN) {
	    if (ibp->type == SEND_IN) {
		mpd_free ((Ptr) ibp);
		return NULL;
	    }
	    if (ibp->type == COCALL_IN) {
		/* Simulate the normal actions for a co call. */
		mpd_co_call (ibp);
		mpd_co_call_done (ibp);
	    }
	    return (Ptr) ibp;
	} else
	    mpd_loc_abort (locn, "invalid operation capability");
    }

    if (ibp->opc.vm != mpd_my_vm) {

	if (ibp->opc.vm <= 0 || ibp->opc.vm > MAX_VM || !mpd_exec_up)
	    mpd_loc_abort (locn, "invalid operation capability");

	/* Send invocation request to remote machine.  */

	if (ibp->type == COCALL_IN) {
	    mpd_co_call (ibp);
	    ibp->type = REM_COCALL_IN;
	}

	ph = (Pach) ibp;
	ibp = (Invb) mpd_remote (ibp->opc.vm, REQ_INVOKE, ph, ph->size);
	mpd_free ((Ptr) ph);
	return (Ptr) ibp;
    }

    op = (Oper) ibp->opc.oper_entry;
    if (ibp->opc.seqn != op->seqn)
	mpd_loc_abort (locn,
	    "attempting to invoke operation that no longer exists");

    if (op->type == INPUT_OP || op->type == DYNAMIC_OP)

	switch (ibp->type) {
	    case CALL_IN:
	    case REM_COCALL_IN:
		ibpsem = ibp->wait = mpd_make_sem (0);
		ibp->ibpret = &newibp;
		mpd_invk_iop (ibp, op->u.clap);
		P ((char *) NULL, ibpsem);
		mpd_kill_sem (ibpsem);
		ibp = newibp;
		break;
		
	    case SEND_IN:
		mpd_invk_iop (ibp, op->u.clap);
		break;
		
	    case COCALL_IN:
		mpd_co_call (ibp);
		mpd_invk_iop (ibp, op->u.clap);
		break;
	}

    else				/* PROC_OP or PROC_SEP_OP */
	switch (ibp->type) {

	    case CALL_IN:
	    case REM_COCALL_IN:
		/* Do direct call if proc does not require separate context */
		if (op->type != PROC_SEP_OP) {
		    enum in_type old_itype = CUR_PROC->itype;
		    CUR_PROC->itype = CALL_IN;
		    (*op->u.code) (op->res->crb_addr,
			op->res->rv_base, ibp, RTS_CALL);
		    CUR_PROC->itype = old_itype;
		} else {
		    ibpsem = ibp->wait = mpd_make_sem (0);
		    ibp->ibpret = &newibp;
		    op_spawn (ibp, op, CALL_IN);
		    P ((char *) NULL, ibpsem);
		    mpd_kill_sem (ibpsem);
		    ibp = newibp;
		}
		break;

	    case SEND_IN:
		op_spawn (ibp, op, SEND_IN);
		break;

	    case COCALL_IN:
		mpd_co_call (ibp);
		op_spawn (ibp, op, COCALL_IN);
		break;
	}

    return (Ptr) ibp;
}



/*
 *  Forward the responsibility for a reply (and an argument list) to another op.
 */
Ptr
mpd_forward (locn, obp, ibp)
char *locn;
Invb obp;	/* old invocation block */
Invb ibp;	/* new invocation block */
{
    Oper op;

    mpd_check_stk (CUR_STACK);
    DEBUG (D_INVOKE, "%-12.12s forward %08lX -> %08lX",
	CUR_PROC->pname, obp, ibp);

    ibp->ph = obp->ph;				/* copy packet header */
    ibp->ph.priority = CUR_PROC->priority;	/* but use curr priority */
    ibp->type = obp->type;
    ibp->forwarded = TRUE;
    ibp->forwarder = CUR_PROC;
    ibp->invoker = obp->invoker;
    ibp->co = obp->co;
    ibp->wait = obp->wait;
    ibp->ibpret = obp->ibpret;
    ibp->replied = obp->replied;	/* transfer responsibility for reply */
    ibp->discard = obp->discard;	/* and for disposal */
    obp->replied = TRUE;
    obp->discard = TRUE;
    obp->ph.rem = NULL;
    obp->ibpret = NULL;

    TRACE ("FORWARD", locn, obp->invoker);
    DEBUG (D_INVOKE, "%-12.12s %08lX forward %08lX", CUR_PROC->pname, obp, ibp);

    if (ibp->opc.oper_entry == 0) {
	/*
	 *  Forward to null or noop capability
	 */
	if (ibp->opc.seqn == NOOP_SEQN) {
	    if (ibp->ibpret)
		*ibp->ibpret = ibp;
	    if (!ibp->replied)
		V (ibp->wait);
	    if (ibp->discard)
		mpd_free ((Ptr) ibp);
	    return NULL;
	} else
	    mpd_loc_abort (locn, "attempting to forward to null operation");
    }

    if (ibp->opc.vm != mpd_my_vm) {
	/*
	 * Forward to remote vm
	 */
	if (ibp->opc.vm <= 0 || ibp->opc.vm > MAX_VM || !mpd_exec_up)
	    mpd_loc_abort (locn, "invalid operation capability");
	mpd_remote (ibp->opc.vm, REQ_INVOKE, (Pach) ibp, ibp->ph.size);
	mpd_free ((Ptr) ibp);

    } else {
	/*
	 * Forward locally
	 */
	op = (Oper) ibp->opc.oper_entry;
	if (ibp->opc.seqn != op->seqn)
	    mpd_loc_abort (locn,
		"attempting to forward to operation that no longer exists");

	if (op->type == INPUT_OP || op->type == DYNAMIC_OP)
	    mpd_invk_iop (ibp, op->u.clap);		/* to input op */
	else
	    op_spawn (ibp, op, ibp->type);		/* to proc */
    }

    return NULL;
}



/*
 *  mpd_init_semop(locn, dest, src, ndim) - initialize non-optimized semaphores.
 *
 *  dest is the address of an opcap or array of opcaps.
 *  src  is the address of an integer or array of integers.
 *  ndim is the number of dimensions.
 */
void
mpd_init_semop (locn, dest, src, ndim)
char *locn;
Ptr dest, src;
int ndim;
{
    Invb ibp;
    int n;

    if (ndim > 0) {
	semarray (locn, (Array *) dest, (Array *) src);
	return;
    }

    n = * (Int *) src;				/* get initial value */
    if (n < 0)					/* must be nonnegative */
	mpd_runerr (locn, E_SEMV, n);
    while (--n >= 0) {
	/* construct invocation block */
	ibp = (Invb) mpd_alc (sizeof (struct invb_st), 1);
	ibp->ph.size = sizeof (*ibp);
	ibp->type = SEND_IN;
	ibp->opc = * (Ocap *) dest;
	mpd_invoke (locn, ibp);			/* issue the send */
    }
}

/*
 * semarray (locn, dstp, srcp) -- initialize array and advance pointers.
 */
static void
semarray (locn, dstp, srcp)
char *locn;
Array *dstp, *srcp;
{
    Ocap *d = (Ocap *) ADATA (dstp);
    Int *s = (Int *) ADATA (srcp);
    int i;

    for (i = 0; i < dstp->ndim; i++)
	if (UB (dstp, i) - LB (dstp, i) != UB (srcp, i) - LB (srcp, i))
	    mpd_runerr (locn, E_ASIZ, ((Dim*)(d+1))[i], ((Dim*)(s+1))[i]);

    for (i = mpd_acount (dstp); i--; )
	mpd_init_semop (locn, (Ptr) d++, (Ptr) s++, 0);
}



/*
 *  Create a new process to service a proc operation invocation.
 */
static void
op_spawn (ibp, op, type)
Invb ibp;
Oper op;
enum in_type type;
{
    Proc pr;

    /*  We must grab the rmutex first, since we have to grab the queue
     *  mutex.  We need this since mpd_spawn puts the new proc on
     *  its resource list.  However, a mpd_destroy could try to kill
     *  the new thread before mpd_activate below puts it on the
     *  ready queue.  This would be a problem, since mpd_kill would not
     *  find it there and would abort.
     */

    if (op->res != (Rinst) NULL)
	LOCK (op->res->rmutex, "op_spawn");
    LOCK_QUEUE ("op_spawn");

    pr = mpd_spawn (op->u.code, ibp->ph.priority, op->res, TRUE,
	    (long) op->res->crb_addr, (long) op->res->rv_base,
	    (long) ibp, (long) RTS_CALL);
    pr->ptype = PROC_PR;
    pr->itype = type;
    pr->pname = "[baby proc]";		/* GC will reset */
    mpd_activate (pr);

    UNLOCK_QUEUE ("op_spawn");
    if (op->res != (Rinst) NULL)
	UNLOCK (op->res->rmutex, "op_spawn");
}



/*
 *  Send an early reply to the invoker of an operation.
 *  Copy invocation block so invoker and invokee do not share
 *  the same argument area.  Return pointer to new copy.
 *  Also does replies for initial/final code (indicated by null
 *  ibp).  Copy resource capability when replying in initial.
 */
Invb
mpd_reply (locn, ibp)
char *locn;
Invb ibp;
{
    Ptr src, dest;
    Oper op;
    Invb new_ibp;

    mpd_check_stk (CUR_STACK);

    if (ibp == NULL) {
	/*
	 *  The reply is in initialization or finalization code.
	 *  Act like mpd_finished_init () or mpd_finished_final ().
	 */
	DEBUG (D_INVOKE, "%-12.12s reply   %08lX", CUR_PROC->pname, ibp, 0);
	if (CUR_PROC->ptype == INITIAL_PR) {

	    LOCK (CUR_RES->rmutex, "mpd_reply");
	    if (CUR_RES->status & INIT_REPLY) {
		UNLOCK (CUR_RES->rmutex, "mpd_reply");
		RTS_WARN ("ignoring extra reply in body");
		return NULL;
	    }

	    if ((dest = (Ptr) CUR_RES->rcp) != NULL) {
		src = CUR_RES->rv_base;
		while (CUR_RES->rc_size--)
		    *dest++ = *src++;
	    }

	    CUR_RES->status |= INIT_REPLY;
	    UNLOCK (CUR_RES->rmutex, "mpd_reply");
	    V (CUR_PROC->wait);
	    return NULL;

	} else if (CUR_PROC->ptype == FINAL_PR) {

	    RTS_WARN ("reply in final ignored");
	    return NULL;

	} else
	    mpd_malf ("no ibp for reply");
    }

    /*
     *  It's a normal reply from a proc.
     */
    TRACE ("REPLY", locn, ibp->invoker);

    /*
     *  If no reply is desired (send invocation | already replied) just return.
     */
    if (ibp->replied) {
	DEBUG (D_INVOKE, "%-12.12s reply   %08lX ignored",
	    CUR_PROC->pname, ibp, 0);
	return ibp;
    }

    /*
     *	Create a new ibp and copy the old invocation to the new.
     *  Make it look like a send invocation now that we've replied.
     *  This will get it automatically freed later.
     */
    ibp->replied = TRUE;
    new_ibp = mpd_dup_invb (ibp);
    new_ibp->type = SEND_IN;		/* make it look like a send */
    new_ibp->discard = TRUE;
    new_ibp->wait = NULL;
    new_ibp->ibpret = NULL;
    new_ibp->invoker = CUR_PROC;

    /*  there is no need to lock this op, since we just read something
     *  that will not change.
     */
    DEBUG (D_INVOKE, "%-12.12s reply   %08lX -> %08lX",
	CUR_PROC->pname, ibp, new_ibp);

    op = (Oper) ibp->opc.oper_entry;

    switch (op->type) {
	case INPUT_OP:
	case DYNAMIC_OP:
	case PROC_SEP_OP:
	    switch (ibp->type) {
		case CALL_IN:
		case REM_COCALL_IN:
		    if (ibp->ibpret)
			*ibp->ibpret = ibp;
		    V (ibp->wait);
		    break;
		case COCALL_IN:
		    mpd_co_call_done (ibp);
		    break;
		case SEND_IN:
		    mpd_malf ("reply found a waiting send");
	    }
	    break;
	case PROC_OP:
	    mpd_malf ("unexpected reply in supposedly safe proc");
	default:
	    mpd_malf ("invalid operation type in reply");
    };
    return new_ibp;
}



/*
 *	An input operation has finished.  Clean up.
 */
void
mpd_finished_input (locn, ibp)
char *locn;
Invb ibp;
{
    mpd_check_stk (CUR_STACK);
    DEBUG (D_INVOKE, "%-12.12s fin_inp %08lX", CUR_PROC->pname, ibp, 0);
    TRACE ("NI", locn, ibp->invoker);
    switch (ibp->type) {

	case CALL_IN:
	case REM_COCALL_IN:
	    if (ibp->ibpret)
		*ibp->ibpret = ibp;
	    if (! ibp->replied)
		V (ibp->wait);
	    if (ibp->discard)
		mpd_free ((Ptr) ibp);
	    return;
	
	case SEND_IN:
	    mpd_free ((Ptr) ibp);
	    return;
	
	case COCALL_IN:
	    if (! ibp->replied)
		mpd_co_call_done (ibp);
	    return;
    }
}



/*
 *  A proc operation has finished.  Release the invocation block if appropriate.
 *  If the proc was called, notify the invoker.  Commit suicide.
 */
void
mpd_finished_proc (ibp)
Invb ibp;
{
    Oper op;

    mpd_check_stk (CUR_STACK);
    DEBUG (D_INVOKE, "             fin_prc %08lX", ibp, 0, 0);

    switch (CUR_PROC->itype) {
	case CALL_IN:
	case REM_COCALL_IN:
	    op = (Oper) ibp->opc.oper_entry;
	    if (op->type != PROC_SEP_OP && !ibp->forwarded)
		return;			/* was run in same process */
	    if (ibp->ibpret)
		*ibp->ibpret = ibp;
	    if (! ibp->replied)
		V (ibp->wait);		/* reply now if didn't before */
	    if (ibp->discard)
		mpd_free ((Ptr) ibp);	/* free if unwanted */
	    break;

	case SEND_IN:
	    mpd_free ((Ptr) ibp);
	    break;

	case COCALL_IN:
	    if (! ibp->replied)
		mpd_co_call_done (ibp);
	    break;
    }

    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Reject an invocation because the operation was killed
 *  before the invocation was accepted.
 *  Must be called when we possess the res mutex (this is true
 *  with all present RTS calls).
 */
void
mpd_rej_inv (ibp)
Invb ibp;
{
    Oper op;

    op = (Oper) ibp->opc.oper_entry;
    if (op->type == PROC_OP || op->type == PROC_SEP_OP)
	mpd_malf ("rejecting a proc op");

    switch (ibp->type) {
	case CALL_IN:
	case REM_COCALL_IN:
	    /* indicate rejection in status field */
	    if (ibp->ibpret)
		*ibp->ibpret = ibp;
	    V (ibp->wait);
	    return;

	case SEND_IN:
	    /* we already possess the res rmutex */
	    mpd_locked_free ((Ptr) ibp);
	    return;

	case COCALL_IN:
	    /* indicate rejection in status field */
	    mpd_co_call_done (ibp);
	    return;
    }
}


/*
 *  Duplicate an invocation block and return the address of the copy.
 */
Invb
mpd_dup_invb (ibp)
Invb ibp;
{
    Invb new;
    int n;

    n = ibp->ph.size;
    new = (Invb) mpd_alc (n, 1);
    memcpy ((Ptr) new, (Ptr) ibp, n);
    return new;
}


syntax highlighted by Code2HTML, v. 0.9.1