static char rcsid[] =
	"$Id: sendrecv.c,v 1.4 1999/03/12 20:22:19 pvmsrc Exp $";

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/msg.h>
/* #include <sys/processor.h> */
/* #include <sys/procset.h> */
#include <errno.h>
#include <sys/errno.h>
#include <sys/param.h>
#include <string.h>
#include <unistd.h>

#include "shmd.h"

#include <pvm3.h>

#ifndef SHM_FAILED
#define SHM_FAILED (void*)-1
#endif

/* Globals */
/* used to make this library easier and faster to code as always */

int	 shm_enabled=0;
/* if 0 not attempted yet */
/* if 1 ok */
/* if -1 not ready or unavailable as of yet */

int  mytid=0;	/* so we don't get our own tid every single call */
int  localid=0; /* a shorten version used by the block routines */
int  mymqueue=0;/* my own message queue */

/* this lot is filled in by shm-attach() call. */
/* its mostly read-only info  that is only changed by the pvm_shmd. */

shm_header_info_t* syshdr;		/* pointer to the start of all this mess */
int	 psize;		/* page/block size */
int  nsegs;		/* number of segments to choose from */
int	 maxpages;	/* maximum number of pages in a segment.. */
blockinfo_t* 	blocks[MAXSEGS];	/* pointers to the block info stuff... yeah */
char* segs[MAXSEGS];				/* pointers to actual segments */
char* msgblocks[MAXSEGS];			/* pointers to the actual shm blocks */
									/* fully qualified addresses used */
									/* so you can access directly */



int shmd_pvm_psend(tid, tag, cp, len, dt)

