static char rcsid[] =
"$Id: calc.c,v 1.3 1999/03/16 16:01:43 pvmsrc Exp $";
/*
* PVM version 3.4: Parallel Virtual Machine System
* University of Tennessee, Knoxville TN.
* Oak Ridge National Laboratory, Oak Ridge TN.
* Emory University, Atlanta GA.
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
* (C) 1997 All Rights Reserved
*
* NOTICE
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby granted
* provided that the above copyright notice appear in all copies and
* that both the copyright notice and this permission notice appear in
* supporting documentation.
*
* Neither the Institutions (Emory University, Oak Ridge National
* Laboratory, and University of Tennessee) nor the Authors make any
* representations about the suitability of this software for any
* purpose. This software is provided ``as is'' without express or
* implied warranty.
*
* PVM version 3 was funded in part by the U.S. Department of Energy,
* the National Science Foundation and the State of Tennessee.
*/
/*
* calc.c
*
* Bag of tasks driver for tiled workers.
* Manages idle, busy, failed workers and message routing sockets.
*
* Oct 95 Manchek
*/
#ifdef HASSTDLIB
#include <stdlib.h>
#endif
#include <stdio.h>
#include <math.h>
#include <X11/Xlib.h>
#include <X11/cursorfont.h>
#include <X11/Intrinsic.h>
#include <X11/StringDefs.h>
#include <X11/Shell.h>
#include <X11/Xaw/Label.h>
#include <X11/Xaw/Command.h>
#include <X11/Xaw/Toggle.h>
#include <X11/Xaw/Form.h>
#include <pvm3.h>
#include "../src/bfunc.h"
#include "../src/listmac.h"
#include "myalloc.h"
#include "hostc.h"
#include "imp.h"
#ifndef min
#define min(a,b) ((a)<(b)?(a):(b))
#endif
#ifndef max
#define max(a,b) ((a)>(b)?(a):(b))
#endif
#define JobSend 1 /* send tile to worker */
#define JobReturn 2 /* get image back from worker */
#define AddMessage 3 /* hosts were added */
#define DelMessage 4 /* hosts were deleted */
#define ExitMessage 5 /* worker exited */
#define RouteAddTag 6 /* new route socket opened */
#define RouteDeleteTag 7 /* route socket closed */
#define TILEHEIGHT 10
struct worker {
struct worker *link, *rlink; /* dll of active or idle */
int tid; /* worker tid */
struct job *job; /* number */
XtInputId route; /* X id for route socket */
};
struct job {
struct job *link, *rlink;
int tile; /* tile number */
};
int gotmorehosts();
/***************
** Globals **
** **
***************/
extern struct canvas imCan; /* from xep.c */
extern int dobars; /* from xep.c */
extern char *workerfile; /* from xep.c */
extern int nworkers; /* from xep.c */
int mytid; /* pvm tid */
struct worker *active = 0; /* active workers */
struct worker *idle = 0; /* idle workers */
struct job *todo = 0; /* jobs to be started */
int ntiles = 0; /* number of tiles in frame */
pvminit()
{
if ((mytid = pvm_mytid()) < 0)
exit(1);
pvm_notify(PvmRouteAdd, RouteAddTag, -1, (int *)0);
pvm_setopt(PvmRoute, PvmRouteDirect);
host_init(AddMessage, DelMessage, gotmorehosts, (int (*)())0);
idle = TALLOC(1, struct worker, "worker");
idle->link = idle->rlink = idle;
idle->tid = 0;
idle->job = 0;
active = TALLOC(1, struct worker, "worker");
active->link = active->rlink = active;
active->tid = 0;
active->job = 0;
todo = TALLOC(1, struct job, "job");
todo->link = todo->rlink = todo;
return 0;
}
more_workers()
{
int nh;
int i;
struct hostc *hp;
struct worker *wp;
int tid;
hp = 0;
nh = 0;
while (hp = host_next(hp))
nh++;
nh -= nworkers;
if (nh <= 0)
return 0;
hp = 0;
while (hp = host_next(hp)) {
i = 0;
for (wp = idle->link; wp != idle; wp = wp->link)
if (pvm_tidtohost(wp->tid) == hp->pvmd_tid) {
i = 1;
break;
}
if (i)
continue;
for (wp = active->link; wp != active; wp = wp->link)
if (pvm_tidtohost(wp->tid) == hp->pvmd_tid) {
i = 1;
break;
}
if (i)
continue;
if (pvm_spawn(workerfile, (char**)0, PvmTaskHost, hp->name, 1, &tid)
< 0) {
pvm_exit();
exit(1);
}
if (tid > 0) {
wp = TALLOC(1, struct worker, "worker");
wp->tid = tid;
wp->job = 0;
wp->route = 0;
LISTPUTBEFORE(idle, wp, link, rlink);
pvm_notify(PvmTaskExit, ExitMessage, 1, &tid);
#ifdef DEBUG
fprintf(stderr, "more_workers() new worker 0x%x\n", tid);
#endif
nworkers++;
}
}
setlabel();
return 0;
}
stop_workers()
{
struct worker *wp;
while ((wp = idle->link) != idle) {
#ifdef DEBUG
fprintf(stderr, "stop_workers() killing 0x%x\n", wp->tid);
#endif
pvm_kill(wp->tid);
LISTDELETE(wp, link, rlink);
if (wp->route) {
/*
fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
*/
XtRemoveInput(wp->route);
wp->route = 0;
}
MY_FREE(wp);
}
while ((wp = active->link) != active) {
#ifdef DEBUG
fprintf(stderr, "stop_workers() killing 0x%x\n", wp->tid);
#endif
pvm_kill(wp->tid);
LISTDELETE(wp, link, rlink);
if (wp->route) {
/*
fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
*/
XtRemoveInput(wp->route);
wp->route = 0;
}
MY_FREE(wp);
}
nworkers = 0;
setlabel();
return 0;
}
do_recalc()
{
struct worker *wp;
struct job *jp;
int ht = imCan.cn_ht;
int i;
/* toss anything that's already on the todo list */
while ((jp = todo->link) != todo) {
LISTDELETE(jp, link, rlink);
MY_FREE(jp);
}
/* scrub in-progress jobs */
for (wp = active; (wp = wp->link) != active; )
wp->job->tile = -1;
/* fill it with new tiles */
ntiles = ht / TILEHEIGHT + 1;
for (i = 0; i < ntiles; i += 8) {
jp = TALLOC(1, struct job, "job");
jp->tile = i;
LISTPUTBEFORE(todo, jp, link, rlink);
}
for (i = 4; i < ntiles; i += 8) {
jp = TALLOC(1, struct job, "job");
jp->tile = i;
LISTPUTBEFORE(todo, jp, link, rlink);
}
for (i = 2; i < ntiles; i += 4) {
jp = TALLOC(1, struct job, "job");
jp->tile = i;
LISTPUTBEFORE(todo, jp, link, rlink);
}
for (i = 1; i < ntiles; i += 2) {
jp = TALLOC(1, struct job, "job");
jp->tile = i;
LISTPUTBEFORE(todo, jp, link, rlink);
}
assign_work();
return 0;
}
/* assign_work()
*
* Send off tiles to idle workers.
*/
assign_work()
{
struct job *jp;
struct worker *wp;
double im1 = imCan.cn_im1;
double im2 = imCan.cn_im2 - im1;
int ht = imCan.cn_ht;
int wd = imCan.cn_wd;
double reim[4]; /* tile corner coords */
int wdht[2]; /* tile wd, ht */
int y1, y2;
reim[0] = imCan.cn_re1;
reim[2] = imCan.cn_re2;
wdht[0] = wd;
while (idle->link != idle && todo->link != todo) {
jp = todo->link;
LISTDELETE(jp, link, rlink);
y2 = ((jp->tile + 1) * ht) / ntiles;
y1 = (jp->tile * ht) / ntiles;
reim[1] = im1 + (y1 * im2) / ht;
reim[3] = im1 + (y2 * im2) / ht;
wdht[1] = y2 - y1;
wp = idle->link;
LISTDELETE(wp, link, rlink);
LISTPUTBEFORE(active, wp, link, rlink);
wp->job = jp;
#ifdef DEBUG
fprintf(stderr, "sent job %d to 0x%x: %dx%d %f,%f/%f,%f\n",
jp->tile, wp->tid,
wdht[0], wdht[1], reim[0], reim[1], reim[2], reim[3]);
#endif
pvm_packf("%+ %2lx %2d", PvmDataDefault, reim, wdht);
if (pvm_send(wp->tid, JobSend)) {
pvm_exit();
exit(1);
}
if (dobars)
label_row(y1, y2, wp->tid);
}
/*
* we must check receive here because messages may have arrived
* while we were sending, so the socket will not be ready to
* read to wake us up. a bit sick.
*/
while (pvm_nrecv(-1, -1) > 0)
claim_message();
return 0;
}
claim_message()
{
struct job *jp;
struct worker *wp;
int tag;
int tid; /* id of sender */
int wd = imCan.cn_wd;
int ht = imCan.cn_ht;
double im1 = imCan.cn_im1;
double im2 = imCan.cn_im2 - im1;
double reim[4]; /* tile corner coords */
int wdht[2]; /* tile wd, ht */
int y1, y2; /* tile start, end rows */
int h; /* height of tile */
pvm_bufinfo(pvm_getrbuf(), (int*)0, &tag, &tid);
#ifdef DEBUG
fprintf(stderr, "message %d from 0x%x\n", tag, tid);
#endif
if (tag == AddMessage) {
host_add();
} else if (tag == DelMessage) {
host_delete();
} else if (tag == ExitMessage) {
pvm_upkint(&tid, 1, 1);
for (wp = idle->link; wp != idle; wp = wp->link) {
if (wp->tid == tid) {
LISTDELETE(wp, link, rlink);
if (wp->route) {
/*
fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
*/
XtRemoveInput(wp->route);
wp->route = 0;
}
MY_FREE(wp);
nworkers--;
setlabel();
goto tryagain;
}
}
for (wp = active->link; wp != active; wp = wp->link) {
if (wp->tid == tid) {
int i;
LISTDELETE(wp, link, rlink);
jp = wp->job;
if (jp->tile >= 0) {
LISTPUTAFTER(todo, jp, link, rlink);
y1 = (jp->tile * ht) / ntiles;
y2 = ((jp->tile + 1) * ht) / ntiles;
h = y2 - y1;
for (i = wd * h; i-- > 0; )
*((char*)imCan.cn_dat + y1 * wd + i) = 32;
/*
BZERO((char*)imCan.cn_dat + y1 * wd, wd * h);
*/
repaint_region(&imCan, 0, y1, wd - 1, y2 - 1);
refresh_region(&imCan, 0, y1, wd - 1, y2 - 1);
} else {
MY_FREE(jp);
}
if (wp->route) {
/*
fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
*/
XtRemoveInput(wp->route);
wp->route = 0;
}
MY_FREE(wp);
nworkers--;
setlabel();
/*
assign_work();
*/
goto tryagain;
}
}
tryagain:
gotmorehosts(); /* hope we can start another */
} else if (tag == JobReturn) {
for (wp = active->link; wp != active; wp = wp->link)
if (wp->tid == tid)
break;
if (wp == active) {
fprintf(stderr, "bogus message?\n");
return 0;
}
LISTDELETE(wp, link, rlink);
jp = wp->job;
if (jp->tile >= 0) {
#ifdef DEBUG
fprintf(stderr, "got job %d from 0x%x\n", jp->tile, wp->tid);
#endif
y1 = (jp->tile * ht) / ntiles;
y2 = ((jp->tile + 1) * ht) / ntiles;
h = y2 - y1;
pvm_unpackf("%*c", wd * h, (char*)imCan.cn_dat + y1 * wd);
if (dobars)
label_row(y1, y2, wp->tid);
repaint_region(&imCan, 0, y1, wd - 1, y2 - 1);
refresh_region(&imCan, 0, y1, wd - 1, y2 - 1);
}
MY_FREE(jp);
/* put work server back on free list */
LISTPUTBEFORE(idle, wp, link, rlink);
/* assign more work if available */
assign_work();
} else if (tag == RouteAddTag) {
int tid, fd;
XtInputId xii;
pvm_unpackf("%d %d", &tid, &fd);
fprintf(stderr, "got route add notify tid 0x%x fd %d\n", tid, fd);
for (wp = idle->link; wp != idle; wp = wp->link)
if (wp->tid == tid)
break;
if (wp == idle)
for (wp = active->link; wp != active; wp = wp->link)
if (wp->tid == tid)
break;
if (wp == active) {
fprintf(stderr, "route add notify for worker not mine tid 0x%x\n",
tid);
} else {
addaninputfile(fd, &wp->route);
pvm_notify(PvmRouteDelete, RouteDeleteTag, 1, &tid);
}
} else if (tag == RouteDeleteTag) {
int tid, fd;
XtInputId xii;
pvm_unpackf("%d %d", &tid, &fd);
fprintf(stderr, "got route delete notify tid 0x%x fd %d\n", tid, fd);
for (wp = idle->link; wp != idle; wp = wp->link)
if (wp->tid == tid)
break;
if (wp == idle)
for (wp = active->link; wp != active; wp = wp->link)
if (wp->tid == tid)
break;
if (wp == active) {
fprintf(stderr,
"route delete notify for worker not mine tid 0x%x\n", tid);
} else if (wp->route) {
/*
removeaninputfile(wp->route);
*/
/*
fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
*/
XtRemoveInput(wp->route);
wp->route = 0;
}
}
return 0;
}
int
gotmorehosts()
{
#ifdef DEBUG
fprintf(stderr, "host added\n");
#endif
more_workers();
assign_work();
return 0;
}
syntax highlighted by Code2HTML, v. 0.9.1