/* event.c -- event waiting and handling */
#include "rts.h"
#ifdef __PARAGON__
#include "nx.h"
extern local_message_type; /* message type == ptype of current VM */
#endif
static void update_clock ();
static int wake_sleepers (), iocheck (), fdmerge ();
struct napper { /* napping job information */
struct napper *next; /* next list entry */
Sem sp; /* semaphore to tickle */
int wakeup; /* wakeup time */
};
struct iowait { /* i/o blocked job information */
struct iowait *next; /* next list entry */
Sem sp; /* semaphore to tickle */
fd_set *query; /* where to find set of wanted files */
fd_set *results; /* where to put the results */
enum io_type inout; /* INPUT or OUTPUT wait */
};
enum fdset_op { FDSET_AND, FDSET_OR };
static fd_set zeroset;
/* nap_list and io_list are protected by mpd_queue_mutex. */
static struct napper *nap_list = NULL; /* napping jobs (unordered list) */
static struct iowait *io_list = NULL; /* I/O blocked jobs (in order blocked)*/
static struct iowait *io_tail; /* end of the list */
static struct timeval start; /* system time when we started executing */
static int msclock; /* elapsed real time, in milliseconds */
/*
* mpd_nap_list_empty() and mpd_evio_list_empty() are for idleness checking.
* The caller already holds the queue_mutex.
*/
Bool mpd_nap_list_empty ()
{
return nap_list == NULL;
}
Bool mpd_evio_list_empty ()
{
return io_list == NULL || (mpd_exec_up && io_list->next == NULL);
}
/*
* Initialize the clock so we can measure a program's age.
* Should be called before job servers are created, since it
* contains a system call gettimeofday ().
*/
void
mpd_init_event ()
{
struct timezone tz;
gettimeofday (&start, &tz);
}
/*
* Return the elapsed time in milliseconds.
*/
int
mpd_age ()
{
update_clock (); /* get the clock */
wake_sleepers (); /* having done so, check wakeups */
return msclock; /* return the age */
}
/*
* Delay a process a specified number of milliseconds.
*/
void
mpd_nap (locn, msec)
char *locn;
{
Sem sp;
struct napper *np;
if (msec <= 0) { /* if no delay, just reschedule */
mpd_loop_resched (locn);
return;
}
update_clock (); /* get clock reading */
wake_sleepers (); /* awaken all whose clocks have rung */
sp = mpd_make_sem (0);
np = (struct napper *) mpd_alc (-sizeof (struct napper), 1);
np->sp = sp;
np->wakeup = msclock + msec;
LOCK_QUEUE ("mpd_nap");
np->next = nap_list; /* add to nap list */
nap_list = np;
UNLOCK_QUEUE ("mpd_nap");
P ((char *) 0, sp); /* wait for wakeup call */
mpd_kill_sem (sp);
}
/*
* Wait for input or activity on a set of files.
* "query" is not dereferenced here yet, but later when blocking on I/O.
*
* One place this is called is from mpd_net_recv () in socket.c, with the
* first two parameters needing to be protected for MultiMPD. Fortunately,
* the only place in this file they are assigned to (indirectly, through
* the io_list) is protected by a mutex on the io_list.
*/
void
mpd_iowait (query, results, inout)
fd_set *query, *results;
enum io_type inout;
{
Sem sp;
struct iowait *ip;
sp = mpd_make_sem (0);
ip = (struct iowait *) mpd_alc (-sizeof (struct iowait), 1);
ip->next = NULL;
ip->sp = sp;
/* since query and results are to be protected in MultiMPD, any
* assignment to ip->query or ip->results must be protected. */
ip->query = query;
ip->results = results;
ip->inout = inout;
/* add to end of list, with interlock */
LOCK_QUEUE ("mpd_iowait");
if (io_list == NULL)
io_list = ip;
else
io_tail->next = ip;
io_tail = ip;
UNLOCK_QUEUE ("mpd_iowait");
P ((char *) 0, sp); /* wait for file ready signal */
mpd_kill_sem (sp);
}
/*
* Check for I/O or timer event; return nonzero if any processes were awakened.
* Time out after t milliseconds, or never if t < 0.
*/
int
mpd_evcheck (t)
int t;
{
int n, w;
w = 0;
if (nap_list != NULL) {
update_clock (); /* update clock reading */
n = wake_sleepers (); /* try to awaken napping jobs */
if (n == 0)
w = 1; /* if we awakened somebody */
#ifndef __PARAGON__
if ((t < 0 && n < MAX_INT) || (t > n))
t = n; /* shorten timeout until wakeup due */
#endif
}
#ifdef __PARAGON__
if (t != 0)
t = 1; /* on Paragon, just busy-wait */
#endif
return w | iocheck (t);
}
/*
* Check I/O and return nonzero if any processes were awakened.
* Time out after t milliseconds, or never if t < 0.
*/
static int
iocheck (t)
int t;
{
struct iowait *ip, *ip2, **ipp;
struct timeval tv, *tp;
static struct timeval zerotime;
fd_set inset, outset, readyset;
int n;
LOCK_QUEUE ("iocheck");
if (io_list == NULL && t == 0) {
UNLOCK_QUEUE ("iocheck");
return 0;
}
n = 0;
inset = outset = zeroset;
#ifndef __PARAGON__
for (ip = io_list; ip != NULL; ip = ip->next)
if (ip->inout == INPUT)
n = fdmerge (&inset, &inset, ip->query, FDSET_OR);
else
n = fdmerge (&outset, &outset, ip->query, FDSET_OR);
#else
ip = io_list;
ip2 = NULL;
while (ip != NULL) {
if (ip->query != NULL) { /* if not receive of Paragon message */
if (ip->inout == INPUT)
n = fdmerge (&inset, &inset, ip->query, FDSET_OR);
else
n = fdmerge (&outset, &outset, ip->query, FDSET_OR);
ip2 = ip;
ip = ip->next;
}
else {
/*
* Check if a message on with correct message type is waiting.
* If so, remove entry from list and restart waiting process.
*/
DEBUG (D_SELECT, "Network check for msg_type %u",
local_message_type, 0, 0);
if (iprobex (local_message_type, -1, -1, msginfo) != 0) {
DEBUG (D_SELECT, "Network traffic", 0, 0, 0);
V (ip->sp);
if (ip == io_list) {
io_list = io_list->next;
ip2 = NULL;
}
else {
ip2->next = ip->next;
}
if (io_tail == ip)
io_tail = ip2;
mpd_free ((Ptr) ip);
UNLOCK_QUEUE ("iocheck");
return 1;
}
ip2 = ip;
ip = ip->next;
}
}
#endif
UNLOCK_QUEUE ("iocheck");
DEBUG (D_SELECT, "select: %08lX %08lX t=%ld",
inset.fds_bits[0], outset.fds_bits[0], t);
if (t < 0)
tp = NULL; /* if not to timeout */
else {
tv = zerotime; /* set timeout value */
tv.tv_usec = 1000 * (t % 1000);
tv.tv_sec = t / 1000;
tp = &tv;
}
BEGIN_IO (NULL);
n = select (n, &inset, &outset, (fd_set *) 0, tp);
END_IO (NULL);
DEBUG (D_SELECT, "selectd %08lX %08lX n=%ld",
inset.fds_bits[0], outset.fds_bits[0], n);
if (n < 0) {
perror ("select failure");
mpd_abort ("I/O error");
}
if (n == 0) /* if no I/O ready */
return 0;
/* one or more files are ready for I/O */
LOCK_QUEUE ("iocheck");
for (ipp = &io_list; (ip = *ipp) != NULL;) {
if (ip->inout == INPUT)
n = fdmerge (&readyset, &inset, ip->query, FDSET_AND);
else
n = fdmerge (&readyset, &outset, ip->query, FDSET_AND);
if (n > 0) {
*ip->results = readyset;
V (ip->sp);
*ipp = ip->next;
/* Important note: this ip cannot belong to any resource.
* Otherwise mpd_free would try to lock the resource mutex,
* violating the rule that it must be grabbed before queue_mutex,
* which we already hold.
*/
mpd_free ((Ptr) ip);
} else {
ipp = &ip->next;
io_tail = ip;
}
}
UNLOCK_QUEUE ("iocheck");
return 1; /* indicate we accomplished something */
}
/*
* Merge two fd_set structures to create a third.
*
* fdmerge (result, set1, set2, op)
* first three parameters are pointers to fd_set structs.
* op is FDSET_AND or FDSET_OR.
* return value is 1 + highest fd set in result, or 0 if none.
*/
static int
fdmerge (result, set1, set2, op)
fd_set *result, *set1, *set2;
enum fdset_op op;
{
int i, n;
fd_set tempset;
tempset = zeroset;
for (i = n = 0; i < FD_SETSIZE; i++) {
if ((op == FDSET_AND)
? (FD_ISSET (i, set1) && FD_ISSET (i, set2))
: (FD_ISSET (i, set1) || FD_ISSET (i, set2))) {
FD_SET (i, &tempset);
n = i + 1;
}
}
*result = tempset;
return n;
}
/*
* Put the current elapsed time in msclock.
*/
static void
update_clock ()
{
struct timeval tv;
struct timezone tz;
gettimeofday (&tv, &tz);
msclock = 1000 * (tv.tv_sec - start.tv_sec)
+ tv.tv_usec / 1000 - start.tv_usec / 1000;
DEBUG (D_CLOCK, "clock %ld", msclock, 0, 0);
}
/*
* Awaken napping jobs whose wakeup times have arrived.
* Called every time we've read the system clock into msclock.
*
* Returns timeout in msec until next wakeup.
* This is MAX_INT if the queue is empty and 0 if somebody was awakened.
*/
static int
wake_sleepers ()
{
struct napper **ptr, *np;
int t, tmin;
tmin = MAX_INT;
if (nap_list != NULL) {
LOCK_QUEUE ("wake_sleepers");
for (ptr = &nap_list; (np = *ptr) != NULL;) {
t = np->wakeup - msclock;
if (t <= 0 || !np->sp->blocked.head) { /* if wakeup time, or gone */
V (np->sp); /* awaken process */
*ptr = np->next; /* remove from list */
mpd_free ((Ptr) np); /* free the entry */
tmin = 0;
} else {
ptr = &np->next;
if (tmin > t)
tmin = t; /* remember max remaining time */
}
}
UNLOCK_QUEUE ("wake_sleepers");
}
return tmin;
}
#ifdef sysV68
/* gettimeofday () is not supplied by the v68 library; fake it. */
int
gettimeofday (tp, tzp)
struct timeval *tp;
struct timezone *tzp;
{
time (&tp->tv_sec);
tp->tv_usec = 0;
tzp->tz_minuteswest = 0;
tzp->tz_dsttime = 0;
}
#endif
syntax highlighted by Code2HTML, v. 0.9.1