/* * The Spread Toolkit. * * The contents of this file are subject to the Spread Open-Source * License, Version 1.0 (the ``License''); you may not use * this file except in compliance with the License. You may obtain a * copy of the License at: * * http://www.spread.org/license/ * * or in the file ``license.txt'' found in this distribution. * * Software distributed under the License is distributed on an AS IS basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * The Creators of Spread are: * Yair Amir, Michal Miskin-Amir, Jonathan Stanton. * * Copyright (C) 1993-2004 Spread Concepts LLC * * All Rights Reserved. * * Major Contributor(s): * --------------- * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security. * Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf. * Dan Schoenblum dansch@cnds.jhu.edu - Java interface. * John Schultz jschultz@cnds.jhu.edu - contribution to process group membership. * */ #include "arch.h" /* undef redefined variables under windows */ #ifdef ARCH_PC_WIN95 #undef EINTR #undef EAGAIN #undef EWOULDBLOCK #undef EINPROGRESS #endif #include #ifndef ARCH_PC_WIN95 #include #include #include #include #else /* ARCH_PC_WIN95 */ #include #include #endif /* ARCH_PC_WIN95 */ #include #include "sp_events.h" #include "objects.h" /* For memory */ #include "memory.h" /* for memory */ #include "alarm.h" typedef struct dummy_t_event { sp_time t; void (* func)( int code, void *data ); int code; void *data; struct dummy_t_event *next; } time_event; typedef struct dummy_fd_event { int fd; int fd_type; void (* func)( mailbox mbox, int code, void *data ); int code; void *data; int active; /* true if active, false if inactive */ } fd_event; typedef struct dummy_fd_queue { int num_fds; int num_active_fds; fd_event events[MAX_FD_EVENTS]; } fd_queue; static time_event *Time_queue; static sp_time Now; static fd_queue Fd_queue[NUM_PRIORITY]; static fd_set Fd_mask[NUM_FDTYPES]; static int Active_priority; static int Exit_events; int E_init(void) { int i,ret; Time_queue = NULL; ret = Mem_init_object(TIME_EVENT, sizeof(time_event), 100,0); if (ret < 0) { Alarm(EXIT, "E_Init: Failure to Initialize TIME_EVENT memory objects\n"); } for ( i=0; i < NUM_PRIORITY; i++ ) { Fd_queue[i].num_fds = 0; Fd_queue[i].num_active_fds = 0; } for ( i=0; i < NUM_FDTYPES; i++ ) { FD_ZERO( &Fd_mask[i] ); } Active_priority = LOW_PRIORITY; E_get_time(); Alarm( EVENTS, "E_init: went ok\n"); return( 0 ); } sp_time E_get_time(void) { #ifndef ARCH_PC_WIN95 #if HAVE_STRUCT_TIMEZONE struct timezone dummy_tz; #else sp_time dummy_tz; #endif int ret; ret = gettimeofday( (struct timeval *)&Now, (void *)&dummy_tz ); if ( ret < 0 ) Alarm( EXIT, "E_get_time: gettimeofday problems.\n" ); #else /* ARCH_PC_WIN95 */ struct _timeb timebuffer; _ftime( &timebuffer ); Now.sec = timebuffer.time; Now.usec= timebuffer.millitm; Now.usec= Now.usec * 1000; #endif /* ARCH_PC_WIN95 */ #if 0 Alarm( EVENTS, "E_get_time: time is (%d, %d)\n", Now.sec, Now.usec); #endif return ( Now ); } sp_time E_sub_time( sp_time t, sp_time delta_t ) { sp_time res; res.sec = t.sec - delta_t.sec; res.usec = t.usec - delta_t.usec; if ( res.usec < 0 ) { res.usec = res.usec + 1000000; res.sec--; } if ( res.sec < 0 ) Alarm( EVENTS, "E_sub_time: negative time result.\n"); return ( res ); } sp_time E_add_time( sp_time t, sp_time delta_t ) { sp_time res; res.sec = t.sec + delta_t.sec; res.usec = t.usec + delta_t.usec; if ( res.usec > 1000000 ) { res.usec = res.usec - 1000000; res.sec++; } return ( res ); } int E_compare_time( sp_time t1, sp_time t2 ) { if ( t1.sec > t2.sec ) return ( 1 ); else if ( t1.sec < t2.sec ) return ( -1 ); else if ( t1.usec > t2.usec ) return ( 1 ); else if ( t1.usec < t2.usec ) return ( -1 ); else return ( 0 ); } int E_queue( void (* func)( int code, void *data ), int code, void *data, sp_time delta_time ) { time_event *t_pre; time_event *t_post; time_event *t_e; int inserted; int deleted; int compare; t_e = new( TIME_EVENT ); t_e->t = E_add_time( E_get_time(), delta_time ); t_e->func = func; t_e->code = code; t_e->data = data; deleted = 0; inserted = 0; if( Time_queue != NULL ) { if( Time_queue->func == t_e->func && Time_queue->data == t_e->data && Time_queue->code == t_e->code ) { t_pre = Time_queue; Time_queue = Time_queue->next; dispose( t_pre ); deleted = 1; Alarm( EVENTS, "E_queue: dequeued a (first) simillar event\n" ); } } if( Time_queue == NULL ) { t_e->next = NULL; Time_queue = t_e; Alarm( EVENTS, "E_queue: (only) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec, delta_time.usec ); return( 0 ); }else{ compare = E_compare_time ( t_e->t, Time_queue->t ); if( compare < 0 ) { t_e->next = Time_queue; Time_queue = t_e; inserted = 1; Alarm( EVENTS, "E_queue: (first) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec,delta_time.usec ); } } t_pre = Time_queue ; t_post = Time_queue->next; while ( t_post != NULL && ( !inserted || !deleted ) ) { if( t_post->func == t_e->func && t_post->data == t_e->data && t_post->code == t_e->code ) { t_pre->next = t_post->next; dispose( t_post ); t_post = t_pre->next; deleted = 1; Alarm( EVENTS, "E_queue: dequeued a simillar event\n" ); continue; } if ( !inserted ) { compare = E_compare_time ( t_e->t, t_post->t ); if( compare < 0 ) { t_pre->next = t_e; t_e->next = t_post; inserted = 1; Alarm( EVENTS, "E_queue: event queued for func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec, delta_time.usec ); } } t_pre = t_post; t_post = t_post->next; } if( !inserted ) { t_pre->next = t_e; t_e->next = NULL; Alarm( EVENTS, "E_queue: (last) event queued func 0x%x code %d data 0x%x in future (%u:%u)\n",t_e->func,t_e->code, t_e->data, delta_time.sec,delta_time.usec ); } return( 0 ); } int E_dequeue( void (* func)( int code, void *data ), int code, void *data ) { time_event *t_pre; time_event *t_ptr; if( Time_queue == NULL ) { Alarm( EVENTS, "E_dequeue: no such event\n" ); return( -1 ); } if( Time_queue->func == func && Time_queue->data == data && Time_queue->code == code ) { t_ptr = Time_queue; Time_queue = Time_queue->next; dispose( t_ptr ); Alarm( EVENTS, "E_dequeue: first event dequeued func 0x%x code %d data 0x%x\n",func,code, data); return( 0 ); } t_pre = Time_queue; while ( t_pre->next != NULL ) { t_ptr = t_pre->next; if( t_ptr->func == func && t_ptr->data == data && t_ptr->code == code ) { t_pre->next = t_ptr->next; dispose( t_ptr ); Alarm( EVENTS, "E_dequeue: event dequeued func 0x%x code %d data 0x%x\n",func,code, data); return( 0 ); } t_pre = t_ptr; } Alarm( EVENTS, "E_dequeue: no such event\n" ); return( -1 ); } void E_delay( sp_time t ) { sp_time tmp_t; tmp_t = t; #ifndef ARCH_PC_WIN95 if (select(0, NULL, NULL, NULL, (struct timeval *)&tmp_t ) < 0) { Alarm( EVENTS, "E_delay: select delay returned error: %s\n", strerror(errno)); } #else /* ARCH_PC_WIN95 */ SleepEx( tmp_t.sec*1000+tmp_t.usec/1000, 0 ); #endif /* ARCH_PC_WIN95 */ } int E_attach_fd( int fd, int fd_type, void (* func)( mailbox mbox, int code, void *data ), int code, void *data, int priority ) { int num_fds; int j; if( priority < 0 || priority > NUM_PRIORITY ) { Alarm( PRINT, "E_attach_fd: invalid priority %d for fd %d with fd_type %d\n", priority, fd, fd_type ); return( -1 ); } if( fd_type < 0 || fd_type > NUM_FDTYPES ) { Alarm( PRINT, "E_attach_fd: invalid fd_type %d for fd %d with priority %d\n", fd_type, fd, priority ); return( -1 ); } #ifndef ARCH_PC_WIN95 /* Windows bug: Reports FD_SETSIZE of 64 but select works on all * fd's even ones with numbers greater then 64. */ if( fd < 0 || fd > FD_SETSIZE ) { Alarm( PRINT, "E_attach_fd: invalid fd %d (max %d) with fd_type %d with priority %d\n", fd, FD_SETSIZE, fd_type, priority ); return( -1 ); } #endif for( j=0; j < Fd_queue[priority].num_fds; j++ ) { if( ( Fd_queue[priority].events[j].fd == fd ) && ( Fd_queue[priority].events[j].fd_type == fd_type ) ) { Fd_queue[priority].events[j].func = func; Fd_queue[priority].events[j].code = code; Fd_queue[priority].events[j].data = data; if ( !(Fd_queue[priority].events[j].active) ) Fd_queue[priority].num_active_fds++; Fd_queue[priority].events[j].active = TRUE; Alarm( PRINT, "E_attach_fd: fd %d with type %d exists & replaced & activated\n", fd, fd_type ); return( 1 ); } } num_fds = Fd_queue[priority].num_fds; if ( num_fds == MAX_FD_EVENTS ) { Alarm( PRINT, "E_attach_fd: Reached Maximum number of events. Recompile with larger MAX_FD_EVENTS\n"); return( -1 ); } Fd_queue[priority].events[num_fds].fd = fd; Fd_queue[priority].events[num_fds].fd_type = fd_type; Fd_queue[priority].events[num_fds].func = func; Fd_queue[priority].events[num_fds].code = code; Fd_queue[priority].events[num_fds].data = data; Fd_queue[priority].events[num_fds].active = TRUE; Fd_queue[priority].num_fds++; Fd_queue[priority].num_active_fds++; if( Active_priority <= priority ) FD_SET( fd, &Fd_mask[fd_type] ); Alarm( EVENTS, "E_attach_fd: fd %d, fd_type %d, code %d, data 0x%x, priority %d Active_priority %d\n", fd, fd_type, code, data, priority, Active_priority ); return( 0 ); } int E_detach_fd( int fd, int fd_type ) { int i,j; int found; if( fd_type < 0 || fd_type > NUM_FDTYPES ) { Alarm( PRINT, "E_detach_fd: invalid fd_type %d for fd %d\n", fd_type, fd ); return( -1 ); } found = 0; for( i=0; i < NUM_PRIORITY; i++ ) for( j=0; j < Fd_queue[i].num_fds; j++ ) { if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) ) { if (Fd_queue[i].events[j].active) Fd_queue[i].num_active_fds--; Fd_queue[i].num_fds--; Fd_queue[i].events[j] = Fd_queue[i].events[Fd_queue[i].num_fds]; FD_CLR( fd, &Fd_mask[fd_type] ); found = 1; break; /* from the j for only */ } } if( ! found ) return( -1 ); return( 0 ); } int E_deactivate_fd( int fd, int fd_type ) { int i,j; int found; if( fd_type < 0 || fd_type > NUM_FDTYPES ) { Alarm( PRINT, "E_deactivate_fd: invalid fd_type %d for fd %d\n", fd_type, fd ); return( -1 ); } found = 0; for( i=0; i < NUM_PRIORITY; i++ ) for( j=0; j < Fd_queue[i].num_fds; j++ ) { if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) ) { if (Fd_queue[i].events[j].active) Fd_queue[i].num_active_fds--; Fd_queue[i].events[j].active = FALSE; FD_CLR( fd, &Fd_mask[fd_type] ); found = 1; break; /* from the j for only */ } } if( ! found ) return( -1 ); return( 0 ); } int E_activate_fd( int fd, int fd_type ) { int i,j; int found; if( fd_type < 0 || fd_type > NUM_FDTYPES ) { Alarm( PRINT, "E_activate_fd: invalid fd_type %d for fd %d\n", fd_type, fd ); return( -1 ); } found = 0; for( i=0; i < NUM_PRIORITY; i++ ) for( j=0; j < Fd_queue[i].num_fds; j++ ) { if( ( Fd_queue[i].events[j].fd == fd ) && ( Fd_queue[i].events[j].fd_type == fd_type ) ) { if ( !(Fd_queue[i].events[j].active) ) Fd_queue[i].num_active_fds++; Fd_queue[i].events[j].active = TRUE; if( i >= Active_priority ) FD_SET( fd, &Fd_mask[ fd_type ] ); found = 1; break; /* from the j for only */ } } if( ! found ) return( -1 ); return( 0 ); } int E_set_active_threshold( int priority ) { int fd_type; int i,j; if( priority < 0 || priority > NUM_PRIORITY ) { Alarm( PRINT, "E_set_active_threshold: invalid priority %d\n", priority ); return( -1 ); } if( priority == Active_priority ) return( priority ); Active_priority = priority; for ( i=0; i < NUM_FDTYPES; i++ ) { FD_ZERO( &Fd_mask[i] ); } for( i = priority; i < NUM_PRIORITY; i++ ) for( j=0; j < Fd_queue[i].num_fds; j++ ) { fd_type = Fd_queue[i].events[j].fd_type; if (Fd_queue[i].events[j].active) FD_SET( Fd_queue[i].events[j].fd, &Fd_mask[fd_type] ); } Alarm( EVENTS, "E_set_active_threshold: changed to %d\n",Active_priority); return( priority ); } int E_num_active( int priority ) { if( priority < 0 || priority > NUM_PRIORITY ) { Alarm( PRINT, "E_num_active: invalid priority %d\n", priority ); return( -1 ); } return( Fd_queue[priority].num_active_fds ); } void E_handle_events(void) { static int Round_robin = 0; static const sp_time long_timeout = { 10000, 0}; static const sp_time zero_sec = { 0, 0}; #ifdef BADCLOCK static const sp_time mili_sec = { 0, 1000}; int clock_sync; #endif int num_set; int treated; int fd; int fd_type; int i,j; sp_time timeout, wait_timeout; fd_set current_mask[NUM_FDTYPES]; time_event *temp_ptr; int first=1; #ifdef TESTTIME sp_time tmp_late,start,stop,req_time; /* DEBUGGING */ #endif #ifdef BADCLOCK clock_sync = 0; #endif for( Exit_events = 0 ; !Exit_events ; ) { Alarm( EVENTS, "E_handle_events: next event \n"); /* Handle time events */ timeout = long_timeout; #ifdef TESTTIME start = E_get_time(); #endif while( Time_queue != NULL ) { #ifdef BADCLOCK if ( clock_sync >= 0 ) { E_get_time(); clock_sync = -20; } #else E_get_time(); #endif if ( !first && E_compare_time( Now, Time_queue->t ) >= 0 ) { #ifdef TESTTIME tmp_late = E_sub_time( Now, Time_queue->t ); #endif temp_ptr = Time_queue; Time_queue = Time_queue->next; Alarm( EVENTS, "E_handle_events: exec time event \n"); #ifdef TESTTIME Alarm( DEBUG, "Events: TimeEv is %d %d late\n",tmp_late.sec, tmp_late.usec); #endif temp_ptr->func( temp_ptr->code, temp_ptr->data ); dispose( temp_ptr ); #ifdef BADCLOCK Now = E_add_time( Now, mili_sec ); clock_sync++; #else E_get_time(); #endif if (Exit_events) goto end_handler; }else{ timeout = E_sub_time( Time_queue->t, Now ); break; } } if (timeout.sec < 0 ) timeout.sec = timeout.usec = 0; /* this can happen until first is unset */ #ifdef TESTTIME stop = E_get_time(); tmp_late = E_sub_time(stop, start); Alarm(DEBUG, "Events: TimeEv's took %d %d to handle\n", tmp_late.sec, tmp_late.usec); #endif /* Handle fd events */ for( i=0; i < NUM_FDTYPES; i++ ) { current_mask[i] = Fd_mask[i]; } Alarm( EVENTS, "E_handle_events: poll select\n"); #ifdef TESTTIME req_time = zero_sec; #endif wait_timeout = zero_sec; num_set = select( FD_SETSIZE, ¤t_mask[READ_FD], ¤t_mask[WRITE_FD], ¤t_mask[EXCEPT_FD], (struct timeval *)&wait_timeout ); if (num_set == 0 && !Exit_events) { #ifdef BADCLOCK clock_sync = 0; #endif for( i=0; i < NUM_FDTYPES; i++ ) { current_mask[i] = Fd_mask[i]; } Alarm( EVENTS, "E_handle_events: select with timeout (%d, %d)\n", timeout.sec,timeout.usec ); #ifdef TESTTIME req_time = E_add_time(req_time, timeout); #endif num_set = select( FD_SETSIZE, ¤t_mask[READ_FD], ¤t_mask[WRITE_FD], ¤t_mask[EXCEPT_FD], (struct timeval *)&timeout ); } #ifdef TESTTIME start = E_get_time(); tmp_late = E_sub_time(start, stop); Alarm( DEBUG, "Events: Waiting for fd or timout took %d %d asked for %d %d\n", tmp_late.sec, tmp_late.usec, req_time.sec, req_time.usec); #endif /* Handle all high and medium priority fd events */ for( i=NUM_PRIORITY-1,treated=0; i > LOW_PRIORITY && num_set > 0 && !treated; i-- ) { for( j=0; j < Fd_queue[i].num_fds && num_set > 0; j++ ) { fd = Fd_queue[i].events[j].fd; fd_type = Fd_queue[i].events[j].fd_type; if( FD_ISSET( fd, ¤t_mask[fd_type] ) ) { Alarm( EVENTS, "E_handle_events: exec handler for fd %d, fd_type %d, priority %d\n", fd, fd_type, i ); Fd_queue[i].events[j].func( Fd_queue[i].events[j].fd, Fd_queue[i].events[j].code, Fd_queue[i].events[j].data ); treated = 1; num_set--; #ifdef BADCLOCK Now = E_add_time( Now, mili_sec ); clock_sync++; #else E_get_time(); #endif if (Exit_events) goto end_handler; } } } /* Don't handle timed events until all non-low-priority fd events have been handled * FIXME: This may or may not be right. If continual high priority events occur, then * timed events will starve, I'm not sure if that is better then what we have. We * could also set first=0 no matter what after trying the high events once, then * they will get a shot first, but after that timed events will also be handled. */ if (!treated) first = 0; #ifdef TESTTIME stop = E_get_time(); tmp_late = E_sub_time(stop, start); Alarm(DEBUG, "Events: High & Med took %d %d time to handle\n", tmp_late.sec, tmp_late.usec); #endif /* Handle one low priority fd event. However, verify that Active_priority still allows LOW_PRIORITY events. Active_priority can change because of calls to E_set_threshold() during the current select loop. */ for( i=0; i < Fd_queue[LOW_PRIORITY].num_fds && num_set > 0 && Active_priority == LOW_PRIORITY; i++ ) { j = ( i + Round_robin ) % Fd_queue[LOW_PRIORITY].num_fds; fd = Fd_queue[LOW_PRIORITY].events[j].fd; fd_type = Fd_queue[LOW_PRIORITY].events[j].fd_type; if( FD_ISSET( fd, ¤t_mask[fd_type] ) ) { Round_robin = ( j + 1 ) % Fd_queue[LOW_PRIORITY].num_fds; Alarm( EVENTS , "E_handle_events: exec ext fd event \n"); Fd_queue[LOW_PRIORITY].events[j].func( Fd_queue[LOW_PRIORITY].events[j].fd, Fd_queue[LOW_PRIORITY].events[j].code, Fd_queue[LOW_PRIORITY].events[j].data ); num_set--; #ifdef BADCLOCK Now = E_add_time( Now, mili_sec ); clock_sync++; #else E_get_time(); #endif if (Exit_events) goto end_handler; break; } } #ifdef TESTTIME start = E_get_time(); tmp_late = E_sub_time(start, stop); Alarm(DEBUG, "Events: Low priority took %d %d to handle\n", tmp_late.sec, tmp_late.usec); #endif } end_handler: /* Clean up data structures for exit OR restart of handler loop */ /* Actually nothing needs to be cleaned up to allow E_handle_events() * to be called again. The events are still registered (or not registered) * and the only state for the actual events loop is Exit_events which is reset * in the for loop. */ return; } void E_exit_events(void) { Alarm( EVENTS, "E_exit_events:\n"); Exit_events = 1; }