/* * The Spread Toolkit. * * The contents of this file are subject to the Spread Open-Source * License, Version 1.0 (the ``License''); you may not use * this file except in compliance with the License. You may obtain a * copy of the License at: * * http://www.spread.org/license/ * * or in the file ``license.txt'' found in this distribution. * * Software distributed under the License is distributed on an AS IS basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * The Creators of Spread are: * Yair Amir, Michal Miskin-Amir, Jonathan Stanton. * * Copyright (C) 1993-2004 Spread Concepts LLC * * All Rights Reserved. * * Major Contributor(s): * --------------- * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security. * Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf. * Dan Schoenblum dansch@cnds.jhu.edu - Java interface. * John Schultz jschultz@cnds.jhu.edu - contribution to process group membership. * */ #include #include #include #include "membership.h" #include "spread_params.h" #include "flow_control.h" #include "prot_body.h" #include "net_types.h" #include "network.h" #include "objects.h" #include "memory.h" #include "status.h" #include "alarm.h" #define POTENTIAL_REP 0 #define SEG_REP 1 #define RING_REP 2 typedef struct dummy_members_info{ int16 num_members; int16 num_pending; int32 members[MAX_PROCS_RING]; } members_info; typedef struct dummy_rep_info{ int32 proc_id; int16 type; int16 seg_index; } rep_info; typedef struct dummy_reps_info{ int16 num_reps; int16 rep_index; rep_info reps[MAX_REPS]; } reps_info; typedef struct dummy_ring_info{ membership_id memb_id; int32 trans_time; int32 aru; int32 highest_seq; int32 num_holes; int16 num_commit; int16 num_trans; } ring_info; static configuration Membership; static membership_id Membership_id; static configuration Future_membership; static membership_id Future_membership_id; static membership_id Trans_memb_id; static int32 F_trans_memb_time; static int32 Last_time_used; static int State; static int Token_alive; static proc My; static segment My_conf_seg; static int32 My_seg_rep; static int Foreign_found; static configuration Cn; static members_info F_members; static reps_info F_reps; static reps_info Potential_reps; static members_info Commit_set; static members_info Future_commit_set; static sys_scatter Send_pack; static sp_time Zero_timeout = { 0, 0}; static void Memb_handle_alive ( sys_scatter *scat ); static void Memb_handle_join ( sys_scatter *scat ); static void Memb_handle_refer ( sys_scatter *scat ); static void Memb_handle_foreign( sys_scatter *scat ); static void Memb_handle_form1 ( sys_scatter *scat ); static void Memb_handle_form2 ( sys_scatter *scat ); static void Shift_to_seg(); static void Gather_or_represented(); static void Shift_to_gather(); static void Shift_to_represented(); static void Form_or_fail(); static void Scast_alive(); static void Send_join(); static void Lookup_new_members(); static int Insert_member( members_info *m, int32 proc_id ); static int Insert_rep( reps_info *r, rep_info rep ); static int32 Smallest_member( members_info *m, int *index ); static int32 Smallest_rep( reps_info *r, int *index ); static void Sort_members( members_info *m ); static void Sort_reps( reps_info *r ); static void Create_form1(); static void Fill_form1( sys_scatter *scat ); static void Read_form2( sys_scatter *scat ); static void Backoff_membership(); static void Flip_members( members_info *members_ptr ); static void Flip_reps( reps_info *reps_ptr ); static void Flip_rings( char *buf ); void Memb_init() { packet_header *pack_ptr; int32 reference_subnet; int32 current_subnet; int i; State = OP; GlobalStatus.state = OP; GlobalStatus.membership_changes = 0; My = Conf_my(); Cn = Conf(); reference_subnet = Cn.segments[0].procs[0]->id; reference_subnet = reference_subnet & 0xffff0000; Wide_network = 0; for( i=1; i < Cn.num_segments; i++ ) { current_subnet = Cn.segments[i].procs[0]->id; current_subnet = current_subnet & 0xffff0000; if( current_subnet != reference_subnet ) { Wide_network = 1; break; } } if( Wide_network ) { Token_timeout.sec = 20; Token_timeout.usec = 0; Hurry_timeout.sec = 6; Hurry_timeout.usec = 0; Alive_timeout.sec = 1; Alive_timeout.usec = 0; Join_timeout.sec = 1; Join_timeout.usec = 0; Rep_timeout.sec = 5; Rep_timeout.usec = 0; Seg_timeout.sec = 2; Seg_timeout.usec = 0; Gather_timeout.sec = 10; Gather_timeout.usec = 0; Form_timeout.sec = 10; Form_timeout.usec = 0; Lookup_timeout.sec = 90; Lookup_timeout.usec = 0; }else{ Token_timeout.sec = 5; Token_timeout.usec = 0; Hurry_timeout.sec = 2; Hurry_timeout.usec = 0; Alive_timeout.sec = 1; Alive_timeout.usec = 0; Join_timeout.sec = 1; Join_timeout.usec = 0; Rep_timeout.sec = 2; Rep_timeout.usec = 500000; Seg_timeout.sec = 2; Seg_timeout.usec = 0; Gather_timeout.sec = 5; Gather_timeout.usec = 0; Form_timeout.sec = 5; Form_timeout.usec = 0; Lookup_timeout.sec = 60; Lookup_timeout.usec = 0; } /* Lookup timeout when only one segment exists can be longer, * since a no remote segments need to be probed */ if ( Cn.num_segments == 1 ) Lookup_timeout.sec = 300; Membership = Conf(); My_conf_seg = Cn.segments[My.seg_index]; for( i=0; i < Conf().num_segments; i++ ) Membership.segments[i].num_procs = 0; Conf_append_id_to_seg( &Membership.segments[My.seg_index], My.id); Membership_id.proc_id = My.id; Membership_id.time = -1; Last_time_used = Membership_id.time; Transitional = 0; Reg_membership = Membership; pack_ptr = new(PACK_HEAD_OBJ); pack_ptr->proc_id = My.id; Send_pack.elements[0].len = sizeof(packet_header); Send_pack.elements[0].buf = (char *)pack_ptr; Memb_token_loss(); } configuration *Memb_active_ptr() { if( State == EVS ) return ( &Future_membership ); else return( &Membership ); } membership_id Memb_id() { return( Membership_id ); } membership_id Memb_trans_id() { return( Trans_memb_id ); } int Memb_is_equal( membership_id m1, membership_id m2 ) { if( m1.proc_id == m2.proc_id && m1.time == m2.time ) return( 1 ); else return( 0 ); } int32 Memb_state() { return( State ); } int Memb_token_alive() { return( Token_alive ); } void Memb_handle_message( sys_scatter *scat ) { packet_header *pack_ptr; pack_ptr = (packet_header *)scat->elements[0].buf; if( Is_alive( pack_ptr->type ) ) { Alarm( MEMB, "Memb_handle_message: handling alive message\n"); Memb_handle_alive( scat ); }else if( Is_join( pack_ptr->type ) ){ Alarm( MEMB, "Memb_handle_message: handling join message from %d, State is %d\n", pack_ptr->proc_id, State); Memb_handle_join( scat ); }else if( Is_refer( pack_ptr->type ) ){ Alarm( MEMB, "Memb_handle_message: handling refer message from %d, State is %d\n",pack_ptr->proc_id, State); Memb_handle_refer( scat ); }else if( Is_regular( pack_ptr->type ) ){ Alarm( NONE, "Memb_handle_message: handling foreign message from %d, State is %d\n",pack_ptr->proc_id, State); Memb_handle_foreign( scat ); }else{ Alarm( EXIT, "Memb_handle_message: unknown message type %d\n", pack_ptr->type ); } } void Memb_handle_token( sys_scatter *scat ) { token_header *token_ptr; token_ptr = (token_header *)scat->elements[0].buf; if( Is_form1( token_ptr->type ) ) { Alarm( MEMB, "Memb_handle_token: handling form1 token\n"); Memb_handle_form1( scat ); }else if( Is_form2( token_ptr->type ) ){ Alarm( MEMB, "Memb_handle_token: handling form2 token\n"); Memb_handle_form2( scat ); }else{ Alarm( EXIT, "Memb_handle_token: unknown token type %d\n", token_ptr->type ); } } static void Memb_handle_alive( sys_scatter *scat ) { packet_header *pack_ptr; pack_ptr = (packet_header *)scat->elements[0].buf; switch( State ) { case OP: Alarm( MEMB, "Handle_alive in OP\n"); if( Conf_id_in_conf( &Membership, pack_ptr->proc_id ) != -1 ) { /* sender belongs to my ring - my token is lost */ Memb_token_loss(); /* update my F_members list */ Insert_member( &F_members, pack_ptr->proc_id ); } break; case SEG: Alarm( MEMB, "Handle_alive in SEG\n"); /* update my F_members list */ Insert_member( &F_members, pack_ptr->proc_id ); break; case REPRESENTED: Alarm( MEMB, "Handle_alive in REPRESENTED\n"); E_queue( Shift_to_seg, 0, NULL, Rep_timeout ); break; case GATHER: Alarm( MEMB, "Handle_alive in GATHER\n"); /* * If I am a seg representative - take this guy * If I am a ring leader: if this guy in my ring - lost token * otherwise ignore. */ if( ! Token_alive ) Insert_member( &F_members, pack_ptr->proc_id ); else if( Conf_id_in_conf( &Membership, pack_ptr->proc_id ) != -1 ){ /* sender belongs to my ring - my token is lost */ Memb_token_loss(); /* update my F_members list */ Insert_member( &F_members, pack_ptr->proc_id ); } break; case FORM: /* * Ignore. * In principle if this guy is in my formed membership * then I know I lost the token, but there is no way * to know at this time if this guy is in my formed memb. */ break; case EVS: /* * If this guy in my formed membership then my token is lost * Otherwise ignore */ if( Conf_id_in_conf( &Future_membership, pack_ptr->proc_id ) != -1 ) { Memb_token_loss(); /* update my F_members list */ Insert_member( &F_members, pack_ptr->proc_id ); } break; } } static void Memb_handle_join( sys_scatter *scat ) { packet_header *pack_ptr; members_info *members_ptr; reps_info *reps_ptr; packet_header refer_pack; sys_scatter send_scat; proc p; int i; int ret; int dummy; pack_ptr = (packet_header *) scat->elements[0].buf; members_ptr = (members_info *) scat->elements[1].buf; if( !Same_endian( pack_ptr->type ) ) Flip_members( members_ptr ); i = 2*sizeof(int16) + (members_ptr->num_members)*sizeof(int32); reps_ptr = (reps_info *)&scat->elements[1].buf[i]; if( !Same_endian( pack_ptr->type ) ) Flip_reps( reps_ptr ); switch( State ) { case OP: /* if sender belongs to my ring then my token is lost (except for single segment confs) otherwise if I am a ring leader shift_to_gather */ Alarm( MEMB, "Handle_join in OP\n"); if( Conf_id_in_conf( &Membership, pack_ptr->proc_id ) == -1 ) { if( Conf_leader( &Membership ) == My.id ) { Potential_reps.reps[0] = reps_ptr->reps[0]; Potential_reps.num_reps = 1; F_members.num_members = 0; F_members.num_pending = 0; Shift_to_gather(); }else{ /* if !from my_seg OR seg_leader - answer ref */ if( reps_ptr->reps[0].seg_index != My.seg_index || Conf_seg_leader( &Membership, My.seg_index) == My.id ) { refer_pack.type = REFER_TYPE; refer_pack.proc_id = My.id; refer_pack.memb_id.proc_id = Conf_leader( &Membership ); refer_pack.memb_id.time = RING_REP; refer_pack.data_len = 0; send_scat.num_elements = 1; send_scat.elements[0].buf = (char *)&refer_pack; send_scat.elements[0].len = sizeof(packet_header); Net_ucast( pack_ptr->proc_id, &send_scat ); } } }else{ /* sender belongs to my ring - my token is lost */ /* If only one segment exists, then token is not lost, but rather * a JOIN probe for merged segments was received. This can be * ignored because a real token loss will trigger an ALIVE broadcast * packet which will force the membership change. */ if ( Cn.num_segments > 1 ) Memb_token_loss(); } break; case SEG: ret = Conf_proc_by_id( pack_ptr->proc_id, &p ); if( My.seg_index == p.seg_index && reps_ptr->reps[0].type == SEG_REP) { /* this guy is my representative */ My_seg_rep = pack_ptr->proc_id; Shift_to_represented(); for( i=0; i < members_ptr->num_members; i++ ) if( members_ptr->members[i] == My.id ) break; Scast_alive( 1 ); }else{ /* no need to remember this join */ } break; case REPRESENTED: ret = Conf_proc_by_id( pack_ptr->proc_id, &p ); if( My.seg_index == p.seg_index && reps_ptr->reps[0].type == SEG_REP) { if( pack_ptr->proc_id != My_seg_rep ) { Alarm( MEMB, "Memb_handle_join: representative shifts: old %d, new %d\n", My_seg_rep, pack_ptr->proc_id ); My_seg_rep = pack_ptr->proc_id; } E_queue( Shift_to_seg, 0, NULL, Rep_timeout ); for( i=0; i < members_ptr->num_members; i++ ) if( members_ptr->members[i] == My.id ) break; Scast_alive( 1 ); }else{ /* if My_seg_rep is determined - send it to this guy */ /* My_seg_rep can be undetermined if it did not issue a join yet */ if( My_seg_rep != -1 ) { /* send my_seg_rep to pack_ptr->proc_id */ refer_pack.type = REFER_TYPE; refer_pack.proc_id = My.id; refer_pack.memb_id.proc_id = My_seg_rep; refer_pack.memb_id.time = SEG_REP; refer_pack.data_len = 0; send_scat.num_elements = 1; send_scat.elements[0].buf = (char *)&refer_pack; send_scat.elements[0].len = sizeof(packet_header); Net_ucast( pack_ptr->proc_id, &send_scat ); } } break; case GATHER: /* update my F_reps list */ Insert_rep( &F_reps, reps_ptr->reps[0] ); for( i=0; i < reps_ptr->num_reps; i++ ) { Insert_rep( &Potential_reps, reps_ptr->reps[i] ); } /* if sender is my Smallest rep - advance Gather_timeout */ if( Smallest_rep( &F_reps, &dummy ) == reps_ptr->reps[0].proc_id ) E_queue( Form_or_fail, 0, NULL, Gather_timeout ); break; case FORM: /* Ignore */ break; case EVS: /* * If this guy in my formed membership then my token is lost * Otherwise ignore */ if( Conf_id_in_conf( &Future_membership, pack_ptr->proc_id ) != -1 ) { Memb_token_loss(); /* update my F_members list */ Insert_member( &F_members, pack_ptr->proc_id ); } break; } } static void Memb_handle_refer( sys_scatter *scat ) { packet_header *pack_ptr; proc p; rep_info temp_rep; int ret; pack_ptr = (packet_header *) scat->elements[0].buf; switch( State ) { case OP: Alarm( MEMB, "Handle_refer in OP\n"); break; case SEG: Alarm( MEMB, "Handle_refer in SEG\n"); break; case REPRESENTED: Alarm( MEMB, "Handle_refer in REPRESENTED\n"); break; case GATHER: Alarm( MEMB, "Handle_refer in GATHER\n"); ret = Conf_proc_by_id( pack_ptr->memb_id.proc_id, &p ); if( ret < 0 ) { Alarm( PRINT, "Memb_handle_refer: unknown proc_id %d\n", pack_ptr->memb_id.proc_id ); return; } temp_rep.proc_id = p.id; temp_rep.type = pack_ptr->memb_id.time; temp_rep.seg_index = p.seg_index; Insert_rep( &Potential_reps, temp_rep ); break; case FORM: Alarm( MEMB, "Handle_refer in FORM\n"); break; case EVS: Alarm( MEMB, "Handle_refer in EVS\n"); break; } } static void Memb_handle_foreign( sys_scatter *scat ) { packet_header *pack_ptr; proc p; int ret; pack_ptr = (packet_header *) scat->elements[0].buf; switch( State ) { case OP: Alarm( NONE, "Handle_foreign in OP\n"); ret = Conf_proc_by_id( pack_ptr->proc_id, &p ); if( ret < 0 ) { Alarm( PRINT, "Memb_handle_foreign: unknown proc_id %d\n", pack_ptr->proc_id ); return; } if( Conf_leader( &Membership ) == My.id ) { Lookup_new_members(); }else if( Conf_seg_leader( &Membership, My.seg_index ) == My.id && (!Foreign_found) ){ /* * Seg leader : sending one foreign message * to Conf leader */ sys_scatter send_scat; Foreign_found = 1; send_scat.num_elements = 2; send_scat.elements[0].buf = scat->elements[0].buf; send_scat.elements[0].len = scat->elements[0].len; send_scat.elements[1].buf = scat->elements[1].buf; send_scat.elements[1].len = pack_ptr->data_len; Net_ucast( Conf_leader( &Membership ), &send_scat ); Net_ucast( Conf_leader( &Membership ), &send_scat ); } break; case SEG: Alarm( NONE, "Handle_foreign in SEG\n"); break; case REPRESENTED: Alarm( NONE, "Handle_foreign in REPRESENTED\n"); break; case GATHER: Alarm( NONE, "Handle_foreign in GATHER\n"); break; case FORM: Alarm( NONE, "Handle_foreign in FORM\n"); break; case EVS: Alarm( NONE, "Handle_foreign in EVS\n"); /* * This may happen when multiple old memberships merge * and a message is retransmitted using bcast or scast. * i.e. more than one proc miss it and another proc recieves it */ break; } } static void Memb_handle_form1( sys_scatter *scat ) { switch( State ) { case OP: Alarm( MEMB, "Handle_form1 in OP\n"); if( Conf_leader( &Membership ) == My.id ) /* do nothing */; else Fill_form1( scat ); break; case SEG: Alarm( MEMB, "Handle_form1 in SEG\n"); /* swallow this token */ break; case REPRESENTED: Alarm( MEMB, "Handle_form1 in REPRESENTED\n"); Fill_form1( scat ); break; case GATHER: Alarm( MEMB, "Handle_form1 in GATHER\n"); Fill_form1( scat ); break; case FORM: Alarm( MEMB, "Handle_form1 in FORM\n"); /* swallow this token */ break; case EVS: Alarm( MEMB, "Handle_form1 in EVS\n"); /* swallow this token */ break; } } static void Memb_handle_form2( sys_scatter *scat ) { switch( State ) { case OP: Alarm( MEMB, "Handle_form2 in OP\n"); /* swallow this token */ break; case SEG: Alarm( MEMB, "Handle_form2 in SEG\n"); /* swallow this token */ break; case REPRESENTED: Alarm( MEMB, "Handle_form2 in REPRESENTED\n"); /* swallow this token */ break; case GATHER: Alarm( MEMB, "Handle_form2 in GATHER\n"); /* swallow this token */ break; case FORM: Alarm( MEMB, "Handle_form2 in FORM\n"); Read_form2( scat ); break; case EVS: Alarm( MEMB, "Handle_form2 in EVS\n"); /* swallow this token */ break; } } void Memb_token_loss() { rep_info temp_rep; int i; switch( State ) { case OP: /* my token is lost - shift to seg */ Commit_set.num_members = 1; Commit_set.members[0] = My.id; Potential_reps.num_reps = 0; temp_rep.type = POTENTIAL_REP; for( i=0; i < Conf().num_segments; i++ ) { if( i == My.seg_index ) continue; temp_rep.seg_index = i; if( Membership.segments[i].num_procs > 0 ) { temp_rep.proc_id = Membership.segments[i].procs[0]->id; }else{ temp_rep.proc_id = Cn.segments[i].procs[0]->id; } Insert_rep( &Potential_reps, temp_rep ); } break; case SEG: case REPRESENTED: Alarm( EXIT, "Memb_token_loss: bug !!! state is %d\n",State); break; case GATHER: /* I think I totally solved it */ /* If I am not a ring leader it is a bug */ if( ! Token_alive ) Alarm( EXIT, "Memb_token_loss: bug !!! state is %d\n",State); /* I am a ring leader and I lost my ring */ Commit_set.num_members = 1; Commit_set.members[0] = My.id; Potential_reps.num_reps = 0; temp_rep.type = POTENTIAL_REP; /* First adding reps that I gathered - they are more important */ /* starting from 1 not to include myself!! */ for( i=1; i < F_reps.num_reps; i++ ) { temp_rep = F_reps.reps[i]; Insert_rep( &Potential_reps, temp_rep ); } /* Next adding reps from my membership - they are less important */ for( i=0; i < Conf().num_segments; i++ ) { if( i == My.seg_index ) continue; temp_rep.seg_index = i; if( Membership.segments[i].num_procs > 0 ) { temp_rep.proc_id = Membership.segments[i].procs[0]->id; }else{ temp_rep.proc_id = Cn.segments[i].procs[0]->id; } Insert_rep( &Potential_reps, temp_rep ); } break; case FORM: /* my token is lost - shift to seg */ /* potential already updated according to form token */ break; case EVS: /* my token is lost - shift to seg */ /* clear empty messages */ Backoff_membership(); /* update potential according to future_membership */ Potential_reps.num_reps = 0; temp_rep.type = POTENTIAL_REP; for( i=0; i < Conf().num_segments; i++ ) { if( i == My.seg_index ) continue; temp_rep.seg_index = i; if( Future_membership.segments[i].num_procs > 0 ) { temp_rep.proc_id = Future_membership.segments[i].procs[0]->id; }else{ temp_rep.proc_id = Cn.segments[i].procs[0]->id; } Insert_rep( &Potential_reps, temp_rep ); } break; } Alarm( MEMB, "Memb_token_loss: I lost my token, state is %d\n",State); Shift_to_seg(); Token_alive = 0; E_dequeue( Memb_token_loss, 0, NULL ); E_dequeue( Send_join, 0, NULL ); E_dequeue( Form_or_fail, 0, NULL ); E_dequeue( Prot_token_hurry, 0, NULL ); E_dequeue( Lookup_new_members, 0, NULL ); Last_token->type = 0; Last_token->seq = 0; Last_token->aru = 0; for( i=0; i < Conf().num_segments; i++ ) Membership.segments[i].num_procs = 0; Conf_append_id_to_seg(&Membership.segments[My.seg_index], My.id); } static void Shift_to_seg() { State = SEG; GlobalStatus.state = SEG; F_members.num_members = 1; F_members.members[0] = My.id; F_members.num_pending = 0; E_queue( Scast_alive, 0, NULL, Zero_timeout ); E_queue( Gather_or_represented, 0, NULL, Seg_timeout ); } static void Gather_or_represented() { int dummy; My_seg_rep = -1; if( Smallest_member( &F_members, &dummy ) == My.id ) { Shift_to_gather(); }else{ Shift_to_represented(); } } static void Shift_to_gather() { State = GATHER; GlobalStatus.state = GATHER; F_reps.num_reps = 1; F_reps.reps[0].proc_id = My.id; F_reps.reps[0].seg_index = My.seg_index; if( Token_alive ) F_reps.reps[0].type = RING_REP; else F_reps.reps[0].type = SEG_REP; E_dequeue( Scast_alive, 0, NULL ); E_dequeue( Lookup_new_members, 0, NULL ); E_queue( Send_join, 0, NULL, Zero_timeout ); E_queue( Form_or_fail, 0, NULL, Gather_timeout ); } static void Shift_to_represented() { State = REPRESENTED; GlobalStatus.state = REPRESENTED; E_dequeue( Scast_alive, 0, NULL ); E_dequeue( Gather_or_represented, 0, NULL ); E_queue( Shift_to_seg, 0, NULL, Rep_timeout ); } static void Form_or_fail() { rep_info temp_rep; int i; int dummy; if( Smallest_rep( &F_reps, &dummy ) == My.id ) { if( Token_alive && F_reps.num_reps == 1 ) { /* clear everything and go back to op */ E_dequeue( Send_join, 0, NULL); E_queue( Lookup_new_members, 0, NULL, Lookup_timeout ); State = OP; GlobalStatus.state = OP; }else{ /* create and send form token */ Create_form1(); } }else{ if( Token_alive ) { /* clear everything and go back to op */ Alarm( MEMB, "Form_or_fail:failed, return to OP\n"); E_dequeue( Send_join, 0, NULL ); E_queue( Lookup_new_members, 0, NULL, Lookup_timeout ); State = OP; GlobalStatus.state = OP; }else{ Alarm( MEMB, "Form_or_fail: failed to gather\n"); /* failed to gather again */ F_members.num_members = 1; F_members.members[0] = My.id; F_members.num_pending = 0; Potential_reps.num_reps = 0; temp_rep.type = POTENTIAL_REP; /* starting from 1 not to include myself!! */ for( i=1; i < F_reps.num_reps; i++ ) { temp_rep = F_reps.reps[i]; Insert_rep( &Potential_reps, temp_rep ); } Shift_to_gather(); } } } static void Scast_alive( int code, void *dummy ) { packet_header *pack_ptr; Alarm( MEMB, "Scast_alive: State is %d\n", State); pack_ptr = (packet_header *)Send_pack.elements[0].buf; pack_ptr->type = ALIVE_TYPE; pack_ptr->data_len = 2*sizeof(int16) + (F_members.num_members)*sizeof(int32); Send_pack.elements[1].buf = (char *)&F_members; Send_pack.elements[1].len = pack_ptr->data_len; Send_pack.num_elements = 2; Net_scast( My.seg_index, &Send_pack ); if( code == 0 ) E_queue( Scast_alive, 0, NULL, Alive_timeout ); } static void Send_join() { packet_header *pack_ptr; int i; Alarm( MEMB, "Send_join: State is %d\n", State); pack_ptr = (packet_header *)Send_pack.elements[0].buf; pack_ptr->type = JOIN_TYPE; Send_pack.elements[1].buf = (char *)&F_members; Send_pack.elements[1].len = 2*sizeof(int16) + (F_members.num_members)*sizeof(int32); Send_pack.elements[2].buf = (char *)&F_reps; Send_pack.elements[2].len = 2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info); Send_pack.num_elements = 3; pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len; if( !Token_alive ) { Net_scast( My.seg_index, &Send_pack ); } for( i=0; i < Potential_reps.num_reps; i++ ) { Net_ucast( Potential_reps.reps[i].proc_id, &Send_pack ); } E_queue( Send_join, 0, NULL, Join_timeout ); } static void Lookup_new_members() { packet_header *pack_ptr; int num_missing; int i,j; if( State != OP ) { Alarm( MEMB, "Lookup_new_member: not in OP state, returning\n"); return; } Potential_reps.num_reps = 0; F_reps.num_reps = 1; F_reps.reps[0].proc_id = My.id; F_reps.reps[0].seg_index= My.seg_index; F_reps.reps[0].type = RING_REP; F_members.num_members = 0; F_members.num_pending = 0; pack_ptr = (packet_header *)Send_pack.elements[0].buf; pack_ptr->type = JOIN_TYPE; Send_pack.elements[1].buf = (char *)&F_members; Send_pack.elements[1].len = 2*sizeof(int16) + (F_members.num_members)*sizeof(int32); Send_pack.elements[2].buf = (char *)&F_reps; Send_pack.elements[2].len = 2*sizeof(int16) + (F_reps.num_reps)*sizeof(rep_info); Send_pack.num_elements = 3; pack_ptr->data_len = Send_pack.elements[1].len + Send_pack.elements[2].len; num_missing = 0; /* For single segment configured, send local broadcast of join to entire segment -- current members will ignore */ if ( Cn.num_segments == 1 ) { Net_scast( My.seg_index, &Send_pack ); num_missing++; } else { /* Send unicasts to each host that is not in the current membership. */ for( i=0; i < Cn.num_segments; i++ ) { for( j=0; j < Cn.segments[i].num_procs; j++ ) { if( Conf_id_in_conf( &Reg_membership, Cn.segments[i].procs[j]->id ) == -1 ) { Net_ucast( Cn.segments[i].procs[j]->id, &Send_pack ); num_missing++; } } } } if( num_missing ) Shift_to_gather(); } static int Insert_member( members_info *m, int32 proc_id ) { int i; for( i=0; i < m->num_members; i++ ) if( m->members[i] == proc_id ) { return( 0 ); } if (m->num_members == MAX_PROCS_RING) { /* members structure is full -- so we ignore this new member */ Alarmp( SPLOG_WARNING, MEMB, "Insert_member: members structure full (%u members) so ignore new member (ID %u.%u.%u.%u)\n", m->num_members, IP1(proc_id), IP2(proc_id), IP3(proc_id), IP4(proc_id) ); return( 0 ); } m->members[m->num_members] = proc_id; m->num_members++; return( 1 ); } static int Insert_rep( reps_info *r, rep_info rep ) { proc p; int ret; int i; ret = Conf_proc_by_id( rep.proc_id, &p ); if( ret < 0 ) Alarm( EXIT, "Insert_rep: proc %d not exist\n", rep.proc_id ); if( rep.type == POTENTIAL_REP ) { /* when Potential - if there is someone from same segment (of any type RING, SEG, POTENTIAL - don't insert */ for( i=0; i < r->num_reps; i++ ) { if( r->reps[i].seg_index == p.seg_index ) { return( 0 ); } } }else if( rep.type == SEG_REP ) { /* * when Seg - if there is other SEG or RING from same segment - keep all. * if it is POTENTIAL then replace it and exit because no other from this segment can exist. */ for( i=0; i < r->num_reps; i++ ) { if( r->reps[i].seg_index == p.seg_index ) { if( r->reps[i].type == POTENTIAL_REP ) { r->reps[i].type = SEG_REP; r->reps[i].proc_id = p.id; return( 1 ); }else if( r->reps[i].type == SEG_REP ) { if (r->reps[i].proc_id == p.id ) return( 0 ); }else if( r->reps[i].type == RING_REP && r->reps[i].proc_id == p.id ) { /* Former RING_REP lost its ring and became SEG_REP */ r->reps[i].type = SEG_REP; return( 1 ); } } } }else if( rep.type == RING_REP ) { /* * when Ring - if it exists exactly (proc_id) - make it from ring, * in any case elminate potentials. * If potential exists, it is known that no ring or seg exists. */ for( i=0; i < r->num_reps; i++ ) { if( r->reps[i].seg_index == p.seg_index ) { if( r->reps[i].type == POTENTIAL_REP ) { r->reps[i].type = RING_REP; r->reps[i].proc_id = p.id; return( 1 ); }else if( r->reps[i].type == SEG_REP ) { if (r->reps[i].proc_id == p.id ) { r->reps[i].type = RING_REP; return( 1 ); } }else if( r->reps[i].type == RING_REP && r->reps[i].proc_id == p.id ) { return( 0 ); } } } }else Alarm( EXIT, "Insert_rep: unknown rep.type %d \n", rep.type ); if (r->num_reps == MAX_REPS) { /* reps structure is full -- so we ignore this new rep */ Alarmp( SPLOG_WARNING, MEMB, "Insert_rep: reps structure full (%u reps) so ignore new rep (Type %d SegIndex %u ID %u.%u.%u.%u)\n", r->num_reps, rep.type, rep.seg_index, IP1(rep.proc_id), IP2(rep.proc_id), IP3(rep.proc_id), IP4(rep.proc_id) ); return( 0 ); } r->reps[r->num_reps] = rep; r->num_reps++; return( 1 ); } static int32 Smallest_member( members_info *m, int *index ) { int current; int i; proc curr_proc, i_proc; int ret; current = 0; ret = Conf_proc_by_id( m->members[current], &curr_proc ); if( ret < 0 ) Alarm( EXIT, "Smallest_member: unknown proc_id %d\n", m->members[current] ); for( i=1; i < m->num_members; i++ ) { ret = Conf_proc_by_id( m->members[i], &i_proc ); if( ret < 0 ) Alarm( EXIT, "Smallest_member: Bug! i: %d, proc %d\n", i, m->members[i] ); if( i_proc.seg_index < curr_proc.seg_index || ( i_proc.seg_index == curr_proc.seg_index && i_proc.index_in_seg < curr_proc.index_in_seg ) ) { curr_proc = i_proc; current = i; } } *index = current; return( m->members[current] ); } static int32 Smallest_rep( reps_info *r, int *index ) { int current; int i; proc curr_p, i_p; int ret; current = 0; for( i=1; i < r->num_reps; i++ ) { if( r->reps[current].type == SEG_REP ) { if( r->reps[i].type == SEG_REP ) { if( r->reps[i].seg_index < r->reps[current].seg_index ) current = i; } }else if( r->reps[current].type == RING_REP ) { if( r->reps[i].type == RING_REP ) { if( r->reps[i].seg_index < r->reps[current].seg_index ) current = i; else if( r->reps[i].seg_index == r->reps[current].seg_index ) { ret = Conf_proc_by_id( r->reps[current].proc_id, &curr_p ); ret = Conf_proc_by_id( r->reps[i].proc_id, &i_p ); if( i_p.index_in_seg < curr_p.index_in_seg ) current = i; } }else if( r->reps[i].type == SEG_REP ){ current = i; } }else Alarm( EXIT, "Smallest_rep: bug! current index %d is proc %d of type %d\n", current,r->reps[current].proc_id,r->reps[current].type ); } *index = current; return ( r->reps[current].proc_id ); } static void Sort_members( members_info *m ) { members_info temp_members; int index; int i; int32 dummy; temp_members = *m; for( i=0; i < m->num_members; i++ ) { dummy = Smallest_member( &temp_members, &index ); m->members[i] = temp_members.members[index]; temp_members.num_members--; temp_members.members[index] = temp_members.members[temp_members.num_members]; } } static void Sort_reps( reps_info *r ) { reps_info temp_reps; int index; int i; int32 dummy; temp_reps = *r; for( i=0; i < r->num_reps; i++ ) { dummy = Smallest_rep( &temp_reps, &index ); r->reps[i] = temp_reps.reps[index]; temp_reps.num_reps--; temp_reps.reps[index] = temp_reps.reps[temp_reps.num_reps]; } } static void Create_form1() { token_header form_token; ring_info *rg_info; int32 *num_rings; int32 *holes_procs_ptr; int32 index; int pack_entry; int num_bytes; sys_scatter send_scat; char rg_info_buf[sizeof(token_body)]; rep_info temp_rep; int i,j; int cur_num_members; members_info valid_members; form_token.seq = Highest_seq+3333; form_token.proc_id = My.id; form_token.type = FORM1_TYPE; /* if I am a ring leader - update my F_members */ if( F_reps.reps[0].type == RING_REP ) { F_members.num_members = 0; for( i=0; i < Membership.num_segments; i++ ) for( j=0; j < Membership.segments[i].num_procs; j++ ) { F_members.members[F_members.num_members] = Membership.segments[i].procs[j]->id; F_members.num_members++; } } cur_num_members = 0; for ( i = 0; i < F_members.num_members; i++ ) { int invalid_member; invalid_member = 0; /* Remove from F_members any members that are also in OUR F_reps (except myself). */ for ( j = 0; j < F_reps.num_reps; j++ ) { if ( (F_members.members[i] == F_reps.reps[j].proc_id ) && (F_members.members[i] != My.id) ) { invalid_member = 1; break; } } if (!invalid_member) { valid_members.members[cur_num_members] = F_members.members[i]; cur_num_members++; } } memcpy( &F_members.members[0], &valid_members.members[0], cur_num_members * sizeof(int32) ); F_members.num_members = cur_num_members; /* I am the first in F_members. put the rest in pending */ F_members.num_pending = F_members.num_members - 1; F_members.num_members = 1; Sort_reps( &F_reps ); F_reps.rep_index = 1; /* update potential in case of failure */ Potential_reps.num_reps = 0; for( i=0; i < F_reps.num_reps; i++ ) { temp_rep = F_reps.reps[i]; if( temp_rep.seg_index != My.seg_index ) { temp_rep.type = POTENTIAL_REP; Insert_rep( &Potential_reps, temp_rep ); } } num_bytes = 0; num_rings = (int32 *)rg_info_buf; *num_rings= 1; num_bytes += sizeof(int32); rg_info = (ring_info *)&rg_info_buf[num_bytes]; num_bytes += sizeof(ring_info); holes_procs_ptr = (int32 *)&rg_info_buf[num_bytes]; rg_info->memb_id = Membership_id; rg_info->trans_time = 0; rg_info->aru = Aru; rg_info->highest_seq = Highest_seq; /* update holes */ rg_info->num_holes = 0; for( index = My_aru+1; index <= Highest_seq; index++ ) { pack_entry = index & PACKET_MASK; if( ! Packets[pack_entry].exist ) { *holes_procs_ptr = index; Alarm( MEMB , "INSERT HOLE 1 IS %d My_aru is %d, Highest_seq is %d\n", index,My_aru, Highest_seq); holes_procs_ptr++; num_bytes += sizeof(int32); rg_info->num_holes++; } } /* update commit-trans procs */ /* insert self in trans and commit */ rg_info->num_commit = 1; rg_info->num_trans = 1; *holes_procs_ptr = My.id; num_bytes += sizeof(int32); holes_procs_ptr++; /* insert other members of commit set */ for( i=0; i < Commit_set.num_members; i++ ) { /* skipping self, because already there */ if( Commit_set.members[i] == My.id ) continue; /* insert this member */ *holes_procs_ptr = Commit_set.members[i]; holes_procs_ptr++; num_bytes += sizeof(int32); rg_info->num_commit++; } send_scat.num_elements = 4; send_scat.elements[0].buf = (char *)&form_token; send_scat.elements[0].len = sizeof(token_header); send_scat.elements[1].buf = (char *)&F_members; send_scat.elements[1].len = sizeof(members_info); send_scat.elements[2].buf = (char *)&F_reps; send_scat.elements[2].len = sizeof(reps_info); send_scat.elements[3].buf = rg_info_buf; send_scat.elements[3].len = num_bytes; form_token.rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len; /* compute whom to send to */ if( F_members.num_pending > 0 ) { /* send to next member in pending list */ Net_ucast_token( F_members.members[F_members.num_members], &send_scat ); Net_ucast_token( F_members.members[F_members.num_members], &send_scat ); }else if( F_reps.rep_index < F_reps.num_reps){ /* send to next rep */ Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat ); Net_ucast_token( F_reps.reps[F_reps.rep_index].proc_id, &send_scat ); }else{ /* singleton membership */ F_members.num_pending = 1; F_members.num_members = 0; form_token.type = FORM2_TYPE; send_scat.elements[2].len = sizeof(membership_id); form_token.rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len; Net_ucast_token( My.id, &send_scat ); } E_dequeue( Send_join, 0, NULL ); E_queue( Memb_token_loss, 0, NULL, Form_timeout ); E_dequeue( Prot_token_hurry, 0, NULL ); Token_alive = 0; State = FORM; GlobalStatus.state = FORM; } static void Fill_form1( sys_scatter *scat ) { sys_scatter send_scat; token_header *form_token; members_info *m_info; reps_info *r_info; ring_info *old_rg_info, *new_rg_info; ring_info *my_rg_info; int32 *old_num_rings, *new_num_rings; int32 *my_holes_procs_ptr, *new_holes_procs_ptr; int32 index; int pack_entry; char rg_info_buf[sizeof(token_body)]; char *c_ptr; char *rings_buf; int num_bytes; int bytes_to_copy; rep_info temp_rep; int i,j,k; int cur_num_members; int num_to_copy; members_info valid_members; num_bytes = 0; form_token = (token_header *)scat->elements[0].buf; m_info = (members_info *)scat->elements[1].buf; num_bytes += sizeof(members_info); r_info = (reps_info *)&scat->elements[1].buf[num_bytes]; num_bytes += sizeof(reps_info); rings_buf = &scat->elements[1].buf[num_bytes]; old_num_rings = (int32 *)&scat->elements[1].buf[num_bytes]; num_bytes += sizeof(int32); old_rg_info= (ring_info *)&scat->elements[1].buf[num_bytes]; if( !Same_endian( form_token->type ) ) { Flip_members( m_info ); Flip_reps( r_info ); Flip_rings( rings_buf ); } /* update header */ form_token->proc_id = My.id; if( form_token->seq < Highest_seq+3333 ) form_token->seq = Highest_seq+3333; /* update members and reps */ if( State == OP || State == REPRESENTED ) { /* validity check */ if( m_info->members[m_info->num_members] != My.id || m_info->num_pending <= 0 ) return; m_info->num_members++; m_info->num_pending--; }else if( State == GATHER ){ /* validity check */ if( r_info->reps[r_info->rep_index].proc_id != My.id || m_info->num_pending != 0 ) { return; } /* * Add validation check to my F_members list. I need to remove members: * 1) Any members already in the m_info I just received in form1. * 2) Any members of r_info I just received in form1 (except myself). * 3) Any members that are also in OUR F_reps (except myself). * The reason for this is that these guys also became seg_reps * from the same segment as me, but did not make it to the final reps list on form1 * and if included as members will act as seg_reps which they should not. */ cur_num_members = 0; for ( i = 0; i < F_members.num_members; i++ ) { int invalid_member; invalid_member = 0; /* 1) Any members already in the m_info I just received in form1. */ for ( j = 0; j < m_info->num_members; j++ ) { if (F_members.members[i] == m_info->members[j] ) { invalid_member = 1; break; } } /* 2) Any members of r_info I just received in form1 (except myself). */ for ( j = 0; !invalid_member && j < r_info->num_reps; j++ ) { if ( (F_members.members[i] == r_info->reps[j].proc_id ) && (F_members.members[i] != My.id) ) { invalid_member = 1; break; } } /* 3) Any members that are also in OUR F_reps (except myself). */ for ( j = 0; !invalid_member && j < F_reps.num_reps; j++ ) { if ( (F_members.members[i] == F_reps.reps[j].proc_id ) && (F_members.members[i] != My.id) ) { invalid_member = 1; break; } } if (!invalid_member) { valid_members.members[cur_num_members] = F_members.members[i]; cur_num_members++; } } memcpy( &F_members.members[0], &valid_members.members[0], cur_num_members * sizeof(int32) ); F_members.num_members = cur_num_members; /* Now add all my members into the form1 token */ if( r_info->reps[r_info->rep_index].type == SEG_REP ) { /* Fill myself and my members */ i = m_info->num_members; for( j=0; j < F_members.num_members; j++, i++ ) m_info->members[i] = F_members.members[j]; m_info->num_pending = F_members.num_members - 1; m_info->num_members += 1; }else if( r_info->reps[r_info->rep_index].type == RING_REP ){ /* Fill myself and my ring members */ i = m_info->num_members; for( j=0; j < Membership.num_segments; j++ ) for( k=0; k < Membership.segments[j].num_procs; k++, i++ ) m_info->members[i] = Membership.segments[j].procs[k]->id; m_info->num_pending = i - m_info->num_members -1; m_info->num_members += 1; }else Alarm( EXIT, "Fill_form1: invalid rep type: %d\n", r_info->reps[r_info->rep_index].type ); r_info->rep_index++; }else Alarm( EXIT, "Fill_form1: invalid State: %d\n",State ); /* update potential in case of failure */ Potential_reps.num_reps = 0; for( i=0; i < r_info->num_reps; i++ ) { temp_rep = r_info->reps[i]; if( temp_rep.seg_index != My.seg_index ) { temp_rep.type = POTENTIAL_REP; Insert_rep( &Potential_reps, temp_rep ); } } /* update rings info */ num_bytes = 0; new_num_rings = (int32 *)&rg_info_buf[num_bytes]; *new_num_rings = 0; num_bytes += sizeof(int32); my_rg_info = NULL; my_holes_procs_ptr = NULL; /* optimiser thinks it may be used uninitialised (its wrong, but it is too subtle for it) */ for( i=0; i < *old_num_rings; i++ ) { bytes_to_copy = sizeof(ring_info) + ( old_rg_info->num_holes + old_rg_info->num_commit )* sizeof(int32); if( Memb_is_equal( old_rg_info->memb_id, Membership_id ) ) { my_rg_info = old_rg_info; c_ptr = (char *) old_rg_info; my_holes_procs_ptr = (int32 *)&c_ptr[sizeof(ring_info)]; old_rg_info = (ring_info *)&c_ptr[bytes_to_copy]; }else{ new_rg_info= (ring_info *)&rg_info_buf[num_bytes]; memmove((char *)new_rg_info, (char *)old_rg_info, bytes_to_copy ); c_ptr = (char *) old_rg_info; old_rg_info = (ring_info *)&c_ptr[bytes_to_copy]; num_bytes += bytes_to_copy; (*new_num_rings)++; } } new_rg_info= (ring_info *)&rg_info_buf[num_bytes]; num_bytes += sizeof(ring_info); (*new_num_rings)++; new_rg_info->memb_id = Membership_id; new_rg_info->trans_time = 0; new_rg_info->num_holes = 0; new_holes_procs_ptr = (int32 *)&rg_info_buf[num_bytes]; new_rg_info->aru = Aru; new_rg_info->highest_seq = Highest_seq; if( my_rg_info == NULL ) { if ( *new_num_rings > MAX_FORM_REPS ) { /* This ring_info entry will NOT fit in the FORM token packet. * So since the ring_info is needed for me (this daemon) to be * included in the current membership ring, we will have * to remove ourselves from the m_info list and not * create this ring_info */ Alarmp( SPLOG_WARNING, MEMB, "Fill_form1: ring_info entry for %u.%u.%u.%u will not fit in FORM token. Removing self from current membership attempt by removing IP from m_info list\n", IP1(My.id), IP2(My.id), IP3(My.id), IP4(My.id)); /* since new ring is always at the end, we just decrease current byte count */ num_bytes = num_bytes - sizeof(ring_info); (*new_num_rings)--; /* Remove ourselves from m_info */ for ( i=0; i < m_info->num_members; i++) { if( m_info->members[i] == My.id ) { num_to_copy = m_info->num_members + m_info->num_pending - i - 1; memmove(&m_info->members[i], &m_info->members[i+1], num_to_copy * sizeof(int32)); break; } } m_info->num_members--; } else { /* New ring_info will fit, so create it */ for( index = Last_discarded+1; index <= Highest_seq; index++ ) { pack_entry = index & PACKET_MASK; if( ! Packets[pack_entry].exist ) { *new_holes_procs_ptr = index; Alarm( MEMB , "INSERT HOLE 2 IS %d\n",index); new_holes_procs_ptr++; num_bytes += sizeof(int32); new_rg_info->num_holes++; } } /* update commit-trans procs */ /* insert self in trans and commit */ new_rg_info->num_commit = 1; new_rg_info->num_trans = 1; *new_holes_procs_ptr = My.id; new_holes_procs_ptr++; num_bytes += sizeof(int32); /* insert other members of commit set */ for( i=0; i < Commit_set.num_members; i++ ) { /* skipping self, because already there */ if( Commit_set.members[i] == My.id ) continue; /* insert this member */ *new_holes_procs_ptr = Commit_set.members[i]; new_holes_procs_ptr++; num_bytes += sizeof(int32); new_rg_info->num_commit++; } } }else{ members_info temp_set; if( my_rg_info->aru > Aru ) new_rg_info->aru = my_rg_info->aru; if( my_rg_info->highest_seq > Highest_seq ) new_rg_info->highest_seq= my_rg_info->highest_seq; for( i=0; i < my_rg_info->num_holes; i++ ) { pack_entry = *my_holes_procs_ptr & PACKET_MASK; if( ! Packets[pack_entry].exist ) { *new_holes_procs_ptr = *my_holes_procs_ptr; Alarm( MEMB , "INSERT HOLE 3 IS %d My_aru is %d, Highest_seq is %d\n", *new_holes_procs_ptr,My_aru, Highest_seq); new_holes_procs_ptr++; num_bytes += sizeof(int32); new_rg_info->num_holes++; } my_holes_procs_ptr++; } if( my_rg_info->highest_seq < Highest_seq ) { for( index = my_rg_info->highest_seq+1; index <= Highest_seq; index++ ) { pack_entry = index & PACKET_MASK; if( ! Packets[pack_entry].exist ) { Alarm( MEMB , "INSERT HOLE 4 IS %d\n",index); *new_holes_procs_ptr = index; new_holes_procs_ptr++; num_bytes += sizeof(int32); new_rg_info->num_holes++; } } } /* setting temp_set to be trans members only */ temp_set.num_members = 0; for( i = 0; i < my_rg_info->num_trans; i++ ) { Insert_member( &temp_set, *my_holes_procs_ptr); my_holes_procs_ptr++; } /* creating an updated temp_set based on my_rg_info and Commit_set */ /* adding self to trans members */ Insert_member( &temp_set, My.id ); if( temp_set.num_members != (my_rg_info->num_trans + 1) ) Alarm( EXIT, "Fill_form1: incorrect trans set\n"); temp_set.num_pending = my_rg_info->num_trans+1; /* adding rest of original commit set */ for( i = my_rg_info->num_trans; i < my_rg_info->num_commit; i++ ) { Insert_member( &temp_set, *my_holes_procs_ptr ); my_holes_procs_ptr++; } /* adding my commit set */ for( i = 0; i < Commit_set.num_members; i++ ) { Insert_member( &temp_set, Commit_set.members[i] ); } /* writing my ring commit and trans information to new_rg_info */ new_rg_info->num_commit = temp_set.num_members; new_rg_info->num_trans = temp_set.num_pending; for( i = 0; i < temp_set.num_members; i++ ) { *new_holes_procs_ptr = temp_set.members[i]; new_holes_procs_ptr++; num_bytes += sizeof(int32); } } send_scat.num_elements = 4; send_scat.elements[0].buf = (char *)form_token; send_scat.elements[0].len = sizeof(token_header); send_scat.elements[1].buf = (char *)m_info; send_scat.elements[1].len = sizeof(members_info); send_scat.elements[2].buf = (char *)r_info; send_scat.elements[2].len = sizeof(reps_info); send_scat.elements[3].buf = rg_info_buf; send_scat.elements[3].len = num_bytes; form_token->rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len; /* compute whom to send to */ if( m_info->num_pending > 0 ) { /* send to next member in pending list */ Net_ucast_token( m_info->members[m_info->num_members], &send_scat ); Net_ucast_token( m_info->members[m_info->num_members], &send_scat ); }else if( r_info->rep_index < r_info->num_reps){ /* send to next rep */ Net_ucast_token( r_info->reps[r_info->rep_index].proc_id, &send_scat ); Net_ucast_token( r_info->reps[r_info->rep_index].proc_id, &send_scat ); }else{ /* prepare form2 token */ Sort_members( m_info ); m_info->num_pending = m_info->num_members; m_info->num_members = 0; form_token->type = FORM2_TYPE; /* this is the only difference between form1 and form2 tokens */ send_scat.elements[2].len = sizeof(membership_id); form_token->rtr_len = send_scat.elements[1].len + send_scat.elements[2].len + send_scat.elements[3].len; Net_ucast_token( m_info->members[0], &send_scat ); Net_ucast_token( m_info->members[0], &send_scat ); } E_dequeue( Send_join, 0, NULL ); E_dequeue( Form_or_fail, 0, NULL ); E_dequeue( Shift_to_seg, 0, NULL ); E_queue( Memb_token_loss, 0, NULL, Form_timeout ); E_dequeue( Prot_token_hurry, 0, NULL ); Token_alive = 0; State = FORM; GlobalStatus.state = FORM; } static void Read_form2( sys_scatter *scat ) { sys_scatter send_scat; token_header *form_token; members_info *m_info; membership_id *m_id_info; ring_info *rg_info; ring_info *my_rg_info; int32 *my_holes_procs_ptr; int32 *num_rings; int pack_entry; char *c_ptr; char *rings_buf; int num_bytes; int bytes_to_skip; proc p; int ret; int i; int32 memb_time = 0; num_bytes = 0; form_token = (token_header *)scat->elements[0].buf; m_info = (members_info *)scat->elements[1].buf; num_bytes += sizeof(members_info); m_id_info = (membership_id *)&scat->elements[1].buf[num_bytes]; num_bytes += sizeof(membership_id); rings_buf = &scat->elements[1].buf[num_bytes]; num_rings = (int32 *)&scat->elements[1].buf[num_bytes]; num_bytes += sizeof(int32); rg_info= (ring_info *)&scat->elements[1].buf[num_bytes]; if( !Same_endian( form_token->type ) ) { Flip_members( m_info ); m_id_info->proc_id = Flip_int32( m_id_info->proc_id ); m_id_info->time = Flip_int32( m_id_info->time ); Flip_rings( rings_buf ); } form_token->proc_id = My.id; /* validity check */ if( m_info->members[m_info->num_members] != My.id || m_info->num_pending <= 0 ) return; m_info->num_members++; m_info->num_pending--; Last_seq = form_token->seq; if( m_info->num_members == 1 ) { /* The time in memb_time is saved to use on one of the * Trans_memb_ids (there is one per Trans_membership view). * The time for the real membership id will be one past * the one in memb_time, because I want time to go forward. */ memb_time = E_get_time().sec; if( memb_time <= Last_time_used ) memb_time = Last_time_used + 1; Last_time_used = memb_time; /* I am future leader, fill membership_id */ m_id_info->proc_id = My.id; m_id_info->time = ++Last_time_used; } /* build Future membership and Future membership id */ Future_membership_id.proc_id = m_id_info->proc_id; Future_membership_id.time = m_id_info->time; Future_membership = Conf(); for( i=0; i < Future_membership.num_segments; i++ ) Future_membership.segments[i].num_procs = 0; for( i=0; i < (m_info->num_members + m_info->num_pending); i++ ) { ret = Conf_proc_by_id( m_info->members[i], &p ); if( ret < 0 ) Alarm( EXIT, "Read_form2: no such id %u\n", m_info->members[i] ); if ( Conf_append_id_to_seg( &Future_membership.segments[p.seg_index], p.id) == -1) Alarm( EXIT, "Read_form2: BUG2 no such id %u\n", p.id); } Net_set_membership( Future_membership ); FC_new_configuration( ); /* get my ring info */ my_rg_info = NULL; my_holes_procs_ptr = NULL; for( i=0; i < *num_rings; i++ ) { bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32); if( Memb_is_equal( rg_info->memb_id, Membership_id ) ) { my_rg_info = rg_info; my_holes_procs_ptr = (int32 *)&scat->elements[1].buf[num_bytes+sizeof(ring_info)]; } c_ptr = (char *) rg_info; rg_info = (ring_info *)&c_ptr[bytes_to_skip]; num_bytes += bytes_to_skip; } if (my_rg_info == NULL) { Alarm(EXIT, "Read_form2: num_rings = %d, num_bytes = %d, Memb_id = (%d %d)\n", *num_rings, num_bytes, Membership_id.proc_id, Membership_id.time); } Highest_seq = my_rg_info->highest_seq; Aru = my_rg_info->aru; /* Note: this call to Discard_packets handles delivery of all the messages * from the old membership with sequence numbers prior to the old Aru. */ Discard_packets(); for( i=0; i < my_rg_info->num_holes; i++ ) { /* create dummy messages */ pack_entry = *my_holes_procs_ptr & PACKET_MASK; Alarm( MEMB , "EXTRACT HOLE IS %d\n",*my_holes_procs_ptr); if( Packets[pack_entry].exist != 0 ) Alarm( EXIT, "Read_form2: seq %d should be a hole, but is %d\n", *my_holes_procs_ptr, Packets[pack_entry].exist ); Packets[pack_entry].exist = 3; my_holes_procs_ptr++; } /* extract future commit set (and future trans membership) */ Future_commit_set.num_members = my_rg_info->num_commit; Future_commit_set.num_pending = my_rg_info->num_trans; for( i=0; i < my_rg_info->num_commit; i++ ) { Future_commit_set.members[i] = *my_holes_procs_ptr; my_holes_procs_ptr++; } /* The token circulates in conf order, which also defines the order * by which we choose "leaders." So, if noone else has set the id * for my ring, I get to, and I'll be leader. */ if( !my_rg_info->trans_time ) { /* memb_time could be already set, if this daemon is the * future leader */ if( !memb_time ) { memb_time = E_get_time().sec; if( memb_time <= Last_time_used ) memb_time = Last_time_used + 1; Last_time_used = memb_time; } my_rg_info->trans_time = memb_time; } F_trans_memb_time = my_rg_info->trans_time; send_scat.num_elements = 2; send_scat.elements[0].buf = scat->elements[0].buf; send_scat.elements[0].len = sizeof(token_header); send_scat.elements[1].buf = scat->elements[1].buf; send_scat.elements[1].len = num_bytes; if( Conf_last( &Future_membership ) != My.id ) { Net_send_token( &send_scat ); Net_send_token( &send_scat ); }else{ /* build first regular token */ send_scat.num_elements = 1; form_token->type = 0; form_token->seq = 0; form_token->aru = Last_seq; form_token->flow_control = 0; form_token->rtr_len = 0; Net_send_token( &send_scat ); } Token_alive = 1; E_queue( Memb_token_loss, 0, NULL, Token_timeout ); Last_token->type = 0; Last_token->seq = 0; Last_token->aru = 0; State = EVS; GlobalStatus.state = EVS; } void Memb_print_form_token( sys_scatter *scat ) { token_header *form_token; members_info *m_info; reps_info *r_info = NULL; /* avoids compile warning -- gcc not detect initialization */ membership_id *m_id_info = NULL; /* avoids compile warning -- gcc not detect initialization */ ring_info *rg_info; int32 *num_rings; int32 *commit_id; char *c_ptr; int num_bytes, bytes_to_skip; int i,j, scat_index; int is_form1 = 0; char form_name[10]; num_bytes = 0; scat_index = 1; form_token = (token_header *)scat->elements[0].buf; m_info = (members_info *)&scat->elements[scat_index].buf[num_bytes]; num_bytes += sizeof(members_info); if (num_bytes == scat->elements[scat_index].len ) { num_bytes = 0; scat_index++; } if ( Is_form1( form_token->type ) ) { r_info = (reps_info *)&scat->elements[scat_index].buf[num_bytes]; num_bytes += sizeof(reps_info); is_form1 = 1; } else if( Is_form2( form_token->type ) ) { m_id_info = (membership_id *)&scat->elements[scat_index].buf[num_bytes]; num_bytes += sizeof(membership_id); is_form1 = 0; } else { Alarm( EXIT, "Invalid token type received: 0x%x\n", form_token->type); return; } if (num_bytes == scat->elements[scat_index].len ) { num_bytes = 0; scat_index++; } num_rings = (int32 *)&scat->elements[scat_index].buf[num_bytes]; num_bytes += sizeof(int32); rg_info= (ring_info *)&scat->elements[scat_index].buf[num_bytes]; /* Print form_token_header */ Alarmp( SPLOG_PRINT, PRINT, "=========== Form Token ==========\n"); if ( is_form1 ) snprintf(&form_name[0], 10, "FORM 1"); else snprintf(&form_name[0], 10, "FORM 2"); Alarmp( SPLOG_PRINT, PRINT, "%s Token, sent by %u.%u.%u.%u. Seq: %d\n", form_name, IP1(form_token->transmiter_id), IP2(form_token->transmiter_id), IP3(form_token->transmiter_id), IP4(form_token->transmiter_id), form_token->seq); Alarmp( SPLOG_PRINT, PRINT, "ProcID: %u.%u.%u.%u\t ARU: %d, ARU LastID: %u.%u.%u.%u\n", IP1(form_token->proc_id), IP2(form_token->proc_id), IP3(form_token->proc_id), IP4(form_token->proc_id), form_token->aru, IP1(form_token->aru_last_id), IP2(form_token->aru_last_id), IP3(form_token->aru_last_id), IP4(form_token->aru_last_id) ); Alarmp( SPLOG_PRINT, PRINT, "FlowControl: %hd\tRTR Len: %hd\n", form_token->flow_control, form_token->rtr_len); /* Print members list */ Alarmp( SPLOG_PRINT, PRINT, "Form Token members list -- Active (%hd) Pending (%hd)\n", m_info->num_members, m_info->num_pending); for (i=0; i < m_info->num_members; i++) { Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) ); if ( (i % 3) == 2 ) Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); } Alarmp( SPLOG_PRINT_NODATE, PRINT, "\nPending Members:\n"); for (i= m_info->num_members; i < ( m_info->num_members + m_info->num_pending); i++) { Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", i, IP1(m_info->members[i]), IP2(m_info->members[i]), IP3(m_info->members[i]), IP4(m_info->members[i]) ); if ( (i % 3) == 2 ) Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); } Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); if ( is_form1 ) { /* Print reps list */ Alarmp( SPLOG_PRINT, PRINT, "Form Token reps list -- Count (%hd) index (%hd)\n", r_info->num_reps, r_info->rep_index); for (i=0; i < r_info->num_reps; i++) { Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u (T %hd SegInd %hd) ", i, IP1(r_info->reps[i].proc_id), IP2(r_info->reps[i].proc_id), IP3(r_info->reps[i].proc_id), IP4(r_info->reps[i].proc_id), r_info->reps[i].type, r_info->reps[i].seg_index ); if ( (i % 3) == 2 ) Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); } Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); } else /* so is FORM2 type */ { Alarmp( SPLOG_PRINT, PRINT, "Form Token Membership ID %u.%u.%u.%u : %d\n", IP1(m_id_info->proc_id), IP2(m_id_info->proc_id), IP3(m_id_info->proc_id), IP4(m_id_info->proc_id), m_id_info->time ); } /* Print ring info */ Alarmp( SPLOG_PRINT, PRINT, "Form Token RING list -- Count (%d)\n", *num_rings); for (i=0; i < *num_rings; i++) { bytes_to_skip = sizeof(ring_info) + ( rg_info->num_holes + rg_info->num_commit ) * sizeof(int32); c_ptr = (char *) rg_info; Alarmp( SPLOG_PRINT, PRINT, "Ring %u: MembID %u.%u.%u.%u - %u\tTransTime %u\n", i, IP1(rg_info->memb_id.proc_id), IP2(rg_info->memb_id.proc_id), IP3(rg_info->memb_id.proc_id), IP4(rg_info->memb_id.proc_id), rg_info->memb_id.time, rg_info->trans_time); Alarmp( SPLOG_PRINT, PRINT, "\tARU: %d\tHighSeq: %d\tNumHoles: %d\n", rg_info->aru, rg_info->highest_seq, rg_info->num_holes); Alarmp( SPLOG_PRINT, PRINT, "\tNumCommit: %hd\tNumTrans: %hd\n", rg_info->num_commit, rg_info->num_trans); /* Now print all missing messages from this ring (holes) */ commit_id = (int32 *) &c_ptr[sizeof(ring_info)]; Alarmp( SPLOG_PRINT, PRINT, "\tMessage Holes:"); for (j=0; j < rg_info->num_holes; j++) { Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u ", *commit_id); commit_id++; } Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); /* Now print transitional member list */ Alarmp( SPLOG_PRINT, PRINT, "\tTrans List:"); for (j=0; j < rg_info->num_trans; j++) { Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) ); if ( (j % 3) == 2 ) Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); commit_id++; } Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); /* Now print commit list. This follows the trans list with no gaps. */ Alarmp( SPLOG_PRINT, PRINT, "\tCommit List:"); for (j=rg_info->num_trans; j < rg_info->num_commit; j++) { Alarmp( SPLOG_PRINT_NODATE, PRINT, "\t%u: %u.%u.%u.%u ", j, IP1(*commit_id), IP2(*commit_id), IP3(*commit_id), IP4(*commit_id) ); if ( (j % 3) == 2 ) Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); commit_id++; } Alarmp( SPLOG_PRINT_NODATE, PRINT, "\n"); /* next ring */ rg_info = (ring_info *)&c_ptr[bytes_to_skip]; } Alarmp( SPLOG_PRINT, PRINT, "====================================================\n"); } static void Backoff_membership() { int pack_entry; int i; pack_entry=-1; for( i=Last_discarded+1; i <= Highest_seq; i++ ) { /* clear dummy messages */ pack_entry = i & PACKET_MASK; if( Packets[pack_entry].exist == 3 ) Packets[pack_entry].exist = 0; } /* return Aru and My_aru */ Aru = Last_discarded; My_aru = Last_discarded; for( i=Last_discarded+1; i <= Highest_seq; i++ ) { if( !Packets[pack_entry].exist ) break; My_aru++; } } void Memb_commit() { Commit_set = Future_commit_set; } void Memb_transitional() { int i, j, k; int32u proc_id; Alarm( MEMB, "Memb_transitional\n"); Transitional = 1; Trans_membership.num_segments = Cn.num_segments; for( i=0; i < Cn.num_segments; i++ ) { Trans_membership.segments[i].num_procs = 0; for( j=0; j < Cn.segments[i].num_procs; j++ ) { proc_id = Cn.segments[i].procs[j]->id; for( k=0; k < Commit_set.num_pending; k++ ) { if( Commit_set.members[k] == proc_id ) { if ( Conf_append_id_to_seg( &Trans_membership.segments[i], proc_id) == -1) Alarm(EXIT, "Memb_transitional: Commit_set has member %u for trans who doesn't exist\n", proc_id); break; } } } } Trans_memb_id.proc_id = Conf_leader( &Trans_membership ); Trans_memb_id.time = F_trans_memb_time; Commit_membership.num_segments = Cn.num_segments; for( i=0; i < Cn.num_segments; i++ ) { Commit_membership.segments[i].num_procs = 0; for( j=0; j < Cn.segments[i].num_procs; j++ ) { proc_id = Cn.segments[i].procs[j]->id; for( k=0; k < Commit_set.num_members; k++ ) { if( Commit_set.members[k] == proc_id ) { if ( Conf_append_id_to_seg(&Commit_membership.segments[i], proc_id) == -1) Alarm(EXIT, "Memb_transitional: Commit_set has member %u who doesn't exist\n", proc_id); break; } } } } } void Memb_regular() { int i; Alarm( MEMB, "Memb_regular\n"); Transitional = 0; State = OP; GlobalStatus.state = OP; GlobalStatus.membership_changes++; Membership = Future_membership; Membership_id = Future_membership_id; Reg_membership = Membership; GlobalStatus.num_procs = 0; GlobalStatus.num_segments = 0; for( i=0; i < Membership.num_segments; i++ ) { if( Membership.segments[i].num_procs > 0 ) { GlobalStatus.num_procs += Membership.segments[i].num_procs; GlobalStatus.num_segments++; } } GlobalStatus.leader_id = Membership_id.proc_id; Foreign_found = 0; if( Conf_leader( &Membership ) == My.id ) E_queue( Lookup_new_members, 0, NULL, Lookup_timeout ); printf("Membership id is ( %d, %d)\n", Membership_id.proc_id, Membership_id.time ); printf("%c", Conf_print( &Membership ) ); } void Flip_members( members_info *members_ptr ) { /* * This routine can not be called twice beacuse of num_members and num_pending */ int i; members_ptr->num_members = Flip_int16( members_ptr->num_members ); members_ptr->num_pending = Flip_int16( members_ptr->num_pending ); for( i=0; i < members_ptr->num_members + members_ptr->num_pending; i++ ) members_ptr->members[i] = Flip_int32( members_ptr->members[i] ); } void Flip_reps( reps_info *reps_ptr ) { /* * This routine can not be called twice beacuse of num_reps */ int i; reps_ptr->num_reps = Flip_int16( reps_ptr->num_reps ); reps_ptr->rep_index = Flip_int16( reps_ptr->rep_index ); for( i=0; i < reps_ptr->num_reps; i++ ) { reps_ptr->reps[i].proc_id = Flip_int32( reps_ptr->reps[i].proc_id ); reps_ptr->reps[i].type = Flip_int16( reps_ptr->reps[i].type ); reps_ptr->reps[i].seg_index = Flip_int16( reps_ptr->reps[i].seg_index ); } } void Flip_rings( char *buf ) { /* * This routine can not be called twice beacuse of *num_rings * and of ring_info_ptr->num_holes */ ring_info *ring_info_ptr; int32 *num_rings; int ptr; char *c_ptr; int32 *seq_or_proc; int i,j; c_ptr = buf; ptr = 0; num_rings = (int32 *)&c_ptr[ptr]; *num_rings = Flip_int32( *num_rings ); ptr += sizeof(int32); for( i=0; i < *num_rings; i++ ) { ring_info_ptr = (ring_info *)&c_ptr[ptr]; ring_info_ptr->memb_id.proc_id = Flip_int32( ring_info_ptr->memb_id.proc_id ); ring_info_ptr->memb_id.time = Flip_int32( ring_info_ptr->memb_id.time ); ring_info_ptr->trans_time = Flip_int32( ring_info_ptr->trans_time ); ring_info_ptr->aru = Flip_int32( ring_info_ptr->aru ); ring_info_ptr->highest_seq = Flip_int32( ring_info_ptr->highest_seq ); ring_info_ptr->num_holes = Flip_int32( ring_info_ptr->num_holes ); ring_info_ptr->num_commit = Flip_int16( ring_info_ptr->num_commit ); ring_info_ptr->num_trans = Flip_int16( ring_info_ptr->num_trans ); ptr += sizeof(ring_info); for( j=0; j < ( ring_info_ptr->num_holes + ring_info_ptr->num_commit ); j++ ) { seq_or_proc = (int32 *)&c_ptr[ptr]; *seq_or_proc = Flip_int32( *seq_or_proc ); ptr += sizeof(int32); } } }