int tid;
int tag;
void *cp;
int len;
int dt;
{

	unsigned pages;
	int i, j, k;
	int remoteid=0;
	int olen;
	msg_info_t msg_info;
	int cc=0;
	int mqid;	/* message queue of the destination */
	int seg;
	int block, c;

/* 	printf("shmd_pvm_psend()\n"); */
#ifdef NAMES
	printf("shmd_pvm_psend(0x%x,%d,%x,%d,%d)\n",
			tid, tag, cp, len, dt);
#endif
#ifdef DEBUG
	printf("shm enabled = %d\n", shm_enabled);
#endif /* DEBUG */

	/* if no shared memory allowed then use pvm_psend() */
	if (shm_enabled<0) return (pvm_psend(tid, tag, cp, len, dt));

#ifdef DEBUG
	printf("shm enabled = %d\n", shm_enabled);
#endif /* DEBUG */
	
	/* Else check to see if we have ever used shared memory */
	/* if not can we ? */

	if (!shm_enabled) {
		shm_attach();		/* get it while its hot or not */
		if (shm_enabled<0) return (pvm_psend(tid, tag, cp, len, dt));
	}	

	/* remote or local */
	if ((mytid&0xFFFF0000)!=(tid&0xFFFF0000)) /* remote then... */
				return (pvm_psend(tid, tag, cp, len, dt));


	/* ok, we are here  because we might be able to... */

	/* ok, is the destination a valid target yet? */
	/* i.e. does it have a valid message queue? */

	mqid = find_tidinfo_id (syshdr, tid);

	if (mqid<0) {
		fprintf(stderr,"TID [0x%x] cannot send to TID [0x%x] as have not found its mqueue yet.\n", mytid, tid); fflush(stderr);
		return (pvm_psend(tid, tag, cp, len, dt));	/* nope */ 
	}

	/* ok, we think we can.. you, me, we both shared memory enabled */
	/* from out of the woods into the wood burner (tm) */


	olen = len;	/* backup of length just in case ;) */

    switch (dt) {

    case PVM_BYTE:
        len *= sizeof(char);
        break;

    case PVM_SHORT:
    case PVM_USHORT:
        len *= sizeof(short);
        break;

    case PVM_INT:
    case PVM_UINT:
        len *= sizeof(int);
        break;

    case PVM_LONG:
    case PVM_ULONG:
        len *= sizeof(long);
        break;

    case PVM_FLOAT:
        len *= sizeof(float);
        break;

    case PVM_CPLX:
        len *= sizeof(float) * 2;
        break;

    case PVM_DOUBLE:
        len *= sizeof(double);
        break;

    case PVM_DCPLX:
        len *= sizeof(double) * 2;
        break;

    case PVM_STR:
        cc = PvmNotImpl;
        break;

    default:
        cc = PvmBadParam;
        break;
    }

	if (cc<0) return (cc); /* data type error */

	/* ok len is now the length of the message in bytes */

	/* short cut here for zero length messages... */

	if (len) {
	/* we now need to know how many pages that is */

	pages = (len / psize);
	if (len % psize) pages++;

#ifdef DEBUG
	fprintf(stderr,"send: len [%d] bytes or [%d] whole pages\n",
			len, pages);
	fflush(stderr);

		fprintf(stderr,"send: pages requested [%d] max in any segment [%d]\n",
			pages, maxpages);
		fflush(stderr);
#endif /* DEBUG */
		
	/* now find a block with that kind of memory available */
	if (pages>maxpages) return (pvm_psend(tid, tag, cp, olen, dt));  /* nope */

	/* ok now really look! */
	remoteid = (tid & 0x0000FFFF);	/* shorten TIDs */

	seg = (remoteid*5) % nsegs;	/* hash into a set segment */
	block = -1;

#ifdef DEBUG
	fprintf(stderr,"Hash localid&[0x%x]*5 %% nsegs [%d] -> [%d]\n",	
		remoteid, nsegs, seg);
	fflush(stderr);
#endif /* DEBUG */

	/* loop round until we find one! */
	for(c=0;c<nsegs;c++) {
#ifdef DEBUG
	fprintf(stderr,"seg[%d] largest block has [%d] pages. Need [%d]\n",
		seg, blocks[seg]->largestblockfree, pages);
	fflush(stderr);
#endif /* DEBUG */
		if (blocks[seg]->largestblockfree >= pages) { /* maybe in luck */
/* 			block = getblocks (blocks[seg], localid, pages); */
			/* try remote id for speed see writen notes */
			block = getblocks (blocks[seg], remoteid, pages);
#ifdef DEBUG
	fprintf(stderr,"Blocks found at [0x%x] [%d] when looking for [%d]\n",
		blocks[seg], block, pages);
	fflush(stderr);
#endif /* DEBUG */

			if (block>=0) break;	/* happy we have the memory */
		} /* there might be space */	
		seg = (seg+1) % nsegs;	/* wrap round if required */
	}
	
	if (block<0) /* no space in at the Inn... */	{	
		fprintf(stderr, "psend() from [0x%x] to [0x%x] of len [%d] bytes out of message buffers?\n", mytid, tid, olen);
		for(j=0;j<nsegs;j++) fprintf(stderr, "pages free [%d]\t", blocks[j]->pagesfree);
		fprintf(stderr, "\nUsing pvm_psend() sockets instead.\n");
		return (pvm_psend(tid, tag, cp, olen, dt));	
				/* glad we copied len into olen.... */
	} 		

	/* ok, we have a reserved block big enought in seg 'seg' at the */
	/* start of block 'block' */

	/* so lets copy data into it and then mqueue it out of here */
#ifdef DEBUG
	printf("Copy into [%x] from [%x] [%d] bytes\n",
		(char*)msgblocks[seg]+(psize*block),
		(char *)cp, len);
#endif

	memcpy ((char*)msgblocks[seg]+(psize*block), (char *)cp, len);


	} /* if ( len != 0 ) */
	else {
		seg = -1; block = -1; pages = 0; 
	}

	/* dirty dead has been done, now to kick the receiver */

	
	/* First make the message */
	msg_info.mtype = (long) tag;
	msg_info.from  = mytid;
    msg_info.seg   = seg;
    msg_info.block = block;
	msg_info.pages = pages;
    msg_info.bytes = len;

	/* now to shift the data out of here! */

	cc = msgsnd (mqid, (void *)&msg_info, (sizeof(msg_info)), 0);
#ifdef DEBUG
	printf("TID [0x%x] sent message via shared memory seg [%d] block [%d] that was [%d]bytes or [%d] pages long to tid [0x%x] signalling on mqueue [%d]=[%d]\n",
			mytid, seg, block, len, pages, tid, mqid, cc);
	fflush(stdout);
#endif /* DEBUG */

/* 	if (cc<0) {  */
	/* i.e. the message was not sent! */
	/* to be done, and its messy... */


	return (cc);		
}


int shmd_pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen)

