/*  remote.c -- remote request processing  */

#include "rts.h"

static void contact ();

static Mutex remote_mutex;	  /* protect started, waiting[] in contact()*/



/*
 *  Initialize free list of pending remote request message descriptors.
 */
void
mpd_init_rem ()
{
    INIT_LOCK (remote_mutex, "known_mutex");
}



/*
 *  Send a request to a remote machine and wait for the reply message.
 *  Return pointer to reply packet, or NULL if there is none.
 *
 *  This is never called with a rmutex held.
 */
Pach
mpd_remote (dest, type, ph, size)
Vcap dest;
enum ms_type type;
Pach ph;
int size;
{
    struct remd_st rem;
    Invb ibp;

    if (! (mpd_net_known ((int) dest)))
	contact ((int) dest);		/* establish contact if none yet made */

    ph->priority = CUR_PROC->priority;

    ibp = (Invb) ph;
    if (type == REQ_INVOKE && (ibp->replied || ibp->type == REM_COCALL_IN)) {

	/* sends and forwards are not ack'd, and we don't wait on a cocall */
	mpd_net_send (dest, type, ph, size);
	return NULL;

    } else {

	/* for all other messages, await acknowledgement */

	ph->rem = &rem;
	rem.wait = mpd_make_sem (0);
	
	mpd_net_send (dest, type, ph, size);
	P ((char *) NULL, rem.wait);		/* wait for reply */
	mpd_kill_sem (rem.wait);

	return rem.reply;
    }
}



/*
 *  Service a request to establish a connection.
 *  Executes as a separate process.
 */
