/* event.c -- event waiting and handling */ #include "rts.h" #ifdef __PARAGON__ #include "nx.h" extern local_message_type; /* message type == ptype of current VM */ #endif static void update_clock (); static int wake_sleepers (), iocheck (), fdmerge (); struct napper { /* napping job information */ struct napper *next; /* next list entry */ Sem sp; /* semaphore to tickle */ int wakeup; /* wakeup time */ }; struct iowait { /* i/o blocked job information */ struct iowait *next; /* next list entry */ Sem sp; /* semaphore to tickle */ fd_set *query; /* where to find set of wanted files */ fd_set *results; /* where to put the results */ enum io_type inout; /* INPUT or OUTPUT wait */ }; enum fdset_op { FDSET_AND, FDSET_OR }; static fd_set zeroset; /* nap_list and io_list are protected by mpd_queue_mutex. */ static struct napper *nap_list = NULL; /* napping jobs (unordered list) */ static struct iowait *io_list = NULL; /* I/O blocked jobs (in order blocked)*/ static struct iowait *io_tail; /* end of the list */ static struct timeval start; /* system time when we started executing */ static int msclock; /* elapsed real time, in milliseconds */ /* * mpd_nap_list_empty() and mpd_evio_list_empty() are for idleness checking. * The caller already holds the queue_mutex. */ Bool mpd_nap_list_empty () { return nap_list == NULL; } Bool mpd_evio_list_empty () { return io_list == NULL || (mpd_exec_up && io_list->next == NULL); } /* * Initialize the clock so we can measure a program's age. * Should be called before job servers are created, since it * contains a system call gettimeofday (). */ void mpd_init_event () { struct timezone tz; gettimeofday (&start, &tz); } /* * Return the elapsed time in milliseconds. */ int mpd_age () { update_clock (); /* get the clock */ wake_sleepers (); /* having done so, check wakeups */ return msclock; /* return the age */ } /* * Delay a process a specified number of milliseconds. */ void mpd_nap (locn, msec) char *locn; { Sem sp; struct napper *np; if (msec <= 0) { /* if no delay, just reschedule */ mpd_loop_resched (locn); return; } update_clock (); /* get clock reading */ wake_sleepers (); /* awaken all whose clocks have rung */ sp = mpd_make_sem (0); np = (struct napper *) mpd_alc (-sizeof (struct napper), 1); np->sp = sp; np->wakeup = msclock + msec; LOCK_QUEUE ("mpd_nap"); np->next = nap_list; /* add to nap list */ nap_list = np; UNLOCK_QUEUE ("mpd_nap"); P ((char *) 0, sp); /* wait for wakeup call */ mpd_kill_sem (sp); } /* * Wait for input or activity on a set of files. * "query" is not dereferenced here yet, but later when blocking on I/O. * * One place this is called is from mpd_net_recv () in socket.c, with the * first two parameters needing to be protected for MultiMPD. Fortunately, * the only place in this file they are assigned to (indirectly, through * the io_list) is protected by a mutex on the io_list. */ void mpd_iowait (query, results, inout) fd_set *query, *results; enum io_type inout; { Sem sp; struct iowait *ip; sp = mpd_make_sem (0); ip = (struct iowait *) mpd_alc (-sizeof (struct iowait), 1); ip->next = NULL; ip->sp = sp; /* since query and results are to be protected in MultiMPD, any * assignment to ip->query or ip->results must be protected. */ ip->query = query; ip->results = results; ip->inout = inout; /* add to end of list, with interlock */ LOCK_QUEUE ("mpd_iowait"); if (io_list == NULL) io_list = ip; else io_tail->next = ip; io_tail = ip; UNLOCK_QUEUE ("mpd_iowait"); P ((char *) 0, sp); /* wait for file ready signal */ mpd_kill_sem (sp); } /* * Check for I/O or timer event; return nonzero if any processes were awakened. * Time out after t milliseconds, or never if t < 0. */ int mpd_evcheck (t) int t; { int n, w; w = 0; if (nap_list != NULL) { update_clock (); /* update clock reading */ n = wake_sleepers (); /* try to awaken napping jobs */ if (n == 0) w = 1; /* if we awakened somebody */ #ifndef __PARAGON__ if ((t < 0 && n < MAX_INT) || (t > n)) t = n; /* shorten timeout until wakeup due */ #endif } #ifdef __PARAGON__ if (t != 0) t = 1; /* on Paragon, just busy-wait */ #endif return w | iocheck (t); } /* * Check I/O and return nonzero if any processes were awakened. * Time out after t milliseconds, or never if t < 0. */ static int iocheck (t) int t; { struct iowait *ip, *ip2, **ipp; struct timeval tv, *tp; static struct timeval zerotime; fd_set inset, outset, readyset; int n; LOCK_QUEUE ("iocheck"); if (io_list == NULL && t == 0) { UNLOCK_QUEUE ("iocheck"); return 0; } n = 0; inset = outset = zeroset; #ifndef __PARAGON__ for (ip = io_list; ip != NULL; ip = ip->next) if (ip->inout == INPUT) n = fdmerge (&inset, &inset, ip->query, FDSET_OR); else n = fdmerge (&outset, &outset, ip->query, FDSET_OR); #else ip = io_list; ip2 = NULL; while (ip != NULL) { if (ip->query != NULL) { /* if not receive of Paragon message */ if (ip->inout == INPUT) n = fdmerge (&inset, &inset, ip->query, FDSET_OR); else n = fdmerge (&outset, &outset, ip->query, FDSET_OR); ip2 = ip; ip = ip->next; } else { /* * Check if a message on with correct message type is waiting. * If so, remove entry from list and restart waiting process. */ DEBUG (D_SELECT, "Network check for msg_type %u", local_message_type, 0, 0); if (iprobex (local_message_type, -1, -1, msginfo) != 0) { DEBUG (D_SELECT, "Network traffic", 0, 0, 0); V (ip->sp); if (ip == io_list) { io_list = io_list->next; ip2 = NULL; } else { ip2->next = ip->next; } if (io_tail == ip) io_tail = ip2; mpd_free ((Ptr) ip); UNLOCK_QUEUE ("iocheck"); return 1; } ip2 = ip; ip = ip->next; } } #endif UNLOCK_QUEUE ("iocheck"); DEBUG (D_SELECT, "select: %08lX %08lX t=%ld", inset.fds_bits[0], outset.fds_bits[0], t); if (t < 0) tp = NULL; /* if not to timeout */ else { tv = zerotime; /* set timeout value */ tv.tv_usec = 1000 * (t % 1000); tv.tv_sec = t / 1000; tp = &tv; } BEGIN_IO (NULL); n = select (n, &inset, &outset, (fd_set *) 0, tp); END_IO (NULL); DEBUG (D_SELECT, "selectd %08lX %08lX n=%ld", inset.fds_bits[0], outset.fds_bits[0], n); if (n < 0) { perror ("select failure"); mpd_abort ("I/O error"); } if (n == 0) /* if no I/O ready */ return 0; /* one or more files are ready for I/O */ LOCK_QUEUE ("iocheck"); for (ipp = &io_list; (ip = *ipp) != NULL;) { if (ip->inout == INPUT) n = fdmerge (&readyset, &inset, ip->query, FDSET_AND); else n = fdmerge (&readyset, &outset, ip->query, FDSET_AND); if (n > 0) { *ip->results = readyset; V (ip->sp); *ipp = ip->next; /* Important note: this ip cannot belong to any resource. * Otherwise mpd_free would try to lock the resource mutex, * violating the rule that it must be grabbed before queue_mutex, * which we already hold. */ mpd_free ((Ptr) ip); } else { ipp = &ip->next; io_tail = ip; } } UNLOCK_QUEUE ("iocheck"); return 1; /* indicate we accomplished something */ } /* * Merge two fd_set structures to create a third. * * fdmerge (result, set1, set2, op) * first three parameters are pointers to fd_set structs. * op is FDSET_AND or FDSET_OR. * return value is 1 + highest fd set in result, or 0 if none. */ static int fdmerge (result, set1, set2, op) fd_set *result, *set1, *set2; enum fdset_op op; { int i, n; fd_set tempset; tempset = zeroset; for (i = n = 0; i < FD_SETSIZE; i++) { if ((op == FDSET_AND) ? (FD_ISSET (i, set1) && FD_ISSET (i, set2)) : (FD_ISSET (i, set1) || FD_ISSET (i, set2))) { FD_SET (i, &tempset); n = i + 1; } } *result = tempset; return n; } /* * Put the current elapsed time in msclock. */ static void update_clock () { struct timeval tv; struct timezone tz; gettimeofday (&tv, &tz); msclock = 1000 * (tv.tv_sec - start.tv_sec) + tv.tv_usec / 1000 - start.tv_usec / 1000; DEBUG (D_CLOCK, "clock %ld", msclock, 0, 0); } /* * Awaken napping jobs whose wakeup times have arrived. * Called every time we've read the system clock into msclock. * * Returns timeout in msec until next wakeup. * This is MAX_INT if the queue is empty and 0 if somebody was awakened. */ static int wake_sleepers () { struct napper **ptr, *np; int t, tmin; tmin = MAX_INT; if (nap_list != NULL) { LOCK_QUEUE ("wake_sleepers"); for (ptr = &nap_list; (np = *ptr) != NULL;) { t = np->wakeup - msclock; if (t <= 0 || !np->sp->blocked.head) { /* if wakeup time, or gone */ V (np->sp); /* awaken process */ *ptr = np->next; /* remove from list */ mpd_free ((Ptr) np); /* free the entry */ tmin = 0; } else { ptr = &np->next; if (tmin > t) tmin = t; /* remember max remaining time */ } } UNLOCK_QUEUE ("wake_sleepers"); } return tmin; } #ifdef sysV68 /* gettimeofday () is not supplied by the v68 library; fake it. */ int gettimeofday (tp, tzp) struct timeval *tp; struct timezone *tzp; { time (&tp->tv_sec); tp->tv_usec = 0; tzp->tz_minuteswest = 0; tzp->tz_dsttime = 0; } #endif