int tid;
int tag;
void *cp;
int len;
int dt;
int *rtid;
int *rtag;
int *rlen;
{

	unsigned pages;
	int i, j, k;
	int remoteid=0;
	int olen;
	msg_info_t msg_info;
	int cc=0;
	unsigned int stime = 1;	/* initial usleep time */
	int found=0;
	int alen;
	int c;

/* 	printf("shmd_pvm_precv()\n"); */
#ifdef NAMES
	printf("shmd_pvm_precv(%x, %d, %x, %d, %d, %x, %x, %x)\n",
			tid, tag, cp, len, dt, rtid, rtag, rlen);
#endif
	/* if no shared memory allowed then use pvm_precv() */
	if (shm_enabled<0) 
		return (pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen));
	
	
	/* Else check to see if we have ever used shared memory */
	/* if not can we ? */

	if (!shm_enabled) {
		shm_attach();		/* get it while its hot or not */
		if (shm_enabled<0) 
	        return (pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen));
	}	

	/* remote or local */
	if ((mytid&0xFFFF0000)!=(tid&0xFFFF0000)) { /* remote then... */
			fprintf(stderr,"Me [0x%x] and src [0x%x] not on same machine\n",
				mytid, tid); fflush(stderr);
			return (pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen));
	}

	/* ok, we are here  because we might be able to... */

	/* does the sender have shared memory?  */
	/* actually it might have sent and then died why we were busy */
	/* so we do not check whether it has a message queue (left) itself */
	/* 	mqid = find_tidinfo_id (seg0, tid); */

	/* ok, we think we can.. you, me, we both shared memory enabled */
	/* from out of the woods into the wood burner (tm) */


	olen = len;	/* backup of length just in case ;) */

    switch (dt) {

    case PVM_BYTE:
        len *= sizeof(char);
        break;

    case PVM_SHORT:
    case PVM_USHORT:
        len *= sizeof(short);
        break;

    case PVM_INT:
    case PVM_UINT:
        len *= sizeof(int);
        break;

    case PVM_LONG:
    case PVM_ULONG:
        len *= sizeof(long);
        break;

    case PVM_FLOAT:
        len *= sizeof(float);
        break;

    case PVM_CPLX:
        len *= sizeof(float) * 2;
        break;

    case PVM_DOUBLE:
        len *= sizeof(double);
        break;

    case PVM_DCPLX:
        len *= sizeof(double) * 2;
        break;

    case PVM_STR:
        cc = PvmNotImpl;
        break;

    default:
        cc = PvmBadParam;
        break;
    }

	if (cc<0) return (cc);	/* i.e. invalid data type param */

	/* ok len is now the length of the message in bytes */
	/* we now need to know how many pages that is */

	pages = (len / psize);
	if (len % psize) pages++;

#ifdef DEBUG
	fprintf(stderr,"recv: len [%d] bytes or [%d] whole pages\n",
			len, pages);
	fflush(stderr);

		fprintf(stderr,"recv: pages requested [%d] max in any segment [%d]\n",
			pages, maxpages);
		fflush(stderr);