void
mpd_rmt_callme (ph)
Pach ph;
{
    int n = ((struct num_st *) ph) ->num;
    contact (n);
    mpd_net_send (n, ACK_CALLME, ph, PACH_SZ);
    mpd_free ((Ptr) ph);
    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Service a request to create a resource from a remote machine.
 *  Executes as separate process.
 */
void
mpd_rmt_create (ph)
Pach ph;
{
    int size;
    struct crb_st *crbp;
    struct rres_st *reply;

    crbp = (struct crb_st *) ph;
    size = sizeof (struct rres_st) - sizeof (Rcap) + crbp->rc_size;
    reply = (struct rres_st *) mpd_alc (-size, 1);

    crbp->rcp = & reply->rc;
    mpd_create_res (crbp);

    reply->ph.rem    = ph->rem;
    mpd_net_send (ph->origin, ACK_CREATE, &reply->ph, size);

    mpd_free ((Ptr) reply);
    /* ph freed when resource destroyed */

    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Service a request to query a pending op count from a remote machine.
 *  Executes as a separate process.
 */
void
mpd_rmt_query (ph)
Pach ph;
{
    ((struct num_st *) ph)->num = 
	mpd_query_iop ((char *) NULL, ((struct ropc_st *) ph)->oc);
    mpd_net_send (ph->origin, ACK_COUNT, ph, ph->size);
    mpd_free ((Ptr) ph);
    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Service a request to destroy an op from a remote machine.
 *  Executes as separate process.
 */
void
mpd_rmt_destop (ph)
Pach ph;
{
    mpd_dest_op ((char *) NULL, ((struct ropc_st *) ph)->oc);
    mpd_net_send (ph->origin, ACK_DESTOP, ph, ph->size);
    mpd_free ((Ptr) ph);
    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Service a request to destroy a resource from a remote machine.
 *  Executes as separate process.
 */
void
mpd_rmt_destroy (ph)
Pach ph;
{
    mpd_destroy ((char *) NULL, ((struct rres_st *) ph)->rc);
    mpd_net_send (ph->origin, ACK_DESTROY, ph, ph->size);
    mpd_free ((Ptr) ph);
    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Service a request from mpdx to destroy this virtual machine.
 *  Executes as a separate process.
 */
void
mpd_rmt_destvm (ph)
Pach ph;
{
    mpd_dest_all ();		/* destroy all non-global resources */
    mpd_destroy_globals ();	/* destroy all global resources */

    mpd_net_send (ph->origin, ACK_DESTVM, ph, ph->size);  /* ack destroy */
    mpd_net_send (MPDX_VM, MSG_IDLE,			/* send msg counts */
	&mpd_msg_counts.ph, sizeof (mpd_msg_counts));
    EXIT (0);						/* kill self */
}



/*
 *  Service a request to receive an operation from a remote machine.
 *  Executes as separate process.
 */
void
mpd_rmt_rcv (ph)
Pach ph;
{
    Sem sp;
    Invb ibp, *ret;
    struct ropc_st *p;
    struct remd_st rem;

    p = (struct ropc_st *) ph;
    ibp = (Invb) mpd_receive ((char *) 0, p->oc, p->elseflag);

    if (ibp == NULL) {
	/*
	 * No invocation available. Return original packet.
	 */
	mpd_net_send (ph->origin, ACK_RECEIVE, ph, ph->size);
	mpd_free ((Ptr) ph);
	mpd_kill (CUR_PROC, (Rinst) NULL);
    }

    /*
     * Got an invocation.  If it needs no reply, send it and disappear.
     */
    if (ibp->discard) {
	ibp->ph.rem = ph->rem;
	mpd_net_send (ph->origin, ACK_RECEIVE, (Pach) ibp, ibp->ph.size);
	mpd_free ((Ptr) ibp);
	mpd_free ((Ptr) ph);
	mpd_kill (CUR_PROC, (Rinst) NULL);
    }

    /*
     * The invocation needs a reply.  Send MSG_RCVCALL and wait.
     */
    rem.wait = mpd_make_sem (0);		/* set up reply block */
    ibp->ph.rem = &rem;			/* put addr in invocation block */
    ibp->next = (Invb) ph->rem;		/* pass origl reply addr via "next" */
    sp = ibp->wait;			/* invoker's semaphore */
    ret = ibp->ibpret;			/* invoker's ibp pointer */

    mpd_net_send (ph->origin, MSG_RCVCALL, (Pach) ibp, ibp->ph.size);
    mpd_free ((Ptr) ph);
    mpd_free ((Ptr) ibp);

    P ((char *) NULL, rem.wait);	/* wait for reply */
    mpd_kill_sem (rem.wait);

    ph = rem.reply;			/* get reply address */
    ibp = (Invb) ph;

    if (ret)
	*ret = ibp;			/* tell invoker new ibp address */
    V (sp);				/* awaken invoker */

    mpd_kill (CUR_PROC, (Rinst) NULL);	/* kill self */
}



/*
 *  Service a MSG_RCVCALL received in response to a REQ_RECEIVE that found
 *  a call needing a reply.  Reply to the RECEIVE requestor and pass it the
 *  invocation block; wait for the call to reply; then pass back the reply
 *  to the original VM.  Executes as a separate process.
 */
void
mpd_rcv_call (ph)
Pach ph;
{
    Invb ibp;
    Remd r;

    ibp = (Invb) ph;
    ibp->ibpret = &ibp;			/* set to update ibp if block moves */
    ibp->wait = mpd_make_sem (0);	/* sem will tell us when rcv done */

    r = (Remd) ibp->next;		/* original rem passed via "next" */
    r->reply = ph;			/* pass ibp addr back to receiver */
    V (r->wait);			/* start it up */

    P ((char *) NULL, ibp->wait);	/* wait for reply */
    ph = (Pach) ibp;			/* inv block may have moved */
    mpd_net_send (ph->origin, ACK_INVOKE, ph, ph->size);
					/* send reply */
    mpd_kill_sem (ibp->wait);
    mpd_free ((Ptr) ibp);
    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Service a request to invoke an operation from a remote machine.
 *  Executes as separate process.
 */
void
mpd_rmt_invk (ph)
Pach ph;
{
    Invb ibp = (Invb) ph;
    CUR_PROC->res = CUR_RES = ((Oper) ibp->opc.oper_entry) -> res;
    ibp = (Invb) mpd_invoke ((char *) 0, ibp);	/* ibp can move if forwarded */
    ph = (Pach) ibp;
    if (ibp->type != SEND_IN && ibp->type != COSEND_IN) {
	mpd_net_send (ph->origin, ACK_INVOKE, ph, ph->size);
	mpd_free ((Ptr) ph);
    }
    CUR_PROC->res = CUR_RES = NULL;
    mpd_kill (CUR_PROC, (Rinst) NULL);
}



/*
 *  Establish contact with machine n, ensuring that only one connection is
 *  made.  A higher numbered machine never calls mpd_connect to a lower machine,
 *  but instead asks the lower machine to connect to it, by passing a message
 *  through mpdx.
 *
 *  This routine is never called with a mutex held.  It is only called
 *  from mpd_remote and mpd_rmt_callme in this file, and none of the
 *  routines that call either of these [mpd_invoke, mpd_forward,
 *  mpd_net_interface, mpd_create_res, mpd_destroy, mpd_locate, mpd_crevm,
 *  and mpd_destvm] hold a lock while calling them.  Thus, we can make
 *  remote_mutex at a lower mutex class (acquired first) than the
 *  queue mutex.
 */
static void
contact (n)
int n;
{
    static Bool started[MAX_VM];	/* connection requested for each vm? */
    static Procq waiting[MAX_VM];	/* processes waiting for each vm */
    struct num_st npk;			/* actual message packet */
    Pach ph;				/* packet header pointer */

    if (n == mpd_my_vm)
	mpd_malf ("trying to connect to self");

    /* connecting to a lower machine is simple; ask it to do the work */
    /* (no problem if this happens more than once) */
    if (n < (int) mpd_my_vm)  {		/* cast prevents Sun acc complaints */
	npk.num = n;
	npk.ph.priority = 0;
	ph = mpd_remote (MPDX_VM, REQ_CALLME, (Pach) &npk, sizeof (npk));
	return;
    }

    /* get interlock and check again that we're not connected */
    LOCK (remote_mutex, "contact");
    if (mpd_net_known (n)) {
	UNLOCK (remote_mutex, "contact");
	return;
    }

    /* if we've already requested a connection, just wait until it's complete */
    if (started[n]) {

	LOCK_QUEUE ("contact");		/* protect block-scheduler */
	block (&waiting[n]);		/* put self on wait list */
	UNLOCK (remote_mutex, "contact");
	mpd_scheduler ();		/* block */
	return;
    }

    /* must be the first time. set flag, then release interlock */
    started[n] = TRUE;
    /*  release lock for now -- we may block in mpd_remote.  This would not
     *  seem to hurt anything, but it is not a good idea to let threads
     *  block while they hold RTS mutexes.  */
    UNLOCK (remote_mutex, "contact");

    /* establish the connection */
    npk.num = n;
    ph = mpd_remote (MPDX_VM, REQ_FINDVM, (Pach) &npk, sizeof (npk));
    mpd_net_connect (n, ((struct saddr_st *) ph) -> addr);

    /* release everybody else who may now be waiting */
    LOCK (remote_mutex, "contact");
    while (waiting[n].head != NULL)
	awaken (waiting[n]);		/* unblock everybody */
    UNLOCK (remote_mutex, "contact");
}


syntax highlighted by Code2HTML, v. 0.9.1