/*
* The Spread Toolkit.
*
* The contents of this file are subject to the Spread Open-Source
* License, Version 1.0 (the ``License''); you may not use
* this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.spread.org/license/
*
* or in the file ``license.txt'' found in this distribution.
*
* Software distributed under the License is distributed on an AS IS basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Creators of Spread are:
* Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
*
* Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
*
* All Rights Reserved.
*
* Major Contributor(s):
* ---------------
* Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
* Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
* Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
* John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
*
*/
#define status_ext
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include "arch.h"
#include "mutex.h"
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#else /* ARCH_PC_WIN95 */
#include <windows.h>
#include <winsock.h>
#define ioctl ioctlsocket
WSADATA WSAData;
#endif /* ARCH_PC_WIN95 */
#endif /* _REENTRANT */
#include "scatter.h"
#include "configuration.h"
#include "sp_events.h"
#include "status.h"
#include "net_types.h"
#include "data_link.h"
#include "alarm.h"
static channel SendChan;
static sp_time Send_partition_timeout = { 25, 0 };
static sp_time Send_status_timeout;
static int Status_active = 0;
static int Partition_active = 0;
static sys_scatter Pack_scat;
static packet_header Pack;
static int16 Partition[MAX_PROCS_RING];
static int16 Work_partition[MAX_PROCS_RING];
static int16 Fc_buf[MAX_PROCS_RING];
static int16 Work_fc_buf[MAX_PROCS_RING];
static int Status_vector[MAX_PROCS_RING];
static int Status_vector[MAX_PROCS_RING];
static configuration Cn;
static proc My;
static int My_port;
static char *My_name;
static char My_name_buf[80];
static char Config_file[80];
static sys_scatter Report_scat;
static packet_header Report_pack;
static channel Report_socket;
static void Print_menu();
static void User_command();
static void Define_partition();
static void Send_partition();
static void Kill_spreads();
static void Print_partition( int16 partition[MAX_PROCS_RING] );
static void Define_flow_control();
static void Send_flow_control();
static void Print_flow_control( int16 fc_buf[MAX_PROCS_RING] );
static void Activate_status();
static void Send_status_query();
static void Report_message();
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
static mutex_type Init_mutex = MUTEX_STATIC_INIT;
static pthread_t Read_thread;
static pthread_t Status_thread;
static pthread_t Partition_thread;
static void *Read_thread_routine();
static void *Status_send_thread_routine();
static void *Partition_send_thread_routine();
#else /* ARCH_PC_WIN95 */
static mutex_type Init_mutex = {MUTEX_STATIC_INIT};
static HANDLE Read_thread;
static HANDLE Status_thread;
static HANDLE Partition_thread;
static DWORD WINAPI Read_thread_routine( void *);
static DWORD WINAPI Status_send_thread_routine( void *);
static DWORD WINAPI Partition_send_thread_routine( void *);
#endif /* ARCH_PC_WIN95 */
static mutex_type Status_mutex;
static mutex_type Partition_mutex;
#endif /* _REENTRANT */
static void Usage( int argc, char *argv[] );
static void initialize_locks(void);
int main( int argc, char *argv[] )
{
int i;
#ifdef _REENTRANT
int ret;
#endif
fclose(stderr);
Alarm_set_types( NONE );
Alarm( PRINT, "/===========================================================================\\\n");
Alarm( PRINT, "| The Spread Toolkit. |\n");
Alarm( PRINT, "| Copyright (c) 1993-2001 Spread Concepts LLC |\n");
Alarm( PRINT, "| All rights reserved. |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| The Spread package is licensed under the Spread Open-Source License. |\n");
Alarm( PRINT, "| You may only use this software in compliance with the License. |\n");
Alarm( PRINT, "| A copy of the license can be found at http://www.spread.org/license |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| This product uses software developed by Spread Concepts LLC for use |\n");
Alarm( PRINT, "| in the Spread toolkit. For more information about Spread, |\n");
Alarm( PRINT, "| see http://www.spread.org |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| This software is distributed on an \"AS IS\" basis, WITHOUT WARRANTY OF |\n");
Alarm( PRINT, "| ANY KIND, either express or implied. |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| Spread is developed at Spread Concepts LLC and the Center for Networking |\n");
Alarm( PRINT, "| and Distributed Systems, The Johns Hopkins University. |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| Creators: |\n");
Alarm( PRINT, "| Yair Amir yairamir@cs.jhu.edu |\n");
Alarm( PRINT, "| Michal Miskin-Amir michal@spread.org |\n");
Alarm( PRINT, "| Jonathan Stanton jstanton@gwu.edu |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| Contributors: |\n");
Alarm( PRINT, "| Cristina Nita-Rotaru crisn@cs.purdue.edu - GC security. |\n");
Alarm( PRINT, "| Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf. |\n");
Alarm( PRINT, "| Dan Schoenblum dansch@cnds.jhu.edu - Java Interface Developer. |\n");
Alarm( PRINT, "| John Schultz jschultz@cnds.jhu.edu - contribution to process group |\n");
Alarm( PRINT, "| membership. |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| Special thanks to the following for providing ideas and/or code: |\n");
Alarm( PRINT, "| Ken Birman, Danny Dolev, David Shaw, Robbert VanRenesse. |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| WWW: www.spread.org www.cnds.jhu.edu www.spreadconcepts.com |\n");
Alarm( PRINT, "| Contact: spread@spread.org |\n");
Alarm( PRINT, "| |\n");
Alarm( PRINT, "| Version %d.%02d.%02d Built 15/October/2004 |\n",
(int)SP_MAJOR_VERSION, (int)SP_MINOR_VERSION, (int)SP_PATCH_VERSION);
Alarm( PRINT, "\\===========================================================================/\n");
Usage( argc, argv );
Alarm_set_interactive();
Conf_init( Config_file, My_name );
Cn = Conf();
My = Conf_my();
Alarm_clear_types(ALL);
Alarm_set_types(PRINT | EXIT );
#ifdef ARCH_PC_WIN95
ret = WSAStartup( MAKEWORD(2,0), &WSAData );
if( ret != 0 )
Alarm( EXIT, "sptmonitor: main: winsock initialization error %d\n", ret );
#endif /* ARCH_PC_WIN95 */
initialize_locks();
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Partition[i] = 0;
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Status_vector[i] = 0;
Pack_scat.elements[0].len = sizeof( packet_header );
Pack_scat.elements[0].buf = (char *)&Pack;
Pack.proc_id = My.id;
Pack.seq = My_port;
Pack.memb_id.proc_id = 15051963;
Report_scat.num_elements = 2;
Report_scat.elements[0].buf = (char *)&Report_pack;
Report_scat.elements[0].len = sizeof(packet_header);
Report_scat.elements[1].buf = (char *)&GlobalStatus;
Report_scat.elements[1].len = sizeof(status);
SendChan = DL_init_channel( SEND_CHANNEL , My_port, 0, 0 );
Report_socket = DL_init_channel( RECV_CHANNEL, My_port, 0, 0 );
E_init(); /* both reentrent and non-reentrant code uses events */
#ifndef _REENTRANT
E_attach_fd( 0, READ_FD, User_command, 0, NULL, LOW_PRIORITY );
E_attach_fd( Report_socket, READ_FD, Report_message, 0, NULL, HIGH_PRIORITY );
#endif /* _REENTRANT */
Print_menu();
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
ret = pthread_create( &Read_thread, NULL, Read_thread_routine, 0 );
ret = pthread_create( &Status_thread, NULL, Status_send_thread_routine, 0 );
ret = pthread_create( &Partition_thread, NULL, Partition_send_thread_routine, 0 );
#else /* ARCH_PC_WIN95 */
Read_thread = CreateThread( NULL, 0, Read_thread_routine, NULL, 0, &ret );
Status_thread = CreateThread( NULL, 0, Status_send_thread_routine, NULL, 0, &ret );
Partition_thread = CreateThread( NULL, 0, Partition_send_thread_routine, NULL, 0, &ret );
#endif /* ARCH_PC_WIN95 */
for(;;)
{
User_command();
}
#else /*! _REENTRANT */
E_handle_events();
#endif /* _REENTRANT */
return 0;
}
static void initialize_locks(void)
{
int ret;
ret = Mutex_trylock( &Init_mutex );
if( ret == 0 )
{
/*
* we managed to lock the Init_mutex. This means we are the first thread
* to get here.
*/
Mutex_init( &Status_mutex );
Mutex_init( &Partition_mutex );
}
return;
}
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
static void *Read_thread_routine()
#else /* ARCH_PC_WIN95 */
static DWORD WINAPI Read_thread_routine( void *dummy)
#endif /* ARCH_PC_WIN95 */
{
for(;;)
{
Report_message( Report_socket, 0, NULL);
}
return( 0 );
}
#endif /* _REENTRANT */
static void User_command()
{
char command[80];
int i;
if( fgets( command,70,stdin ) == NULL )
{
printf("Bye.\n");
exit( 0 );
}
switch( command[0] )
{
case '0':
Activate_status();
printf("\n");
printf("Monitor> ");
fflush(stdout);
break;
case '1':
Define_partition();
Print_partition( Work_partition );
break;
case '2':
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Partition[i] = Work_partition[i];
Mutex_lock( &Partition_mutex );
Partition_active = 1;
Mutex_unlock( &Partition_mutex );
#ifndef _REENTRANT
Send_partition();
#endif
printf("\n");
printf("Monitor> ");
fflush(stdout);
break;
case '3':
Print_partition( Partition );
break;
case '4':
for( i=0; i < Conf_num_procs( &Cn ); i++ )
{
Partition[i] = 0;
Work_partition[i] = 0;
}
Mutex_lock( &Partition_mutex );
Partition_active = 0;
Mutex_unlock( &Partition_mutex );
Send_partition();
#ifndef _REENTRANT
E_dequeue( Send_partition, 0, NULL );
#endif
printf("\n");
printf("Monitor> ");
fflush(stdout);
break;
case '5':
Define_flow_control();
Print_flow_control( Work_fc_buf );
break;
case '6':
for( i=0; i < Conf_num_procs( &Cn )+1; i++ )
Fc_buf[i] = Work_fc_buf[i];
Send_flow_control();
printf("\n");
printf("Monitor> ");
fflush(stdout);
break;
case '7':
Print_flow_control( Fc_buf );
break;
case '8':
Kill_spreads();
printf("\n");
printf("Monitor> ");
fflush(stdout);
break;
case '9':
case 'q':
printf("Bye.\n");
exit( 0 );
break;
default:
printf("\nUnknown commnad\n");
Print_menu();
break;
}
}
static void Print_menu()
{
printf("\n");
printf("=============\n");
printf("Monitor Menu:\n");
printf("-------------\n");
printf("\t0. Activate/Deactivate Status {all, none, Proc, CR}\n");
printf("\n");
printf("\t1. Define Partition\n");
printf("\t2. Send Partition\n");
printf("\t3. Review Partition\n");
printf("\t4. Cancel Partition Effects\n");
printf("\n");
printf("\t5. Define Flow Control\n");
printf("\t6. Send Flow Control\n");
printf("\t7. Review Flow Control\n");
printf("\n");
printf("\t8. Terminate Spread Daemons {all, none, Proc, CR}\n");
printf("\n");
printf("\t9. Exit\n");
printf("\n");
printf("Monitor> ");
fflush(stdout);
}
static void Print_partition( int16 partition[MAX_PROCS_RING] )
{
int32 proc_id;
proc p;
int proc_index;
int i,j;
printf("\n");
printf("=============\n");
printf("Partition Map:\n");
printf("-------------\n");
printf("\n");
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
printf("\t%s\t%d\n", p.name, partition[proc_index] );
}
printf("\n");
}
printf("\n");
printf("Monitor> ");
fflush(stdout);
}
static void Define_partition()
{
int32 proc_id;
proc p;
int proc_index;
char str[80];
int legal,ret,temp;
int i,j;
printf("\n");
printf("=============\n");
printf("Define Partition\n");
printf("-------------\n");
printf("\n");
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
for( legal=0; !legal; )
{
printf("\t%s\t", p.name);
if( fgets( str, 70, stdin ) == NULL )
{
printf("Bye.\n");
exit(0);
}
ret = sscanf(str, "%d", &temp );
Work_partition[proc_index] = temp;
if( ret > 0 ) legal = 1;
else printf("Please enter a number\n");
}
}
printf("\n");
}
}
static void Send_partition()
{
int32 proc_id;
proc p;
int proc_index;
int i,j;
Pack.type = PARTITION_TYPE;
Pack.type = Set_endian( Pack.type );
Pack.data_len= sizeof( Partition );;
Pack_scat.num_elements = 2;
Pack_scat.elements[1].len = sizeof( Partition );
Pack_scat.elements[1].buf = (char *)&Partition;
Alarm( PRINT , "Monitor: send partition\n");
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
DL_send( SendChan, p.id, p.port, &Pack_scat );
DL_send( SendChan, p.id, p.port, &Pack_scat );
}
}
#ifndef _REENTRANT
E_queue( Send_partition, 0, NULL, Send_partition_timeout );
#endif
}
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
static void *Partition_send_thread_routine()
#else /* ARCH_PC_WIN95 */
static DWORD WINAPI Partition_send_thread_routine( void *dummy)
#endif /* ARCH_PC_WIN95 */
{
sp_time onesecond_time = { 1, 0};
sp_time send_interval;
int active_p;
for(;;)
{
Mutex_lock( &Partition_mutex );
active_p = Partition_active;
send_interval = Send_partition_timeout;
Mutex_unlock( &Partition_mutex );
if (active_p) {
Send_partition();
E_delay(send_interval);
} else {
E_delay(onesecond_time);
}
}
return( 0 );
}
#endif /* _REENTRANT */
static void Print_flow_control( int16 fc_buf[MAX_PROCS_RING] )
{
int32 proc_id;
proc p;
int proc_index;
int i,j;
printf("\n");
printf("========================\n");
printf("Flow Control Parameters:\n");
printf("------------------------\n");
printf("\n");
printf("Window size: %d\n",fc_buf[ Conf_num_procs( &Cn )]);
printf("\n");
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
printf("\t%s\t%d\n", p.name, fc_buf[proc_index] );
}
printf("\n");
}
printf("\n");
printf("Monitor> ");
fflush(stdout);
}
static void Define_flow_control()
{
int32 proc_id;
proc p;
int proc_index;
char str[80];
int legal,ret,temp;
int i,j;
printf("\n");
printf("===================\n");
printf("Define Flow Control\n");
printf("-------------------\n");
printf("\n");
for( legal=0; !legal; )
{
printf(" Window size: ");
if( fgets( str,70,stdin ) == NULL )
{
printf("Bye.\n");
exit(0);
}
ret = sscanf(str, "%d", &temp );
Work_fc_buf[Conf_num_procs( &Cn )] = temp;
if( ret > 0 ) legal = 1;
else if( ret == -1 ){
legal = 1;
Work_fc_buf[Conf_num_procs( &Cn )] = -1;
}else printf("Please enter a number\n");
}
printf("\n");
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
for( legal=0; !legal; )
{
printf("\t%s\t", p.name);
if( fgets( str, 70, stdin ) == NULL )
{
printf("Bye.\n");
exit(0);
}
ret = sscanf(str, "%d", &temp);
Work_fc_buf[proc_index] = temp;
if( ret > 0 ) legal = 1;
else if( ret == -1 ){
legal = 1;
Work_fc_buf[proc_index] = -1;
}else printf("Please enter a number\n");
}
}
printf("\n");
}
}
static void Send_flow_control()
{
int32 proc_id;
proc p;
int proc_index;
int i,j;
Pack.type = FC_TYPE;
Pack.type = Set_endian( Pack.type );
Pack.data_len= sizeof( Fc_buf );;
Pack_scat.num_elements = 2;
Pack_scat.elements[1].len = sizeof( Fc_buf );
Pack_scat.elements[1].buf = (char *)&Fc_buf;
Alarm( PRINT , "Monitor: send flow control params\n");
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
DL_send( SendChan, p.id, p.port, &Pack_scat );
DL_send( SendChan, p.id, p.port, &Pack_scat );
}
}
}
static void Activate_status()
{
proc p;
int proc_index;
char str[80];
int legal,ret;
int i;
int end;
printf("\n");
printf("=============\n");
printf("Activate Status\n");
printf("-------------\n");
printf("\n");
end = 0;
while( !end )
{
for( legal=0; !legal; )
{
printf("\tEnter Proc Name: ");
if( fgets( str, 70, stdin ) == NULL )
{
printf("Bye.\n");
exit(0);
}
ret = sscanf(str, "%s", p.name );
if( ret > 0 || str[0] == '\n' ) legal = 1;
else printf("Please enter a legal proc name, none, or all\n");
}
if( str[0] == '\n' ){
end = 1;
}else if( !strcmp( p.name, "all" ) ){
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Status_vector[i] = 1;
}else if( !strcmp( p.name, "none" ) ){
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Status_vector[i] = 0;
}else{
proc_index = Conf_proc_by_name( p.name, &p );
if( proc_index != -1 ){
Status_vector[proc_index] = 1;
}else printf("Please! enter a legal proc name, none, or all\n");
}
}
#ifndef _REENTRANT
E_dequeue( Send_status_query, 0, NULL );
#endif
Mutex_lock( &Status_mutex );
Status_active = 0;
for( i=0; i < Conf_num_procs( &Cn ); i++ )
{
if( Status_vector[i] )
{
Status_active = 1;
break;
}
}
Mutex_unlock( &Status_mutex );
#ifndef _REENTRANT
if (Status_active)
Send_status_query();
#endif
}
static void Send_status_query()
{
int32 proc_id;
proc p;
int proc_index;
int i,j;
Pack.type = STATUS_TYPE;
Pack.type = Set_endian( Pack.type );
Pack.data_len= 0;
Pack_scat.num_elements = 1;
Alarm( PRINT , "Monitor: send status query\n");
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
if( Status_vector[proc_index] )
{
DL_send( SendChan, p.id, p.port, &Pack_scat );
}
}
}
#ifndef _REENTRANT
E_queue( Send_status_query, 0, NULL, Send_status_timeout );
#endif
}
#ifdef _REENTRANT
#ifndef ARCH_PC_WIN95
static void *Status_send_thread_routine()
#else /* ARCH_PC_WIN95 */
static DWORD WINAPI Status_send_thread_routine( void *dummy)
#endif /* ARCH_PC_WIN95 */
{
sp_time onesecond_time = { 1, 0};
sp_time send_interval;
int active_p;
for(;;)
{
Mutex_lock( &Status_mutex );
active_p = Status_active;
send_interval = Send_status_timeout;
Mutex_unlock( &Status_mutex );
if (active_p) {
Send_status_query();
E_delay(send_interval);
} else {
E_delay(onesecond_time);
}
}
return( 0 );
}
#endif /* _REENTRANT */
static void Kill_spreads()
{
int16 Kill_partition[MAX_PROCS_RING];
proc p;
int proc_index;
int32 proc_id;
char str[80];
int legal, ret;
int i, j;
int end;
for( i=0; i < MAX_PROCS_RING; i++ )
Kill_partition[i] = 0;
end = 0;
while( !end )
{
for( legal=0; !legal; )
{
printf("\tEnter Proc Name to Terminate: ");
if( fgets( str, 70, stdin ) == NULL )
{
printf("Bye.\n");
exit(0);
}
ret = sscanf(str, "%s", p.name );
if( ret > 0 || str[0] == '\n' ) legal = 1;
else printf("Please enter a legal proc name, none, or all\n");
}
if( str[0] == '\n' ){
end = 1;
}else if( !strcmp( p.name, "all" ) ){
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Kill_partition[i] = -1;
}else if( !strcmp( p.name, "none" ) ){
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Kill_partition[i] = 0;
}else{
proc_index = Conf_proc_by_name( p.name, &p );
if( proc_index != -1 ){
Kill_partition[proc_index] = -1;
}else printf("Please! enter a legal proc name, none, or all\n");
}
}
for( i=0; i < Conf_num_procs( &Cn ); i++ )
{
if( Kill_partition[i] != -1 ) Kill_partition[i] = Partition[i];
}
Pack.type = PARTITION_TYPE;
Pack.type = Set_endian( Pack.type );
Pack.data_len= sizeof( Kill_partition );;
Pack_scat.num_elements = 2;
Pack_scat.elements[1].len = sizeof( Kill_partition );
Pack_scat.elements[1].buf = (char *)&Kill_partition;
for( i=0; i < Cn.num_segments ; i++ )
{
for( j=0; j < Cn.segments[i].num_procs; j++ )
{
proc_id = Cn.segments[i].procs[j]->id;
proc_index = Conf_proc_by_id( proc_id, &p );
if( Kill_partition[proc_index] == -1 )
{
Alarm( PRINT , "Monitor: Terminating %s\n", p.name );
DL_send( SendChan, p.id, p.port, &Pack_scat );
DL_send( SendChan, p.id, p.port, &Pack_scat );
}
}
}
}
static void Report_message(mailbox fd, int dummy, void *dummy_p)
{
proc p;
proc leader_p;
int ret;
int ret1,ret2;
static int32 last_mes;
static int32 last_aru;
static int32 last_sec;
last_mes = GlobalStatus.message_delivered;
last_aru = GlobalStatus.aru;
last_sec = GlobalStatus.sec;
ret = DL_recv( fd, &Report_scat );
if( ret <= 0 ) {
Alarm( DEBUG, "Report_messsage: DL_recv failed with ret %d, errno %d\n", ret, sock_errno);
return;
}
if( !Same_endian( Report_pack.type ) )
{
GlobalStatus.sec = Flip_int32( GlobalStatus.sec );
GlobalStatus.state = Flip_int32( GlobalStatus.state );
GlobalStatus.gstate = Flip_int32( GlobalStatus.gstate );
GlobalStatus.packet_sent = Flip_int32( GlobalStatus.packet_sent );
GlobalStatus.packet_recv = Flip_int32( GlobalStatus.packet_recv );
GlobalStatus.packet_delivered = Flip_int32( GlobalStatus.packet_delivered );
GlobalStatus.retrans = Flip_int32( GlobalStatus.retrans );
GlobalStatus.u_retrans = Flip_int32( GlobalStatus.u_retrans );
GlobalStatus.s_retrans = Flip_int32( GlobalStatus.s_retrans );
GlobalStatus.b_retrans = Flip_int32( GlobalStatus.b_retrans );
GlobalStatus.aru = Flip_int32( GlobalStatus.aru );
GlobalStatus.my_aru = Flip_int32( GlobalStatus.my_aru );
GlobalStatus.highest_seq = Flip_int32( GlobalStatus.highest_seq );
GlobalStatus.token_hurry = Flip_int32( GlobalStatus.token_hurry );
GlobalStatus.token_rounds = Flip_int32( GlobalStatus.token_rounds );
GlobalStatus.my_id = Flip_int32( GlobalStatus.my_id );
GlobalStatus.leader_id = Flip_int32( GlobalStatus.leader_id );
GlobalStatus.message_delivered = Flip_int32( GlobalStatus.message_delivered );
GlobalStatus.membership_changes = Flip_int16( GlobalStatus.membership_changes);
GlobalStatus.num_procs = Flip_int16( GlobalStatus.num_procs );
GlobalStatus.num_segments = Flip_int16( GlobalStatus.num_segments );
GlobalStatus.window = Flip_int16( GlobalStatus.window );
GlobalStatus.personal_window = Flip_int16( GlobalStatus.personal_window );
GlobalStatus.num_sessions = Flip_int16( GlobalStatus.num_sessions );
GlobalStatus.num_groups = Flip_int16( GlobalStatus.num_groups );
GlobalStatus.major_version = Flip_int16( GlobalStatus.major_version );
GlobalStatus.minor_version = Flip_int16( GlobalStatus.minor_version );
GlobalStatus.patch_version = Flip_int16( GlobalStatus.patch_version );
}
printf("\n============================\n");
ret1 = Conf_proc_by_id( GlobalStatus.my_id, &p );
ret2 = Conf_proc_by_id( GlobalStatus.leader_id, &leader_p );
if( ret1 < 0 )
{
printf("Report_message: Skiping illegal status \n");
printf("==================================\n");
return;
}
printf("Status at %s V%2d.%02d.%2d (state %d, gstate %d) after %d seconds :\n",p.name,
GlobalStatus.major_version,GlobalStatus.minor_version,GlobalStatus.patch_version,
GlobalStatus.state, GlobalStatus.gstate, GlobalStatus.sec);
if( ret2 < 0 )
printf("Membership : %d procs in %d segments, leader is %d\n",
GlobalStatus.num_procs,GlobalStatus.num_segments,GlobalStatus.leader_id);
else printf("Membership : %d procs in %d segments, leader is %s\n",
GlobalStatus.num_procs,GlobalStatus.num_segments,leader_p.name);
printf("rounds : %7d\ttok_hurry : %7d\tmemb change: %7d\n",GlobalStatus.token_rounds,GlobalStatus.token_hurry,GlobalStatus.membership_changes);
printf("sent pack: %7d\trecv pack : %7d\tretrans : %7d\n",GlobalStatus.packet_sent,GlobalStatus.packet_recv,GlobalStatus.retrans);
printf("u retrans: %7d\ts retrans : %7d\tb retrans : %7d\n",GlobalStatus.u_retrans,GlobalStatus.s_retrans,GlobalStatus.b_retrans);
printf("My_aru : %7d\tAru : %7d\tHighest seq: %7d\n",GlobalStatus.my_aru,GlobalStatus.aru, GlobalStatus.highest_seq);
printf("Sessions : %7d\tGroups : %7d\tWindow : %7d\n",GlobalStatus.num_sessions,GlobalStatus.num_groups,GlobalStatus.window);
printf("Deliver M: %7d\tDeliver Pk: %7d\tPers Window: %7d\n",GlobalStatus.message_delivered,GlobalStatus.packet_delivered,GlobalStatus.personal_window);
printf("Delta Mes: %7d\tDelta Pack: %7d\tDelta sec : %7d\n",
GlobalStatus.message_delivered - last_mes, GlobalStatus.aru - last_aru, GlobalStatus.sec - last_sec );
printf("==================================\n");
printf("\n");
printf("Monitor> ");
fflush(stdout);
}
static void Usage(int argc, char *argv[])
{
My_name = 0; /* NULL */
My_port = 6543;
Send_status_timeout.sec = 10;
Send_status_timeout.usec= 0;
strcpy( Config_file, "spread.conf" );
while( --argc > 0 )
{
argv++;
if( !strncmp( *argv, "-p", 2 ) )
{
sscanf(argv[1], "%d", &My_port );
argc--; argv++;
}else if( !strncmp( *argv, "-n", 2 ) ) {
if( strlen( argv[1] ) > MAX_PROC_NAME-1 ) /* -1 for the null */
Alarm( EXIT, "Usage: proc name %s too long\n",
argv[1] );
memcpy( My_name_buf, argv[1], strlen( argv[1] ) );
My_name = My_name_buf;
argc--; argv++;
}else if( !strncmp( *argv, "-t", 2 ) ){
sscanf( argv[1], "%ld", &Send_status_timeout.sec );
argc--; argv++;
}else if( !strncmp( *argv, "-c", 2 ) ){
strcpy( Config_file, argv[1] );
argc--; argv++;
}else{
Alarm( EXIT, "Usage: spmonitor\n%s\n%s\n%s\n%s\n",
"\t[-p <port number>]: specify port number",
"\t[-n <proc name>] : force computer name",
"\t[-t <status timeout>]: specify number of seconds between status queries",
"\t[-c <file name>] : specify configuration file" );
}
}
}
syntax highlighted by Code2HTML, v. 0.9.1