#endif /* DEBUG */
		
	/* paranoid and safe */
	if (pages>maxpages) {
			return (pvm_precv(tid, tag, cp, olen, dt, rtid, rtag, rlen));
	}


	/* ok, only now can we change the tag as mqueue and pvm use different */
	/* ones for a wildcard */
	if (tag==-1) tag = 0;

	found = 0;
	stime =1;
	c = 0;	/* tight loop counter */
	while (!found) {	
		cc = msgrcv (mymqueue, 
		(void *) &msg_info, 
		sizeof(msg_info), 
		(long) tag, 
		0);
/* 		IPC_NOWAIT); */
		if (cc>0) { /* success */
#ifdef DEBUG
			printf("got one from the message cue. Sleep was [%d] on count[%d].\nAsked for src[0x%x] tag[%d] size[%d]bytes\nGot from [0x%x] of tag[%d] size[%d]bytes, pages[%d]\n",
			 stime, c, 
			 tid, tag, len, 
			 msg_info.from, msg_info.mtype, msg_info.bytes, msg_info.pages);
#endif /* DEBUG */
			break; /* out to main routine */
		}

		if (cc==-1) { /* failure */
			if (errno == EIDRM) { /* id is now invalid */
				shm_enabled = -1; 
				mymqueue = -1;
				return (pvm_precv(tid, tag, cp, olen, dt, rtid, rtag, rlen));
			}
/* 			if (errno == ENOMSG) 	 */
		}

		/* tight loop counter c is checked */
		if (c>100) {
		   /* ok, we now probe for a real pvm message and the iterate */ 
		   cc = pvm_probe (tid, tag);
		   if (cc>0) 
				return (pvm_precv(tid, tag, cp, olen, dt, rtid, rtag, rlen)); 
		   }	
		
		/* ok, neither message avaliable under either */
		/* so, sleep and increase the sleep amount each time */

		if (c>200) {
		   usleep (stime);
		   if (stime<500) 
			   stime *= 2;		/* exp increase... */
		}
		c++;	/* tight loop counter */
	}	/* end while not found */

	/* so lets get a copy of the data out of here */
	/* first agree on the data size */
	/* i.e. the smallest of the two.. */

	alen = len;
	if (msg_info.bytes < len) alen = msg_info.bytes;

	if (alen) {

#ifdef DEBUG
	printf("Copy into [%x] from [%x] [%d] bytes\n",
		(char *)cp,
		(char*)(msgblocks[msg_info.seg]+(psize*msg_info.block)),
		alen);
#endif

		memcpy ((char *)cp, 
				(char*)(msgblocks[msg_info.seg]+(psize*msg_info.block)), 
				alen);

		/* dirty dead has been done, now to clean the block info up. */

#ifdef DEBUG
		printf("recv: [0x%x] cleaning up message block from [0x%x] in Seg[%d] start block [%d] of [%d] pages\n",
				mytid, msg_info.from, msg_info.seg, msg_info.block, msg_info.pages);
#endif /* DEBUG */
		freeblock (blocks[msg_info.seg], localid, msg_info.pages, msg_info.block);
	}

/* 	if (rlen) *rlen = (int) alen; */
/* 	if (rtid) *rtid = (int) msg_info.from; */
/* 	if (rtag) *rtag = (int) msg_info.mtype; */
					/* must cast or else long will over write other args */
	
	return (PvmOk);		
}



int shm_attach() {
	int i,id;
	key_t	key, k1, k2;
	int id1, id2;
	struct shmid_ds shmsbuf;
	unsigned size;
	void* 	ptr;
	int*	iptr;
	int 	toview;
	int		last;
	int		shmdtid;
	int 	srbuf, bufid;
	blockinfo_t*	baseptr;
	char*	cptr;
	shm_header_info_t* hptr;
	int		numsegs=0;
	char	hname[MAXHOSTNAMELEN];
	char	shmdname[1024];
	int isitok=0; /* ok or not ? */

	/* make sure we have our tid so we can decide if this local or remote */
	if (!mytid) {
		mytid = pvm_mytid(); 
		localid = (mytid & 0x0000FFFF);	/* shorten TIDs */
		if (mytid<0) /* ops no pvm... */
			return (-1);
	}

	bzero (shmdname, 1024);
	gethostname(hname, MAXHOSTNAMELEN);
	strcat (shmdname, "pvm_shmd:");
	strcat (shmdname, hname);


	srbuf = pvm_setrbuf(0);
	if ( pvm_recvinfo(shmdname, 0, PvmMboxDefault) == PvmNotFound ) {
		fprintf(stderr,"No pvm_shmd\n");	
		exit (-1);
	}	
	pvm_upkint(&shmdtid,1,1);
	pvm_upkint(&id,1,1);	
	pvm_setrbuf(srbuf);


	pvm_initsend(PvmDataDefault);
	pvm_pkint(&mytid,1,1);
	pvm_send (shmdtid, 1);			/* join the shared memory system */

#ifdef DEBUGSTART
	fprintf(stderr,"Joining the system\n"); fflush(stderr);

	fprintf(stderr,"Recv on [0x%x] and tag [0x%x]\n", shmdtid, mytid);
	fflush(stderr);
#endif /* DEBUGSTART */

	bufid = pvm_recv (shmdtid, mytid);	/* i.e. my message no questions */
#ifdef DEBUGSTART
	fprintf(stderr,"Got message back?\n"); fflush(stderr);
#endif /* DEBUGSTART */

	pvm_upkint (&isitok, 1, 1);			/* shm says yeah or nay ? */
#ifdef DEBUGSTART
	fprintf(stderr,"Value [%d]\n", isitok); fflush(stderr);
#endif /* DEBUGSTART */

	/* we do this handshake so that we don't have to spin */
	/* on our own info mqueue appearing in the shm header segment */
	/* once we get this message we know its there ok! */

	if (isitok<0) {
			fprintf(stderr,"pvm_shmd said no shm for [0x%x] ?", mytid);
			shm_enabled = -1;
			return (-1);
	}

      	
	ptr = shmat (id, 0, 0);			/* inital shmat attempt */
	
/* 	if (ptr>0) { */
	if (ptr != SHM_FAILED) {
#ifdef DEBUGSTART
		printf("attached at offset of 0x%lx\n", (long) ptr );
#endif /* DEBUGSTART */
		}
	else {
		perror("shmat:");
		shm_enabled = -1;
		return (-1);
	}

	segs[0] = (char *) ptr;				/* first segment pointer */
	syshdr = (shm_header_info_t *) ptr;	/* system header pointer */
	psize  = getpagesize();				/* ok a system call really */
	nsegs  = syshdr->numsegs;			/* number of segments */

#ifdef DEBUGSTART
	printf("Got syshdr [0x%lx] pagesize [%d] nsegs [%d]\n",
		(long) syshdr, psize, nsegs);
#endif /* DEBUGSTART */

	cptr = (char *) ptr;
/* 	cptr += sizeof (shm_header_info_t);	 */
		/* gets us to the blockinfo */
	/* actually this was a mistake, and a bug of my own making!!! */
	/* the sysheader only tasks around 200 bytes.. the rest is */
	/* the tidlist proper! */
	/* real start of blockinfo is 16KBytes lower.... */
	cptr += (16384);

	baseptr = (blockinfo_t*)cptr;		/* cast and assign */
	
	for(i=0;i<nsegs;i++) { 	/* for each segement (set of blocks) */	
		blocks[i] = baseptr;
#ifdef DEBUGSTART
		printf("Seg[%d] key [0x%x] id [%d] blockinfo [0x%lx]\n", i, 
				blocks[i]->segkey, blocks[i]->segid, (long) blocks[i]);
#endif /* DEBUGSTART */
		if (i) {	
		  /* we have already attached the first segment just the rest to go */
		   segs[i] = shmat (baseptr->segid, 0, 0);
/* 		   if (segs[i]<0) { */
		   if (segs[i]==SHM_FAILED) {
			   perror("shmat:");
			   shm_enabled = -1;
			   return (-1);
		   	}
#ifdef DEBUGSTART
		  printf("Seg[%d] attached at [0x%lx]\n", i, (long) segs[i]);
#endif /* DEBUGSTART */
		}
		msgblocks[i] = segs[i]+(baseptr->segoffset);
		cptr = (char *)blocks[i] + blocks[i]->nextoffset;
		baseptr = (blockinfo_t *) cptr; 
					/* point to the next blockinfo_t struct */ 	
	}

#ifdef DEBUGSTART
	printf("Number of segments is [%d]\n", i);
	fflush(stdout);
#endif /* DEBUGSTART */

	/* Now to find the maximum pages allowed for any single message.. */
	/* this should change when I allow messages across segments. */

#ifdef DEBUGSTART
	for(i=0;i<nsegs;i++)
		printf("[%d] has [%d] pages of memory starting at [0x%lx]. Whole pages starts at [0x%lx]\n",
			i, blocks[i]->npages, (long) msgblocks[i], (long) segs[i]);
#endif /* DEBUGSTART */

	maxpages = blocks[0]->npages;	
	for(i=1;i<nsegs;i++)
		if (blocks[i]->npages > maxpages) maxpages = blocks[i]->npages;

	
	/* now to find my own message queue and we are away */
	mymqueue = find_tidinfo_id (syshdr, mytid);		
	if (mymqueue < 0) {
		shm_enabled = -1;
		return (-1);
	}

#ifdef DEBUGSTART
	printf("mqueue: TID [0x%x] queue id [%d]\n", mytid, mymqueue);
#endif /* DEBUGSTART */


	/* if (processor_bind(P_PID, P_MYID, 0, NULL) == -1)
		perror("processor_bind"); */


	/* no further objections my honour */

	shm_enabled = 1;	/* i.e. all is ok and ready for SHM action */
	return (0);
}



syntax highlighted by Code2HTML, v. 0.9.1