/* * The Spread Toolkit. * * The contents of this file are subject to the Spread Open-Source * License, Version 1.0 (the ``License''); you may not use * this file except in compliance with the License. You may obtain a * copy of the License at: * * http://www.spread.org/license/ * * or in the file ``license.txt'' found in this distribution. * * Software distributed under the License is distributed on an AS IS basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * The Creators of Spread are: * Yair Amir, Michal Miskin-Amir, Jonathan Stanton. * * Copyright (C) 1993-2004 Spread Concepts LLC * * All Rights Reserved. * * Major Contributor(s): * --------------- * Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security. * Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf. * Dan Schoenblum dansch@cnds.jhu.edu - Java interface. * John Schultz jschultz@cnds.jhu.edu - contribution to process group membership. 
*
 */

#include "arch.h"
/* Standard headers: string.h for strcmp/strcpy/memcpy/memset, assert.h for assert. */
#include <string.h>
#include <stdio.h>
#include <assert.h>

#include "spread_params.h"
#include "net_types.h"
#include "protocol.h"
#include "prot_body.h"
#include "sess_types.h"
#include "sess_body.h"
#include "groups.h"
#include "objects.h"
#include "memory.h"
#include "sp_events.h"
#include "status.h"
#include "alarm.h"
#include "membership.h"
#if     ( SPREAD_PROTOCOL > 3 )
#include "queues.h"
#endif
#include "skiplist.h"
#include "alarm.h"
#include "message.h"

#ifndef NULL
#define NULL 0
#endif  /* NULL */

#define ESTABLISHED_MEMBER      0
#define NEW_MEMBER              1
#define PARTITIONED_MEMBER      2

#define Is_established_member( status ) ( status == ESTABLISHED_MEMBER )
#define Is_new_member( status ) ( status == NEW_MEMBER )
#define Is_partitioned_member( status ) ( status == PARTITIONED_MEMBER )

typedef struct dummy_groups_buf_link {
    char  buf[GROUPS_BUF_SIZE];
    int   bytes;
    struct dummy_groups_buf_link *next;
} groups_buf_link;

struct worklist {
    char      name[MAX_GROUP_NAME];
    Skiplist *groups;
};

char *printgroup(void *vgrp) {
    group *grp = (group *)vgrp;
    return grp->name;
}

static  int               Gstate;
static  configuration     Trans_memb;
static  membership_id     Trans_memb_id;
static  configuration     Reg_memb;
static  membership_id     Reg_memb_id;
static  char              Mess_buf[MAX_MESSAGE_BODY_LEN];

static  groups_buf_link  *Groups_bufs;
static  int               Num_mess_gathered;
static  int               Num_daemons_gathered;
static  message_link      Gathered; /* Groups messages */

#if 0
static  group             Work[MAX_PROCS_RING+1];
#endif

static  int               Groups_control_down_queue;

static  int               G_id_is_equal( group_id g1, group_id g2 );
static  void              G_print_group_id( group_id g );
static  group            *G_get_group( char *group_name );
static  member           *G_get_member( group *grp, char *private_group_name );
static  int               G_build_memb_buf( group *grp, message_obj *msg,
                                            char buf[] );
static  int               G_build_memb_vs_buf( group *grp, message_obj *msg,
                                               char buf[], int32 caused );
static  message_link     *G_build_trans_mess( group *grp );
static  void              G_stamp_groups_buf( char buf[] );
static  void
G_build_groups_msg_hdr( message_obj *msg, int groups_bytes ); static int G_build_groups_buf( char buf[], struct skiplistnode **iter_ptr ); static int G_mess_to_groups( message_link *mess_link, char *name, struct worklist *work ); static int G_smallest_group_indices( Skiplist *work, struct worklist *indices[] ); static void G_compute_and_notify(void); static void G_print(void); static void G_empty_groups_bufs(void); static Skiplist GroupsList; static int G_compare(void *, void *); static int G_compare(void *a, void *b) { /* Takes two Group records and compares them based on their keys (name) */ /* This will work for any record type that has a char[LENGTH] as the first member of the struct and is the key */ assert(a); assert(b); return strcmp((char *)a, (char *)b); } int G_member_recordcompare(void *a, void *b) { int compared; /* Takes two Member records and compares them based on their keys (name) */ member *am = (member *)a, *bm = (member *)b; assert(a); assert(b); compared = strcmp( bm->private_name, am->private_name ); if(compared > 0) return -1; if(compared == 0) return 0; return 1; } int G_member_keycompare(void *a, void *b) { int compared; /* Takes two Member records and compares them based on their keys (name) */ member *bm = (member *)b; char *am = (char *)a; assert(a); assert(b); compared = strcmp( bm->private_name, am ); if(compared > 0) return -1; if(compared == 0) { return 0; } return 1; } /* Take two worklist pointers and compare them by the ->groups pointers */ /* We do not actually care about the order they are stored in the skiplist * that is why we are using meaningless pointers as keys. 
*/ static int G_work_groups_comp(void *a, void *b) { struct worklist *wA, *wB; assert(a); assert(b); wA = (struct worklist *)a; wB = (struct worklist *)b; if (wA->groups < wB->groups) return(-1); if (wA->groups == wB->groups) return(0); return(1); } static int G_work_groups_keycomp(void *key, void *b) { struct worklist *wB; Skiplist *wKey; assert(key); assert(b); wB = (struct worklist *)b; wKey = (Skiplist *)key; if (wKey < wB->groups) return(-1); if (wKey == wB->groups) return(0); return(1); } #if 0 int G_group_revproc_comp(void *a, void *b) { int32 aid, bid; struct worklist *A=(struct worklist *)a, *B=(struct worklist *)b; aid=A->proc_index; bid=B->proc_index; return (aid>bid)?(-1):((aid==bid)?0:1); } int G_group_revproc_keycomp(void *a, void *b) { int32 aid, bid; struct worklist *B=(struct worklist *)b; bid=B->proc_index; aid = *(int32 *)a; return (aid>bid)?-1:((aid==bid)?0:1); } #endif void G_init() { int ret; Alarm( GROUPS, "G_init: \n" ); Num_groups = 0; GlobalStatus.num_groups = Num_groups; sl_init(&GroupsList); /* Key's address is the same as records address and is a null terminated string, so we can use the same function to compare record<->record key<->record */ sl_set_compare(&GroupsList, G_compare, G_compare); /* Groups.next = NULL; Groups.prev = NULL; */ ret = Mem_init_object(GROUP, sizeof(group), 100, 0); if (ret < 0) { Alarm(EXIT, "G_init: Failure to Initialize GROUP memory objects\n"); } ret = Mem_init_object(MEMBER, sizeof(member), 200, 0); if (ret < 0) { Alarm(EXIT, "G_init: Failure to Initialize MEMBER memory objects\n"); } ret = Mem_init_object(GROUPS_BUF_LINK, sizeof(groups_buf_link), 1, 1); if( ret < 0 ) { Alarm(EXIT, "G_init: Failure to Initialize GROUPS_BUF_LINK memory objects\n"); } #if ( SPREAD_PROTOCOL == 3 ) Groups_control_down_queue = 0; #else Groups_control_down_queue = init_queuesess(Groups_down_qs); #endif Groups_bufs = NULL; Num_mess_gathered = 0; Num_daemons_gathered = 0; Gathered.next = NULL; Gstate = GOP; GlobalStatus.gstate = 
Gstate;
}

/*
 * G_handle_reg_memb: react to a regular membership according to Gstate.
 *
 *   reg_memb    - the regular configuration; saved in Reg_memb.
 *   reg_memb_id - its membership id; saved in Reg_memb_id.
 *
 *   GOP, GGATHER - not expected here: raises Alarm( EXIT, ... ).
 *   GTRANS       - if Trans_memb and Reg_memb hold the same number of procs,
 *                  clean up the changed groups, deliver a regular membership
 *                  to their local members and move to GOP; otherwise clean up
 *                  the changed groups, build and send our Groups message(s)
 *                  and move to GGATHER.
 *   GGT          - drop the gathered Groups messages, re-stamp and resend the
 *                  buffers kept in Groups_bufs and move to GGATHER.
 */
void G_handle_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
    group               *grp, *nextgroup;
    member              *mbr, *nextmember;
    struct skiplistnode *giter, *iter, *passed_iter;
    groups_buf_link     *grps_buf_link;
    message_link        *mess_link;
    down_link           *down_ptr;
    message_obj         *msg;
    message_header      *head_ptr;
    int                  num_bytes;
    int                  needed;
    int                  ses;
    int                  i;

    Alarm( GROUPS, "G_handle_reg_memb: with (%d.%d.%d.%d, %d) id\n",
           IP1(reg_memb_id.proc_id),IP2(reg_memb_id.proc_id),IP3(reg_memb_id.proc_id),
           IP4(reg_memb_id.proc_id), reg_memb_id.time );

    switch( Gstate )
    {
        case GOP:
            Alarm( EXIT, "G_handle_reg_memb in GOP\n");
            break;

        case GTRANS:
            /*
             * Save reg_memb and reg_memb_id
             * if previous Trans_memb is equal to reg_memb then:
             *      for every changed group
             *              eliminate partitioned members
             *              set Grp_id to (reg_memb_id, 1)
             *              notify local members of regular membership
             *      Shift to GOP
             * else
             *      for every changed group
             *              eliminate partitioned members
             *              set Grp_id to (reg_memb_id, -1)
             *      Replace protocol queue, raise event threshold
             *      Build groups message -- only local members
             *      Send groups message
             *      Shift to GGATHER
             *
             * NOTE(review): the else branch below never assigns grp_id,
             * although the outline above says it does - confirm intent.
             */
            Alarm( GROUPS, "G_handle_reg_memb in GTRANS\n");

            Reg_memb = reg_memb;
            Reg_memb_id = reg_memb_id;

            /* "Equal" is decided by comparing the number of procs only. */
            if( Conf_num_procs( &Trans_memb ) == Conf_num_procs( &Reg_memb ) )
            {
                giter = sl_getlist( &GroupsList );
                grp = (giter)?(group *)giter->data:NULL;
                /* nextgroup is fetched before the body so that removing grp
                 * from GroupsList does not break the iteration. */
                for( ; grp != NULL ; grp = nextgroup )
                {
                    nextgroup = sl_next( &GroupsList, &giter );
                    if( grp->changed )
                    {
                        /* The group has changed */
                        /* eliminating partitioned members */
                        iter = sl_getlist( &grp->MembersList );
                        mbr = (iter)?(member *)iter->data:NULL;
                        for( ; mbr != NULL ; mbr = nextmember )
                        {
                            nextmember = sl_next( &grp->MembersList, &iter );
                            if( Is_partitioned_member( mbr->status ) )
                            {
                                /* discard this member - proc no longer in membership */
                                sl_remove ( &grp->MembersList,mbr->private_name, dispose);
                                grp->num_members--;
                            }
                        }
                        if( grp->num_members == 0 )
                        {
                            /* discard this empty group */
                            sl_destruct ( &grp->MembersList, dispose);
                            sl_remove ( &GroupsList, grp->name, dispose);
                            Num_groups--;
                            GlobalStatus.num_groups = Num_groups;
                        }else{
                            Alarm( GROUPS, "G_handle_reg_memb: skipping state transfer for group %s.\n", grp->name );
                            Alarm( DEBUG, "G_handle_reg_memb: changing group_id from: " );
                            G_print_group_id( grp->grp_id );
                            grp->grp_id.memb_id = Reg_memb_id;
                            grp->grp_id.index   = 1;
                            grp->changed        = 0;
                            Alarm( DEBUG, " to: " );
                            G_print_group_id( grp->grp_id );
                            Alarm( DEBUG, "\n" );

                            if( grp->num_local > 0 ){
                                /* send members regular membership */
                                msg = Message_new_message();
                                num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );

                                /* create the mess_link */
                                mess_link = new( MESSAGE_LINK );
                                Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
                                mess_link->mess = msg;
                                Obj_Inc_Refcount(mess_link->mess);

                                /* notify local members */
                                needed = 0;
                                for( i=0; i < grp->num_local; i++ )
                                {
                                    ses = Sess_get_session_index ( grp->mbox[i] );
                                    if( Is_memb_session( Sessions[ ses ].status ) )
                                        Sess_write( ses, mess_link, &needed );
                                }
                                Message_Dec_Refcount(msg);
                                /* needed stays 0 when no Sess_write call set it;
                                 * the link is then disposed of here. */
                                if( !needed ) Sess_dispose_message( mess_link );
                            }
                        }
                    }
                }

                Gstate = GOP;
                GlobalStatus.gstate = Gstate;
            }else{
                /*
                 * else
                 *      for every changed group
                 *              eliminate partitioned members
                 *              set Grp_id to (reg_memb_id, -1)
                 *      Replace protocol queue, raise event threshold
                 *      build groups message -- only local members
                 *      Send groups message
                 *      Shift to GGATHER
                 */
                giter = sl_getlist( &GroupsList );
                grp = (giter)?(group *)giter->data:NULL;
                for( ; grp != NULL ; grp = nextgroup )
                {
                    nextgroup = sl_next( &GroupsList, &giter );
                    if( grp->changed )
                    {
                        /* The group has changed */
                        /* eliminating partitioned members */
                        iter = sl_getlist( &grp->MembersList );
                        mbr = (iter)?(member *)iter->data:NULL;
                        for( ; mbr != NULL ; mbr = nextmember )
                        {
                            nextmember = sl_next( &grp->MembersList, &iter );
                            if( Is_partitioned_member( mbr->status ) )
                            {
                                /* discard this member - proc no longer in membership */
                                sl_remove ( &grp->MembersList,mbr->private_name, dispose);
                                grp->num_members--;
                            }
                        }
                        if( grp->num_members == 0 )
                        {
                            /* discard this empty group */
                            sl_destruct ( &grp->MembersList, dispose);
                            sl_remove ( &GroupsList, grp->name, dispose);
                            Num_groups--;
                            GlobalStatus.num_groups = Num_groups;
                        }
                    }
                }

                /* raise events threshold */
                Session_threshold = MEDIUM_PRIORITY;
                Sess_set_active_threshold();

                /* Replace down queue */
                Prot_set_down_queue( GROUPS_DOWNQUEUE );

                /* build and send Groups message */
                /* Nowadays, we can send multiple groups messages.  No group has
                 * data in more than one.  As an optimization, only the last message is
                 * AGREED, and all previous ones are RELIABLE.  G_handle_groups uses this
                 * knowledge when parsing Groups messages. */
                passed_iter = NULL;
                do {
                    msg = Message_new_message();
                    /* Each new buffer is pushed on the front of Groups_bufs. */
                    grps_buf_link = new( GROUPS_BUF_LINK );
                    grps_buf_link->next = Groups_bufs;
                    Groups_bufs = grps_buf_link;
                    grps_buf_link->bytes = G_build_groups_buf(grps_buf_link->buf, &passed_iter);
                    G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
                    head_ptr = Message_get_message_header(msg);
                    /* A non-NULL passed_iter means another pass of this loop follows. */
                    if( passed_iter )
                        head_ptr->type |= RELIABLE_MESS;
                    else
                        head_ptr->type |= AGREED_MESS;

                    Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );

                    down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
                    down_ptr->mess = msg;
                    Obj_Inc_Refcount(down_ptr->mess);
                    /* Use control queue--not normal session queues */
                    Prot_new_message( down_ptr, Groups_control_down_queue );
                    Message_Dec_Refcount(msg);
                    Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GTRANS with %d bytes\n",
                           (passed_iter) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
                } while( passed_iter != NULL );

                Gstate = GGATHER;
                GlobalStatus.gstate = Gstate;
            }
            break;

        case GGATHER:
            Alarm( EXIT, "G_handle_reg_memb in GGATHER\n");
            break;

        case GGT:
            /*
             * Save reg_memb and reg_memb_id
             * Clear all retained Groups messages
             * Stamp own Groups message with current membership id
             * Send group message
             * Shift to GGATHER
             */
            Alarm( GROUPS, "G_handle_reg_memb in GGT\n");

            Reg_memb = reg_memb;
            Reg_memb_id = reg_memb_id;

            /* Clear retained Groups messages in Gathered */
            for( i=0; i < Num_mess_gathered; i++ )
            {
                mess_link = Gathered.next;
                Gathered.next = mess_link->next;
                Sess_dispose_message( mess_link );
            }
            Num_mess_gathered = 0;
            Num_daemons_gathered = 0;

            for( grps_buf_link = Groups_bufs; grps_buf_link; grps_buf_link = grps_buf_link->next )
            {
                /* Stamp own Groups message in buffer with current membership id */
                G_stamp_groups_buf( grps_buf_link->buf );

                /* send Groups message */
                msg = Message_new_message();
                G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
                head_ptr = Message_get_message_header(msg);
                /* Only the last buffer in the list is sent as AGREED. */
                if( grps_buf_link->next )
                    head_ptr->type |= RELIABLE_MESS;
                else
                    head_ptr->type |= AGREED_MESS;

                Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );

                down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
                down_ptr->mess = msg;
                Obj_Inc_Refcount(down_ptr->mess);
                /* Use control queue--not normal session queues */
                Prot_new_message( down_ptr, Groups_control_down_queue );
                Message_Dec_Refcount(msg);
                Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GGT with %d bytes\n",
                       (grps_buf_link->next) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
            }

            Gstate = GGATHER;
            GlobalStatus.gstate = Gstate;

            break;
    }
}

/*
 * G_handle_trans_memb: react to a transitional membership according to Gstate.
 *
 *   trans_memb    - the transitional configuration; saved in Trans_memb.
 *   trans_memb_id - its membership id; saved in Trans_memb_id.
 *
 *   GOP         - mark members whose proc is not in trans_memb as partitioned,
 *                 send a transitional message to the local members of every
 *                 affected group, mark those groups changed, move to GTRANS.
 *   GGATHER     - remove members whose proc is not in trans_memb (and groups
 *                 left empty), mark affected groups changed, move to GGT.
 *   GTRANS, GGT - not expected here: raises Alarm( EXIT, ... ).
 */
void G_handle_trans_memb( configuration trans_memb, membership_id trans_memb_id )
{
    group               *grp, *nextgroup;
    member              *mbr, *nextmember;
    struct skiplistnode *giter, *iter;
    int                  group_changed;
    message_link        *mess_link;
    int                  needed;
    int                  ses;
    int                  i;

    Alarm( GROUPS, "G_handle_trans_memb: \n" );

    switch( Gstate )
    {
        case GOP:
            /*
             * Save transitional membership
             * For every group that has members that are not in the trans_memb do:
             *      mark group members that are not in trans_memb as partitioned.
             *      notify local members with an empty transitional group mess.
             *      mark group as changed (grp->changed = 1; the code sets the
             *      group id index to 1, not -1)
             * Shift to GTRANS
             */
            Alarm( GROUPS, "G_handle_trans_memb in GOP\n");

            Trans_memb = trans_memb;
            Trans_memb_id = trans_memb_id;
            Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
                   " {proc_id: %d " " time: %d}\n",
                   Trans_memb_id.proc_id, Trans_memb_id.time );

            giter = sl_getlist( &GroupsList );
            grp = (giter)?(group *)giter->data:NULL;
            for( ; grp != NULL ; grp = nextgroup )
            {
                nextgroup = sl_next( &GroupsList, &giter );
                group_changed = 0;
                iter = sl_getlist( &grp->MembersList );
                mbr = (iter)?(member *)iter->data:NULL;
                for( ; mbr != NULL ; mbr = nextmember )
                {
                    nextmember = sl_next( &grp->MembersList, &iter );
                    if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
                    {
                        /* mark this member as partitioned - proc no longer in membership */
                        mbr->status = PARTITIONED_MEMBER;
                        group_changed = 1;
                    }
                }
                if( group_changed )
                {
                    if( grp->num_local > 0 )
                    {
                        /* send members transitional membership */
                        mess_link = G_build_trans_mess( grp );
                        needed = 0;
                        for( i=0; i < grp->num_local; i++ )
                        {
                            ses = Sess_get_session_index ( grp->mbox[i] );
                            if( Is_memb_session( Sessions[ ses ].status ) )
                                Sess_write( ses, mess_link, &needed );
                        }
                        if( !needed ) Sess_dispose_message( mess_link );
                    }
                    Alarm( DEBUG, "G_handle_trans_memb: changed group %s in GOP, change"
                           " group id from: ", grp->name );
                    G_print_group_id( grp->grp_id );
grp->grp_id.memb_id = trans_memb_id;
                    grp->grp_id.index   = 1; /* Not technically needed, but not bad, either. */
                    grp->changed        = 1;
                    Alarm( DEBUG, " to: " );
                    G_print_group_id( grp->grp_id );
                    Alarm( DEBUG, "\n" );
                }
            }

            Gstate = GTRANS;
            GlobalStatus.gstate = Gstate;

            break;

        case GTRANS:
            Alarm( EXIT, "G_handle_trans_memb in GTRANS\n");
            break;

        case GGATHER:
            /*
             * Save transitional membership
             * For every group that has members that are not in the
             * trans_memb do:
             *      discard group members that are not in trans_memb
             *      if group is changed then mark it as changed (index = -1)
             *      (it might be already changed, but it's ok).
             * Shift to GGT
             *
             * Note: there is no need to notify local members with a transitional group mess
             *       because no message will come between the trans group memb and the next reg group memb.
             * Note: this cascading deletes of members that are not in transitional membership actually
             *       opens the door for implementation of the ERSADS97 algorithm.
             *
             * NOTE(review): the code below only sets grp->changed; it does
             * not write any group id index in this state.
             */
            Alarm( GROUPS, "G_handle_trans_memb in GGATHER\n");

            Trans_memb = trans_memb;
            Trans_memb_id = trans_memb_id; /* Need this because we deliver the transitional again if we complete
                                            * the state exchange during GGT. */
            Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
                   " {proc_id: %d " " time: %d}\n",
                   Trans_memb_id.proc_id, Trans_memb_id.time );

            giter = sl_getlist( &GroupsList );
            grp = (giter)?(group *)giter->data:NULL;
            /* nextgroup / nextmember are fetched before the loop bodies so the
             * current element can be removed without breaking the iteration. */
            for( ; grp != NULL ; grp = nextgroup )
            {
                nextgroup = sl_next( &GroupsList, &giter );
                group_changed = 0;
                iter = sl_getlist( &grp->MembersList );
                mbr = (iter)?(member *)iter->data:NULL;
                for( ; mbr != NULL ; mbr = nextmember )
                {
                    nextmember = sl_next( &grp->MembersList, &iter );
                    if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
                    {
                        /* discard this member - proc no longer in membership */
                        sl_remove ( &grp->MembersList,mbr->private_name, dispose);
                        grp->num_members--;
                        group_changed = 1;
                    }
                }
                if( grp->num_members == 0 )
                {
                    /* discard this empty group */
                    sl_destruct ( &grp->MembersList, dispose);
                    sl_remove ( &GroupsList, grp->name, dispose);
                    Num_groups--;
                    GlobalStatus.num_groups = Num_groups;
                }else if( group_changed ) {
                    grp->changed = 1;
                }
            }

            Gstate = GGT;
            GlobalStatus.gstate = Gstate;

            break;

        case GGT:
            Alarm( EXIT, "G_handle_trans_memb in GGT\n");
            break;
    }
}

/*
 * G_handle_join: add member private_group_name to group group_name.
 *
 *   private_group_name - private name of the joining member; split into a
 *                        private name and a proc name by G_private_to_names.
 *   group_name         - name of the group; the group record is created when
 *                        G_get_group does not find it.
 *
 * Handled in GOP and GTRANS; in GGATHER and GGT it raises Alarm( EXIT, ... ).
 * Returns early, after an Alarm( PRINT, ... ), when the proc name is unknown
 * or the member is already in the group.
 */
void G_handle_join( char *private_group_name, char *group_name )
{
    group          *grp, *new_grp;
    member         *mbr, *new_mbr;
    int             needed;
    char           *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
    int             num_bytes;
    char            proc_name[MAX_PROC_NAME];
    char            private_name[MAX_PRIVATE_NAME+1];
    int             new_p_ind, p_ind1;
    proc            new_p, p1;
    int             ses;
    mailbox         new_mbox;
    message_link   *mess_link;
    message_header *head_ptr;
    message_obj    *msg, *joiner_msg;
    char           *vs_ptr; /* the virtual synchrony set */
    int             i;
    int32u          temp;

    Alarm( GROUPS, "G_handle_join: %s joins group %s\n", private_group_name, group_name );

    switch( Gstate )
    {
        case GOP:
        case GTRANS:
            if (Gstate == GOP) Alarm( GROUPS, "G_handle_join in GOP\n");
            if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_join in GTRANS\n");
            /*
             * if already in group then ignore
             * if the group is unchanged and new member is coming from alive daemon then:
             *      Insert to group as established
             *
Increment Grp_id * Notify all local members of a regular membership caused by join * else if group is changed and coming from alive daemon then * Insert to group as new * Increment Grp_id * if there are local members then * build a membership with all members, and vs set with all established members * notify all local established members with that membership (caused by network) * if new member is local * notify new member with membership and self vs set (caused by network) * notify all local members a transitional membership * mark new member as established * else (if new member is coming from a partitioned daemon then) * Insert to group as partitioned * Increment Grp_id, and mark group as changed if not already done * if there are local members then * build a membership with all members and vs set with all established members * notify all local members with that membership (caused by network) * notify all local members with transitional membership * * Note: remember that when delivering a regular message while in GTRANS, you should use the * mbox list of the group. You should still be cautious when delivering memberships to take * care of the fact that the new guy gets a different treatment. 
*/ G_private_to_names( private_group_name, private_name, proc_name ); new_p_ind = Conf_proc_by_name( proc_name, &new_p ); if( new_p_ind < 0 ) { Alarm( PRINT, "G_handle_join: illegal proc_name %s in private_group %s \n", proc_name, private_group_name ); return; } grp = G_get_group( group_name ); if( grp == NULL ) { new_grp = new( GROUP ); memset( new_grp->name, 0, MAX_GROUP_NAME ); strcpy( new_grp->name, group_name ); sl_init( &new_grp->MembersList ); sl_set_compare( &new_grp->MembersList, G_member_recordcompare, G_member_keycompare); if( Gstate == GOP) { new_grp->changed = 0; new_grp->grp_id.memb_id = Reg_memb_id; } else { /* Gtrans */ new_grp->changed = 1; new_grp->grp_id.memb_id = Trans_memb_id; } new_grp->grp_id.index = 0; /* 0 because we will definitely increment it */ Alarm( DEBUG, "G_handle_join: New group added with group id: " ); G_print_group_id( new_grp->grp_id ); Alarm( DEBUG, "\n" ); new_grp->num_members = 0; new_grp->num_local = 0; sl_insert( &GroupsList, new_grp ); Num_groups++; /*sl need this?*/ GlobalStatus.num_groups = Num_groups; grp = new_grp; } mbr = G_get_member( grp, private_group_name ); if( mbr != NULL ) { Alarm( PRINT, "G_handle_join: %s is already in group %s\n", private_group_name, group_name ); return; } /* Add a new member as ESTABLISHED (might change later on depending on the situation */ new_mbr = new( MEMBER ); memset( new_mbr->private_name, 0, MAX_GROUP_NAME ); strcpy( new_mbr->private_name, private_group_name ); new_mbr->proc_id = new_p.id; new_mbr->status = ESTABLISHED_MEMBER; new_mbr->p_ind = new_p_ind; sl_insert( &grp->MembersList, new_mbr ); grp->num_members++; /* if member is local then add mbox */ if( new_mbr->proc_id == My.id ) { ses = Sess_get_session( private_name ); if( ses < 0 ) Alarm( EXIT, "G_handle_join: local session does not exist\n"); grp->mbox[ grp->num_local ] = Sessions[ ses ].mbox; grp->num_local++; new_mbox = Sessions[ ses ].mbox; }else new_mbox = -1; /* This is the meat */ if( Gstate == GOP || ( 
Conf_id_in_conf( &Trans_memb, new_p.id ) != -1 ) ) { /* new member is coming from alive daemon */ if( !grp->changed ) { /* group is unchanged */ /* Increment group id */ grp->grp_id.index++; /* Notify local members */ if( grp->num_local > 0 ) { msg = Message_new_message(); num_bytes = G_build_memb_buf( grp, msg, Mess_buf ); head_ptr = Message_get_message_header(msg); head_ptr->type |= CAUSED_BY_JOIN ; /* notify all local members */ num_vs_ptr = &Mess_buf[ num_bytes ]; num_bytes += sizeof( int32 ); temp = 1; memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */ vs_ptr = (char *)&Mess_buf[ num_bytes ]; memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME ); num_bytes += MAX_GROUP_NAME; head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME ); mess_link = new( MESSAGE_LINK ); Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes ); mess_link->mess = msg; Obj_Inc_Refcount(mess_link->mess); needed = 0; for( i=0; i < grp->num_local; i++ ) { ses = Sess_get_session_index ( grp->mbox[i] ); if( Is_memb_session( Sessions[ ses ].status ) ) Sess_write( ses, mess_link, &needed ); } if ( !needed ) Sess_dispose_message( mess_link ); Message_Dec_Refcount(msg); } }else{ /* group is changed */ /* mark new member as new */ new_mbr->status = NEW_MEMBER; /* Increment group id */ grp->grp_id.index++; if( grp->num_local > 0 ) { /* build a membership with all members, and vs set with all established members */ msg = Message_new_message(); num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK ); /* notify all non-new local members with that membership (caused by network) */ mess_link = new( MESSAGE_LINK ); Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes ); mess_link->mess = msg; Obj_Inc_Refcount(mess_link->mess); needed = 0; for( i=0; i < grp->num_local; i++ ) { /* if new member is local we do not notify it here */ if( grp->mbox[i] == new_mbox ) continue; ses = Sess_get_session_index ( grp->mbox[i] ); if( Is_memb_session( Sessions[ ses 
].status ) ) Sess_write( ses, mess_link, &needed ); } if ( !needed ) Sess_dispose_message( mess_link ); Message_Dec_Refcount(msg); /* notify new member if local */ if( new_mbox != -1 ) { /* build a membership with all members */ joiner_msg = Message_new_message(); num_bytes = G_build_memb_buf( grp, joiner_msg, Mess_buf ); head_ptr = Message_get_message_header(joiner_msg); head_ptr->type |= CAUSED_BY_NETWORK ; /* build a self vs set */ num_vs_ptr = &Mess_buf[ num_bytes ]; num_bytes += sizeof( int32 ); temp = 1; memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */ vs_ptr = (char *)&Mess_buf[ num_bytes ]; memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME ); num_bytes += MAX_GROUP_NAME; head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME ); mess_link = new( MESSAGE_LINK ); Message_Buffer_to_Message_Fragments( joiner_msg, Mess_buf, num_bytes ); mess_link->mess = joiner_msg; Obj_Inc_Refcount(mess_link->mess); needed = 0; ses = Sess_get_session_index ( new_mbox ); if( Is_memb_session( Sessions[ ses ].status ) ) Sess_write( ses, mess_link, &needed ); if ( !needed ) Sess_dispose_message( mess_link ); Message_Dec_Refcount(joiner_msg); } /* notify all local members a transitional membership */ mess_link = G_build_trans_mess( grp ); needed = 0; for( i=0; i < grp->num_local; i++ ) { ses = Sess_get_session_index ( grp->mbox[i] ); if( Is_memb_session( Sessions[ ses ].status ) ) Sess_write( ses, mess_link, &needed ); } if( !needed ) Sess_dispose_message( mess_link ); } /* Mark new member as established */ new_mbr->status = ESTABLISHED_MEMBER; } }else{ /* coming from a partitioned daemon */ /* mark new member as partitioned member */ new_mbr->status = PARTITIONED_MEMBER; /* * (marking group as changed - it might be already ) */ if( !grp->changed ) grp->changed = 1; grp->grp_id.index++; if( grp->num_local > 0 ) { /* build a membership with all members, and vs set with all non-partitioned members */ msg = Message_new_message(); num_bytes = G_build_memb_vs_buf( 
grp, msg, Mess_buf, CAUSED_BY_NETWORK ); /* notify all local members with that membership (caused by network) */ mess_link = new( MESSAGE_LINK ); Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes ); mess_link->mess = msg; Obj_Inc_Refcount(mess_link->mess); needed = 0; for( i=0; i < grp->num_local; i++ ) { ses = Sess_get_session_index ( grp->mbox[i] ); if( Is_memb_session( Sessions[ ses ].status ) ) Sess_write( ses, mess_link, &needed ); } if ( !needed ) Sess_dispose_message( mess_link ); Message_Dec_Refcount(msg); /* notify all local members a transitional membership */ mess_link = G_build_trans_mess( grp ); needed = 0; for( i=0; i < grp->num_local; i++ ) { ses = Sess_get_session_index ( grp->mbox[i] ); if( Is_memb_session( Sessions[ ses ].status ) ) Sess_write( ses, mess_link, &needed ); } if( !needed ) Sess_dispose_message( mess_link ); } } /* Compute the mask */ for(i=0; i<4; i++) { grp->grp_mask[i] = 0; } { struct skiplistnode *iter; member *memp; /* for( mbr= &grp->members; mbr->next != NULL; mbr=mbr->next ) { p_ind1 = Conf_proc_by_id( mbr->next->proc_id, &p1 ); */ for( iter = sl_getlist( &grp->MembersList ), memp=(member *)iter->data; iter != NULL; memp = (member *)sl_next( &grp->MembersList, &iter )) { p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 ); temp = 1; for(i=0; igrp_mask[p1.seg_index/32] |= temp; } } Alarm(GROUPS, "G_handle_join: Mask for group %s set to %x %x %x %x\n", grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]); break; case GGATHER: Alarm( EXIT, "G_handle_join in GGATHER\n"); break; case GGT: Alarm( EXIT, "G_handle_join in GGT\n"); break; } } void G_handle_leave( char *private_group_name, char *group_name ) { char proc_name[MAX_PROC_NAME]; char private_name[MAX_PRIVATE_NAME+1]; char departing_private_group_name[MAX_GROUP_NAME]; int p_ind, p_ind1; proc p, p1; group *grp; member *mbr; char *num_vs_ptr; /* num members in vs set */ char *vs_ptr; /* the virtual synchrony set */ message_link *mess_link; 
message_header *head_ptr; message_obj *msg; int num_bytes; int needed; int ses; int i, j; int32u temp; Alarm( GROUPS, "G_handle_leave: %s leaves group %s\n", private_group_name, group_name ); switch( Gstate ) { case GOP: case GTRANS: if (Gstate == GOP) Alarm( GROUPS, "G_handle_leave in GOP\n"); if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_leave in GTRANS\n"); /* * if not already in group then ignore * if this member is local, notify it and extract its mbox * Extract this member from group * if the group is unchanged (in GOP all groups are unchanged) then: * Increment Grp_id * Notify all local members of a regular membership caused by leave */ G_private_to_names( private_group_name, private_name, proc_name ); p_ind = Conf_proc_by_name( proc_name, &p ); if( p_ind < 0 ) { Alarm( PRINT, "G_handle_leave: illegal proc_name %s in private_group %s \n", proc_name, private_group_name ); return; } grp = G_get_group( group_name ); if( grp == NULL ) { Alarm( PRINT, "G_handle_leave: group %s does not exist\n", group_name ); return; } mbr = G_get_member( grp, private_group_name ); if( mbr == NULL ) { Alarm( PRINT, "G_handle_leave: member %s does not exist in group %s\n", private_group_name, group_name ); return; } if( p.id == My.id ) { /* notify this local member and extract its mbox from group */ msg = Message_new_message(); head_ptr = Message_get_message_header(msg); head_ptr->type = CAUSED_BY_LEAVE; head_ptr->type = Set_endian( head_ptr->type ); head_ptr->hint = Set_endian( 0 ); memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME ); head_ptr->num_groups = 0; head_ptr->data_len = 0; /* create the mess_link */ mess_link = new( MESSAGE_LINK ); /* NOTE: Mess_buf contents are NOT used here. 
We only examine "0" bytes of it * We just need a valid pointer here to prevent faults */ Message_Buffer_to_Message_Fragments( msg, Mess_buf, 0); mess_link->mess = msg; Obj_Inc_Refcount(mess_link->mess); /* notify member */ needed = 0; ses = Sess_get_session( private_name ); if( Is_memb_session( Sessions[ ses ].status ) ) Sess_write( ses, mess_link, &needed ); if( !needed ) Sess_dispose_message( mess_link ); /* extract this mbox */ for( i=0, j=0; i < grp->num_local; i++,j++ ) { if( grp->mbox[i] == Sessions[ses].mbox ) j--; else grp->mbox[j] = grp->mbox[i]; } grp->num_local--; Message_Dec_Refcount(msg); } /* extract this member from group */ memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME ); sl_remove( &grp->MembersList, mbr->private_name, dispose ); grp->num_members--; if( grp->num_members == 0 ) { /* discard this empty group */ sl_destruct ( &grp->MembersList, dispose); sl_remove( &GroupsList, grp->name, dispose ); Num_groups--; GlobalStatus.num_groups = Num_groups; return; } if( grp->changed ) { if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_leave: changed group in GOP\n"); /* * If the group is changed (in GTRANS) then there is no need * to increment group id or to notify the local members. * They will get a group membership after the state transfer * terminates. 
*/
            return;
        }

        /* Increment group id */
        grp->grp_id.index++;

        if( grp->num_local > 0 )
        {
            /* notify all local members */
            msg = Message_new_message();
            num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
            head_ptr = Message_get_message_header(msg);
            head_ptr->type |= CAUSED_BY_LEAVE ;

            /* notify all local members */
            num_vs_ptr = &Mess_buf[ num_bytes ];
            num_bytes += sizeof( int32 );
            temp = 1;
            memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
            vs_ptr = (char *)&Mess_buf[ num_bytes ];
            memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
            num_bytes += MAX_GROUP_NAME;
            head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );

            mess_link = new( MESSAGE_LINK );
            Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
            mess_link->mess = msg;
            Obj_Inc_Refcount(mess_link->mess);

            needed = 0;
            for( i=0; i < grp->num_local; i++ )
            {
                ses = Sess_get_session_index ( grp->mbox[i] );
                if( Is_memb_session( Sessions[ ses ].status ) )
                    Sess_write( ses, mess_link, &needed );
            }
            if ( !needed ) Sess_dispose_message( mess_link );
            Message_Dec_Refcount(msg);
        }

        /* Compute the mask */
        for(i=0; i<4; i++)
        {
            grp->grp_mask[i] = 0;
        }
        {
            struct skiplistnode *iter;
            member *memp;
            for( iter = sl_getlist( &grp->MembersList ), memp=(member *)iter->data; iter != NULL; memp = (member *)sl_next( &grp->MembersList, &iter ))
            {
                p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
                temp = 1;
                /* NOTE(review): the statement below is not valid C as it stands; text
                 * between a '<' and a later '>' appears to have been lost (loop bound,
                 * loop body and the start of the mask assignment) -- restore from the
                 * upstream file before building. */
                for(i=0; igrp_mask[p1.seg_index/32] |= temp;
            }
        }
        Alarm(GROUPS, "G_handle_leave: Mask for group %s set to %x %x %x %x\n", grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
        break;

    case GGATHER:
        Alarm( EXIT, "G_handle_leave in GGATHER\n");
        break;

    case GGT:
        Alarm( EXIT, "G_handle_leave in GGT\n");
        break;
    }
}

/*
 * G_handle_kill: remove the member named by private_group_name from every
 * group in GroupsList that contains it.
 *
 * In GOP and GTRANS:
 *   - the name is split with G_private_to_names() and the daemon part is
 *     looked up with Conf_proc_by_name(); an unknown daemon is logged and the
 *     function returns without touching any group;
 *   - for each group holding the member, the member is removed (and, when the
 *     member belongs to this daemon, its mbox is removed from grp->mbox);
 *   - a group left with no members is destroyed and removed from GroupsList;
 *   - a group with grp->changed set is left alone (an EXIT alarm is raised if
 *     that happens outside GTRANS);
 *   - otherwise the group id index is incremented, the group mask is
 *     recomputed and, if the group has local members, a membership message
 *     flagged CAUSED_BY_DISCONNECT, followed by a one-entry vs set naming the
 *     departed member, is written to each local member session.
 *
 * In GGATHER and GGT the function raises Alarm( EXIT, ... ).
 */
void G_handle_kill( char *private_group_name )
{
    char proc_name[MAX_PROC_NAME];
    char private_name[MAX_PRIVATE_NAME+1];
    char departing_private_group_name[MAX_GROUP_NAME];
    int p_ind, p_ind1;
    proc p, p1;
    group *grp, *nextgroup;
    member *mbr;
    char *num_vs_ptr; /* num members in vs set */
    char *vs_ptr; /* the virtual synchrony set */
    message_link *mess_link;
    message_header *head_ptr;
    message_obj *msg;
    int num_bytes;
    int needed;
    int ses = -1; /* Fool compiler */
    int i, j;
    int32u temp;
    struct skiplistnode *giter;

    Alarm( GROUPS, "G_handle_kill: %s is killed\n", private_group_name );

    switch( Gstate )
    {
    case GOP:
    case GTRANS:

        if (Gstate == GOP) Alarm( GROUPS, "G_handle_kill in GOP\n");
        if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_kill in GTRANS\n");

        /*
         * for every group this guy is a member of
         *    Extract this member from group
         *    if the group is unchanged (in GOP all groups are unchanged) then:
         *        Increment Grp_id
         *        Notify all local members of a regular membership caused by disconnect
         */
        G_private_to_names( private_group_name, private_name, proc_name );
        p_ind = Conf_proc_by_name( proc_name, &p );
        if( p_ind < 0 )
        {
            Alarm( PRINT, "G_handle_kill: illegal proc_name %s in private_group %s \n", proc_name, private_group_name );
            return;
        }
        /* ses is only looked up (and only used below) when the member is local.
         * NOTE(review): the result is not checked for a negative value before it
         * indexes Sessions[] below, unlike other Sess_get_session() callers in
         * this file -- confirm a local member always has a session here. */
        if( p.id == My.id ) ses = Sess_get_session( private_name );

        giter = sl_getlist( &GroupsList );
        grp = (giter)?(group *)giter->data:NULL;
        for( ; grp != NULL ; grp = nextgroup)
        {
            /* This is confusing... get the nextgroup so that if we choose to remove it it doesn't screw up the iterator.
               Then next time through use this "next" value */
            nextgroup = sl_next( &GroupsList, &giter );
            mbr = G_get_member( grp, private_group_name );
            if( mbr == NULL ) continue; /* no such member in that group */

            /* Extract this member from group */
            if( p.id == My.id )
            {
                /* extract the mbox if local member */
                /* compacts grp->mbox in place: j trails i and is held back on the matching entry */
                for( i=0, j=0; i < grp->num_local; i++, j++ )
                {
                    if( grp->mbox[i] == Sessions[ses].mbox ) j--;
                    else grp->mbox[j] = grp->mbox[i];
                }
                grp->num_local--;
            }

            /* keep a copy of the name: mbr is disposed by sl_remove() below but the
             * name is still needed for the vs set of the notification */
            memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME );
            sl_remove( &grp->MembersList, mbr->private_name, dispose );
            grp->num_members--;
            if( grp->num_members == 0 )
            {
                /* last member gone: drop the whole group */
                sl_destruct ( &grp->MembersList, dispose);
                sl_remove( &GroupsList, grp->name, dispose );
                Num_groups--;
                GlobalStatus.num_groups = Num_groups;
                continue;
            }

            if( grp->changed )
            {
                if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_kill: changed group in GOP\n");
                /*
                 * If the group is changed (in GTRANS) then there is no need
                 * to increment group id or to notify the local members.
                 * They will get a group membership after the state transfer
                 * terminates.
                 */
                continue;
            }

            /* Increment group id */
            grp->grp_id.index++;

            /* Compute the mask */
            for(i=0; i<4; i++)
            {
                grp->grp_mask[i] = 0;
            }
            {
                struct skiplistnode *iter;
                member *memp;
                /* iter->data is read before the iter != NULL test; this point is only
                 * reached when grp->num_members is non-zero (see the check above) */
                for( iter = sl_getlist( &grp->MembersList ), memp=(member *)iter->data; iter != NULL; memp = (member *)sl_next( &grp->MembersList, &iter ))
                {
                    p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
                    temp = 1;
                    /* NOTE(review): the statement below is not valid C as it stands; text
                     * between a '<' and a later '>' appears to have been lost (loop bound,
                     * loop body and the start of the mask assignment) -- restore from the
                     * upstream file before building. */
                    for(i=0; igrp_mask[p1.seg_index/32] |= temp;
                }
            }
            Alarm(GROUPS, "G_handle_kill: Mask for group %s set to %x %x %x %x\n", grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);

            if( grp->num_local > 0 )
            {
                /* notify all local members */
                msg = Message_new_message();
                num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
                head_ptr = Message_get_message_header(msg);
                head_ptr->type |= CAUSED_BY_DISCONNECT ;

                /* append the vs set: an int32 count of 1 followed by the departed name */
                num_vs_ptr = &Mess_buf[ num_bytes ];
                num_bytes += sizeof( int32 );
                temp = 1;
                memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
                vs_ptr = (char *)&Mess_buf[ num_bytes ];
                memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
                num_bytes += MAX_GROUP_NAME;
                head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );

                mess_link = new( MESSAGE_LINK );
                Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
                mess_link->mess = msg;
                Obj_Inc_Refcount(mess_link->mess);

                needed = 0;
                for( i=0; i < grp->num_local; i++ )
                {
                    int temp_ses;
                    temp_ses = Sess_get_session_index ( grp->mbox[i] );
                    if( Is_memb_session( Sessions[ temp_ses ].status ) )
                        Sess_write( temp_ses, mess_link, &needed );
                }
                /* needed is passed by address to Sess_write(); if it is still zero
                 * the link is disposed here */
                if ( !needed ) Sess_dispose_message( mess_link );
                Message_Dec_Refcount(msg);
            }
        }
        break;

    case GGATHER:
        Alarm( EXIT, "G_handle_kill in GGATHER\n");
        break;

    case GGT:
        Alarm( EXIT, "G_handle_kill in GGT\n");
        break;
    }
}

/*
 * G_handle_groups: handle one incoming GROUPS message.
 *
 * Only valid in GGATHER and GGT; in GOP and GTRANS an EXIT alarm is raised.
 * A message whose membership id differs from Reg_memb_id is disposed.
 * Otherwise the message is chained onto Gathered, and once an AGREED message
 * has been counted for Conf_num_procs( &Reg_memb ) daemons the new group
 * memberships are computed by G_compute_and_notify() and Gstate becomes GOP.
 */
void G_handle_groups( message_link *mess_link )
{
    char *memb_id_ptr;
    membership_id temp_memb_id;
    message_obj *msg;
    message_header *head_ptr;

    Alarm( GROUPS, "G_handle_groups: \n" );

    switch( Gstate )
    {
    case GOP:
        Alarm( EXIT, "G_handle_groups in GOP\n");
        break;

    case GTRANS:
        Alarm( EXIT, "G_handle_groups in GTRANS\n");
        break;

    case GGATHER:
    case GGT:

        if (Gstate == GGATHER) Alarm( GROUPS, "G_handle_groups in GGATHER\n");
        if (Gstate == GGT) Alarm( GROUPS, "G_handle_groups in GGT\n");

        msg = mess_link->mess;
        /* hold a reference for the duration of this handler; every exit path
         * below releases it with Message_Dec_Refcount() */
        Obj_Inc_Refcount(msg);
        head_ptr = Message_get_message_header(msg);
        /* the message data starts with the sender's membership id */
        memb_id_ptr = Message_get_first_data_ptr(msg);
        memcpy( &temp_memb_id, memb_id_ptr, sizeof( membership_id ) );
        if( !Same_endian( head_ptr->type ) )
        {
            /* Flip membership id */
            temp_memb_id.proc_id = Flip_int32( temp_memb_id.proc_id );
            temp_memb_id.time = Flip_int32( temp_memb_id.time );
        }
        if( ! Memb_is_equal( temp_memb_id, Reg_memb_id ) )
        {
            /* message belongs to a different membership: drop it */
            Alarm( GROUPS, "G_handle_groups: GROUPS message received from bad memb id proc %d, time %d, daemon %s.\n", temp_memb_id.proc_id, temp_memb_id.time, head_ptr->private_group_name );
            Sess_dispose_message( mess_link );
            Message_Dec_Refcount(msg);
            return;
        }

        /* push the message on the front of the Gathered list */
        mess_link->next = Gathered.next;
        Gathered.next = mess_link;
        Num_mess_gathered++;
        /* The last Groups message a daemon sends is AGREED. */
        if( Is_agreed_mess( head_ptr->type ) ) Num_daemons_gathered++;
        Alarm( GROUPS, "G_handle_groups: GROUPS message received from %s - msgs %d, daemons %d\n", head_ptr->private_group_name, Num_mess_gathered, Num_daemons_gathered );
        if( Num_daemons_gathered != Conf_num_procs( &Reg_memb ) )
        {
            /* still waiting for other daemons */
            Message_Dec_Refcount(msg);
            return;
        }
        Alarm( GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n", Num_mess_gathered, Num_daemons_gathered );

        /* Replace protocol queue */
        Prot_set_down_queue( NORMAL_DOWNQUEUE );

        /* lower events threshold */
        Session_threshold = LOW_PRIORITY;
        Sess_set_active_threshold();

        /*
         * Compute new groups membership and notify members of
         * groups that have changed
         */
        G_compute_and_notify();

        if( Gstate == GGATHER )
        {
            Gstate = GOP;
            GlobalStatus.gstate = Gstate;
        }else{
            /* Gstate was GGT; GlobalStatus.gstate is not assigned in this branch */
            Gstate = GOP;
            /* We do want to deliver a transitional signal to any
             * groups that are going to get a CAUSED_BY_NETWORK
             * after our Reg_memb is delivered.
             */
            G_handle_trans_memb( Trans_memb, Trans_memb_id );
        }
        Message_Dec_Refcount(msg);
        break;
    }
}

/*
 * G_compute_and_notify: rebuild GroupsList from the gathered GROUPS messages
 * and notify local members of groups that changed.
 *
 * Steps, as performed below:
 *   1. Each message chained on Gathered is parsed by G_mess_to_groups() into a
 *      struct worklist, inserted into the local skiplist work, and disposed.
 *   2. G_smallest_group_indices() is called repeatedly; each call yields the
 *      worklists whose first group carries the smallest group name.  Their
 *      member lists are concatenated into the GroupsList entry of that name
 *      (created if absent) and the per-worklist group is freed.
 *   3. A group counts as changed if any contributing group id differs from
 *      the id held in GroupsList, or if grp->changed was already set.  A
 *      changed group gets the id { Reg_memb_id, 1 } and, if it has local
 *      members, a CAUSED_BY_NETWORK membership message carrying the vs set
 *      assembled in Temp_buf.
 *   4. The gather counters are reset, the groups buffers are released, the
 *      worklists are freed and G_print() is called.
 */
static void G_compute_and_notify()
{
    group *grp, *new_grp, *orig_grp;
    member *mbr;
    int changed;
    int ret;
    int vs_bytes;
    char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
    int32 num_vs;
    int num_exist;
    struct worklist *indices[MAX_PROCS_RING];
    int num_bytes;
    message_link *mess_link;
    message_header *head_ptr;
    message_obj *msg;
    int needed;
    char proc_name[MAX_PROC_NAME];
    char private_name[MAX_PRIVATE_NAME+1];
    int ses;
    int i;
    Skiplist work;

    Alarm( GROUPS, "G_compute_and_notify:\n");
    /* Compute groups structure in Work from gathered messages and clear messages */
    sl_init(&work);
    sl_set_compare(&work, G_work_groups_comp, G_work_groups_keycomp);

    for( i=0; i < Num_mess_gathered; i++ )
    {
        struct worklist *tp;
        tp = (struct worklist *)Mem_alloc(sizeof(struct worklist));
        tp->groups=NULL;
        /* pop the head of the Gathered list */
        mess_link = Gathered.next;
        Gathered.next = mess_link->next;
        ret = G_mess_to_groups( mess_link, tp->name, tp );
        if( ret < 0 )
            Alarm( EXIT, "G_compute_and_notify: G_mess_to_groups errored %d\n", ret );
        Sess_dispose_message( mess_link );
        if ( !sl_insert(&work, tp) ) {
            Alarm(EXIT, "G_compute_and_notify: Failed to insert worklist (%s) into work\n", tp->name);
        }
    }

    /*
     * for every sorted group name:
     *    Join the member lists to one list in Groups with a vs set.
     *    If the group has changed (*)
     *        Set new gid
     *        notify all local members: non-new get vs set, new get self.
     *    cancel new mark.
     *    dispose of this group in all of Work.
     *
     * Note: group has changed unless all of these hold:
     *    - everyone has the same gid
     *    - gid is not changed (-1)
     */
    for( num_exist = G_smallest_group_indices( &work, indices ) ;
         num_exist > 0 ;
         num_exist = G_smallest_group_indices( &work, indices ) )
    {
        struct skiplistnode *top_iter;
        group *this_group;

        /* prepare vs set */
        /* Temp_buf layout: int32 member count (filled in last), then MAX_GROUP_NAME-sized names */
        vs_bytes = 0;
        num_vs_ptr = &Temp_buf[0];
        vs_bytes+= sizeof( int32 );
        num_vs = 0;

        changed = 0;
        orig_grp = NULL;

        /* NOTE(review): the assignment to this_group happens inside assert();
         * if assert is compiled out (NDEBUG) this_group stays uninitialized. */
        assert( NULL != (this_group = (group *)(sl_getlist(indices[0]->groups)->data)) );
        orig_grp = sl_find( &GroupsList, this_group->name, &top_iter);
        if( orig_grp == NULL )
        {
            /* group not known locally yet: create an empty entry with the incoming id */
            new_grp = new( GROUP );
            memset( new_grp->name, 0, MAX_GROUP_NAME );
            strcpy( new_grp->name, this_group->name );
            new_grp->grp_id = this_group->grp_id;
            new_grp->num_members = 0;
            sl_init( &new_grp->MembersList );
            sl_set_compare( &new_grp->MembersList, G_member_recordcompare, G_member_keycompare);
            new_grp->num_local = 0;

            sl_insert( &GroupsList, new_grp );
            Num_groups++;
            GlobalStatus.num_groups = Num_groups;
            orig_grp = new_grp;
        }else{
            /* free members but keep local mbox */
            sl_remove_all( &orig_grp->MembersList, dispose );
            orig_grp->num_members = 0;
        }

        for( i=0 ; i < num_exist; i++ )
        {
            group *currentgroup;
            currentgroup = (group *)sl_getlist(indices[i]->groups)->data;
            if( G_id_is_equal( orig_grp->grp_id, currentgroup->grp_id ) )
            {
                struct skiplistnode *iter;
                Skiplist *currentmembers;
                /* NOTE(review): '¤tgroup' here and below looks like a mangled
                 * address-of applied to currentgroup (HTML-entity damage) --
                 * restore from the upstream file before building. */
                currentmembers = ¤tgroup->MembersList;
                iter = sl_getlist(currentmembers);
                assert(iter != NULL); /* memberlist in Groups message should never be empty */
                for( mbr = iter->data; mbr != NULL; mbr = sl_next(currentmembers, &iter))
                {
                    /* add this non-new member to vs */
                    memcpy( &Temp_buf[vs_bytes], mbr->private_name, MAX_GROUP_NAME );
                    vs_bytes += MAX_GROUP_NAME;
                    num_vs++;
                }
            }else{
                /* not the same grp_id */
                changed = 1;
            }
            /* in any way, mbr points here to the last member */

            /* chain these members */
            sl_concat(&orig_grp->MembersList, ¤tgroup->MembersList);
            orig_grp->num_members = orig_grp->MembersList.size;

            /* free this Work group */
            sl_destruct(¤tgroup->MembersList, dispose);
            sl_remove(indices[i]->groups, currentgroup, dispose);
        }
        memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = current count; */

        /* now our orig_grp is almost updated */
        grp = orig_grp;
        if( grp->changed ) changed = 1;

        if( !changed ) continue;

        /* the group has changed */
        Alarm( GROUPS, "G_compute_and_notify: completed group %s.\n", grp->name );
        Alarm( DEBUG, "G_compute_and_notify: changing group id from: " );
        G_print_group_id( grp->grp_id );
        grp->grp_id.memb_id = Reg_memb_id;
        grp->grp_id.index = 1;
        grp->changed = 0;
        Alarm( DEBUG, " to: " );
        G_print_group_id( grp->grp_id );
        Alarm( DEBUG, "\n" );

        if( grp->num_local > 0 )
        {
            struct skiplistnode *iter;
            msg = Message_new_message();
            num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
            head_ptr = Message_get_message_header(msg);
            head_ptr->type |= CAUSED_BY_NETWORK ;

            /* notify non-new local members */
            /* the vs set built in Temp_buf is appended after the membership buffer */
            memcpy( &Mess_buf[num_bytes], Temp_buf, vs_bytes );
            head_ptr->data_len += vs_bytes;

            mess_link = new( MESSAGE_LINK );
            Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes + vs_bytes);
            mess_link->mess = msg;
            Obj_Inc_Refcount(mess_link->mess);

            needed = 0;
            iter = sl_getlist(&grp->MembersList);
            for( mbr = iter->data; mbr != NULL; mbr = sl_next(&grp->MembersList, &iter))
            {
                if( Is_new_member( mbr->status ) ) continue;
                if( mbr->proc_id != My.id ) continue;
                G_private_to_names( mbr->private_name, private_name, proc_name );
                ses = Sess_get_session( private_name );
                if( ses < 0 ) Alarm( EXIT, "G_compute_and_notify: no session for %s\n", private_name);
                if( Is_memb_session( Sessions[ ses ].status ) )
                    Sess_write( ses, mess_link, &needed );
            }
            if( !needed ) Sess_dispose_message( mess_link );
            Message_Dec_Refcount(msg);
        }
    }
    Num_mess_gathered = 0;
    Num_daemons_gathered = 0;
    /* We're going back to GOP... destroy our groups messages. */
    G_empty_groups_bufs();

    /* Finish freeing the memory in our worklists */
    {
        struct worklist *worklist;
        struct skiplistnode *iter;
        iter = sl_getlist(&work);
        worklist = (iter)?iter->data:NULL;
        while( worklist != NULL )
        {
            /* every per-worklist group must have been consumed by the loop above */
            assert( worklist->groups->size == 0 );
            dispose( worklist->groups );
            worklist = sl_next(&work, &iter);
        }
    }
    sl_destruct( &work, dispose );
    G_print();
}

static int G_smallest_group_indices( Skiplist *work, struct worklist *indices[] )
{
    /*
     * this function searches the Work structure for the smallest
     * alphabetically ordered group name. It stores
     * all of the occurrences of that group in the indices array,
     * and returns the number of occurrences.
     *
     * Only the first group of each worklist is examined (names are compared
     * with strcmp); worklists whose groups list is empty are skipped.
     * Returns 0 when work is empty or no worklist has any group left.
     */
    int num_exist;
    int cmp;
    struct worklist *worklist;
    Skiplist *groups;
    struct skiplistnode *iter;

    iter = sl_getlist(work);
    worklist = (iter)?iter->data:NULL;
    num_exist = 0;
    if(!worklist) {
        return 0;
    }
    /* set indices[0] to first worklist with any groups */
    do {
        if ( worklist->groups->size == 0 ) {
            worklist = sl_next(work, &iter);
        } else {
            indices[0] = worklist;
            num_exist = 1;
            break;
        }
    } while ( worklist != NULL );

    if(!worklist) {
        /* All worklist groups are empty (no daemons have any alive groups) */
        return 0;
    }

    worklist = sl_next( work, &iter );
    /* Check rest of worklists for any with earlier groups or the same first group as indices[0] */
    while ( worklist != NULL )
    {
        group *first, *current;
        groups = worklist->groups;
        if( groups->size == 0 ) {
            worklist = sl_next(work, &iter);
            continue;
        }
        first = (group *)(sl_getlist(indices[0]->groups)->data);
        current = (group *)(sl_getlist(groups)->data);
        cmp = strcmp( first->name, current->name );
        if( cmp == 0 )
        {
            /* same name as the current smallest: one more occurrence */
            indices[num_exist] = worklist;
            num_exist++;
        }else if( cmp > 0 ){
            /* strictly smaller name found: restart the result with this worklist */
            indices[0] = worklist;
            num_exist = 1;
        }
        worklist = sl_next(work, &iter);
    }

    return( num_exist );
}

/*
 * G_id_is_equal: returns 1 when both group ids have the same index and
 * Memb_is_equal() accepts their membership ids, otherwise 0.
 */
static int G_id_is_equal( group_id g1, group_id g2 )
{
    if( g1.index == g2.index && Memb_is_equal( g1.memb_id, g2.memb_id ) )
        return( 1 );
    else return( 0 );
}

/*
 * G_get_group: look group_name up in GroupsList with sl_find() and return
 * the result.
 */
static group *G_get_group( char *group_name )
{
    struct skiplistnode *iter;

    /* iter only receives the node position from sl_find(); it is not used afterwards */
    return sl_find( &GroupsList, group_name, &iter );
}

/*
 * G_get_member: look private_group_name up in grp->MembersList with sl_find()
 * and return the result.
 */
static member *G_get_member( group *grp, char *private_group_name )
{
    struct skiplistnode *iter;

    return sl_find( &grp->MembersList, private_group_name, &iter );
}

static message_link *G_build_trans_mess( group *grp )
{
    /*
     * This routine builds a ready-to-be-sent transitional message signal
     * to the members of the process group grp
     *
     * The message is created with type TRANSITION_MESS for grp->name and its
     * data is the group id of grp.  Returns the newly allocated message link.
     */
    message_link *mess_link;
    scatter *scat;
    message_header *head_ptr;
    char *gid_ptr;

    mess_link = new( MESSAGE_LINK );
    mess_link->mess = Message_create_message(TRANSITION_MESS, grp->name);

    /* first scatter element holds the data header plus one group_id */
    scat = Message_get_data_scatter(mess_link->mess);
    scat->elements[0].len = Message_get_data_header_size() + sizeof( group_id );
    head_ptr = Message_get_message_header(mess_link->mess);
    gid_ptr = Message_get_first_data_ptr(mess_link->mess );

    head_ptr->data_len = sizeof( group_id );
    memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );

    return( mess_link );
}

/*
 * G_build_memb_buf: fill the header of msg and the buffer buf for a regular
 * membership message about grp.
 *
 * Header: type REG_MEMB_MESS (with endian marking), hint, group name in
 * private_group_name, num_groups set to grp->num_members, data_len set to
 * sizeof( group_id ).
 * Buffer: the private name of every member (MAX_GROUP_NAME bytes each), then
 * the group id.
 *
 * Returns the number of bytes written to buf.  buf must be large enough for
 * all member names plus one group_id; no bound is checked here.
 */
static int G_build_memb_buf( group *grp, message_obj *msg, char buf[])
{
    int num_bytes;
    message_header *head_ptr;
    char *gid_ptr;
    member *mbr;
    struct skiplistnode *iter;
    char *memb_ptr;

    head_ptr = Message_get_message_header(msg);
    head_ptr->type = REG_MEMB_MESS;
    head_ptr->type = Set_endian( head_ptr->type );
    head_ptr->hint = Set_endian( 0 );
    memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
    head_ptr->num_groups = grp->num_members;
    /* data_len counts only the group id here; the member names are not added to it */
    head_ptr->data_len = sizeof( group_id );

    num_bytes = 0;
    iter = sl_getlist( &grp->MembersList );
    mbr = (iter)?(member *)iter->data:NULL;
    for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
    {
        memb_ptr = (char *)&buf[num_bytes];
        num_bytes += MAX_GROUP_NAME;
        memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
    }

    gid_ptr = &buf[num_bytes];
    num_bytes += sizeof( group_id );
    memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );

    return( num_bytes );
}

static int G_build_memb_vs_buf( group *grp, message_obj *msg, char buf[], int32 caused )
{
    /*
     * This routine builds the memb buffer message, including a virtual synchrony
     * (failure atomicity) part with a set which contains only the established members
     * in the group membership.
     *
     * Note that in leave and disconnect we provide the member that left or
     * got disconnected in the vs_set. Therefore, caused will always be CAUSED_BY_NETWORK.
     *
     * The vs part is appended after the output of G_build_memb_buf(): an int32
     * count followed by one MAX_GROUP_NAME-sized name per established member.
     * caused is OR-ed into the header type.  Returns the total bytes in buf.
     */
    int num_bytes;
    message_header *head_ptr;
    char *num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
    struct skiplistnode *iter;
    member *mbr;
    char *membs_ptr;
    int32 num_vs;

    num_bytes = G_build_memb_buf( grp, msg, buf);
    head_ptr = Message_get_message_header(msg);
    head_ptr->type = head_ptr->type | caused;

    /* reserve room for the count; it is written once the members are known */
    num_vs_ptr = &buf[num_bytes];
    num_bytes += sizeof( int32 );
    head_ptr->data_len += sizeof( int32 );

    num_vs = 0;
    iter = sl_getlist( &grp->MembersList );
    mbr = (iter)?(member *)iter->data:NULL;
    for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
    {
        if( Is_established_member( mbr->status ) )
        {
            membs_ptr = (char *)&buf[num_bytes];
            memcpy( membs_ptr, mbr->private_name, MAX_GROUP_NAME );
            num_vs++ ;
            num_bytes += MAX_GROUP_NAME;
            head_ptr->data_len += MAX_GROUP_NAME;
        }
    }
    memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = total count; */

    return( num_bytes );
}

/*
 * G_stamp_groups_buf: overwrite the first sizeof( membership_id ) bytes of buf
 * with the current Reg_memb_id.
 */
static void G_stamp_groups_buf( char buf[] )
{
    char *memb_id_ptr;

    memb_id_ptr = buf;
    memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
}

/* This function used to be called G_refresh_groups_msg.
 *
 * G_build_groups_msg_hdr: fill the header of msg for a GROUPS message sent by
 * this daemon: type GROUPS_MESS (with endian marking), hint, My.name in the
 * zero-filled private_group_name field, num_groups 0, and data_len set to
 * groups_bytes.
 */
static void G_build_groups_msg_hdr( message_obj *msg, int groups_bytes )
{
    message_header *head_ptr;

    head_ptr = Message_get_message_header(msg);
    head_ptr->type = GROUPS_MESS;
    head_ptr->type = Set_endian( head_ptr->type );
    head_ptr->hint = Set_endian( 0 );
    memset(head_ptr->private_group_name, 0, MAX_GROUP_NAME);
    strcpy( head_ptr->private_group_name, My.name );
    head_ptr->num_groups = 0;
    head_ptr->data_len = groups_bytes;
}

/* This function guarantees that each group's data appears in only one buffer in
 * a sequence, and that the sorted order is preserved from the GroupsList.
 *
 * buf receives Reg_memb_id followed by one record per group that has local
 * members: group name (MAX_GROUP_NAME), group id, an int16 member count, and
 * the private names of the local members (MAX_GROUP_NAME each).
 *
 * *iter_ptr selects where to start: a NULL value starts at the head of
 * GroupsList, otherwise iteration resumes at that node.  On return *iter_ptr
 * holds the iterator position where the loop stopped (the first group that
 * did not fit when the buffer filled up).
 *
 * Returns the number of bytes written to buf.
 */
static int G_build_groups_buf(char buf[], struct skiplistnode **iter_ptr)
{
    int num_bytes;
    char *memb_id_ptr;
    group *grp;
    char *gid_ptr;
    member *mbr;
    struct skiplistnode *giter, *iter;
    char *num_memb_ptr;
    int16 num_memb;
    char *memb_ptr;
    int size_for_this_group;

    num_bytes = 0;
    memb_id_ptr = &buf[num_bytes];
    num_bytes += sizeof( membership_id );
    memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );

    giter = (*iter_ptr) ? (*iter_ptr) : (sl_getlist( &GroupsList ));
    grp = (giter)?(group *)giter->data:NULL;
    for( ; grp != NULL ; grp = sl_next( &GroupsList, &giter ) )
    {
        /* groups without local members contribute nothing to this daemon's message */
        if( grp->num_local == 0 ) continue;

        size_for_this_group = MAX_GROUP_NAME + sizeof(group_id) + sizeof(int16) + (grp->num_local * MAX_GROUP_NAME);
        /* This requires that the number of local group members be limited. */
        if( size_for_this_group > GROUPS_BUF_SIZE - num_bytes ) break;

        memcpy( &buf[num_bytes], grp->name, MAX_GROUP_NAME );
        num_bytes += MAX_GROUP_NAME;

        gid_ptr = &buf[num_bytes];
        num_bytes += sizeof( group_id );
        memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );

        /* reserve room for the member count; written after the members are collected */
        num_memb_ptr = &buf[num_bytes];
        num_bytes += sizeof( int16 );
        num_memb = 0;

        iter = sl_getlist( &grp->MembersList );
        mbr = (iter)?(member *)iter->data:NULL;
        for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
        {
            /* collect local members */
            if( mbr->proc_id != My.id ) continue;
            memb_ptr = (char *)&buf[num_bytes];
            num_bytes += MAX_GROUP_NAME;
            memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
            num_memb++;
        }
        memcpy(num_memb_ptr, &num_memb, sizeof( int16 ) );
        /* consistency check: members owned by this daemon must match num_local */
        if( num_memb != grp->num_local )
            Alarm( EXIT, "G_build_groups_buf: group %s has %d %d members\n", grp->name, num_memb, grp->num_local );
    }
    *iter_ptr = giter;
    return( num_bytes );
}

static int G_mess_to_groups( message_link *mess_link, char *name, struct worklist *work )
{
    /* the function returns 0 for success or -1 if an error occurred
     *
     * It parses one GROUPS message into work->groups (a newly allocated
     * skiplist of group records with their member lists) and copies the
     * sender's name from the message header into name.
     * -1 is returned when the sender is not found by Conf_proc_by_name().
     */
    message_obj *msg;
    scatter *scat;
    message_header *head_ptr;
    proc p;
    int num_bytes, total_bytes;
    group *grp;
    char *gid_ptr;
    member *mbr;
    char *num_memb_ptr;
    int16 num_memb;
    int i;

    /* flatten all scatter elements into Temp_buf */
    /* NOTE(review): no check that the total fits Temp_buf; its size is not
     * visible in this part of the file -- confirm. */
    total_bytes = 0;
    msg = mess_link->mess;
    scat = Message_get_data_scatter(msg);
    for( i=0; i < scat->num_elements ; i++ )
    {
        memcpy( &Temp_buf[total_bytes], scat->elements[i].buf, scat->elements[i].len );
        total_bytes += scat->elements[i].len;
    }

    num_bytes = 0;
    head_ptr = Message_get_message_header(msg);
    num_bytes += Message_get_data_header_size();

    if (0 > Conf_proc_by_name( head_ptr->private_group_name , &p ) )
    {
        Alarm( PRINT, "G_mess_to_groups: Groups message from someone (%s) not in conf\n", head_ptr->private_group_name);
        return( -1 );
    }
    work->groups = (Skiplist *)Mem_alloc(sizeof(Skiplist));
    sl_init(work->groups);
    sl_set_compare(work->groups, G_compare, G_compare);
    memcpy( name, head_ptr->private_group_name, MAX_GROUP_NAME );

    /* skip the membership id that leads the message data */
    num_bytes +=
sizeof( membership_id );

    /* each record: group name, group id, int16 member count, member names */
    for( ; num_bytes < total_bytes; )
    {
        /* creating a new group */
        grp = new( GROUP );
        memcpy( grp->name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
        num_bytes += MAX_GROUP_NAME;
        sl_append( work->groups, grp );
        sl_init( &grp->MembersList );
        sl_set_compare( &grp->MembersList, G_member_recordcompare, G_member_keycompare);

        gid_ptr = &Temp_buf[num_bytes];
        num_bytes += sizeof( group_id );
        memcpy( &grp->grp_id, gid_ptr, sizeof(group_id) );

        num_memb_ptr = &Temp_buf[num_bytes];
        num_bytes += sizeof( int16 );
        memcpy( &num_memb, num_memb_ptr, sizeof( int16 ) );

        if( !Same_endian( head_ptr->type ) )
        {
            /* Flip group id */
            grp->grp_id.memb_id.proc_id = Flip_int32( grp->grp_id.memb_id.proc_id );
            grp->grp_id.memb_id.time = Flip_int32( grp->grp_id.memb_id.time );
            grp->grp_id.index = Flip_int32( grp->grp_id.index );

            /* flip other parts of the message */
            num_memb = Flip_int16( num_memb );
        }
        /* creating members */
        /* every member in the message is attributed to the sending daemon (p)
         * and marked ESTABLISHED_MEMBER */
        for( i=0; i < num_memb; i++ )
        {
            mbr = new( MEMBER );
            mbr->proc_id = p.id;
            mbr->status = ESTABLISHED_MEMBER;
            memcpy( mbr->private_name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
            num_bytes += MAX_GROUP_NAME;
            sl_append( &grp->MembersList, mbr );
        }
        grp->num_members = num_memb;
        /* the (possibly byte-flipped) count is written back into Temp_buf */
        memcpy( num_memb_ptr, &num_memb, sizeof( int16 ) );
    }
    return( 0 );
}

/*
 * G_analize_groups: translate a list of target group names into the set of
 * local sessions that should receive a message.
 *
 * For a private group name (leading '#') the single matching local session is
 * used, if the name is legal, belongs to this daemon and has a session.
 * For a regular group the local mailboxes of the group are used: grp->mbox in
 * GOP, or the sessions of local non-new members in GTRANS.  Any other Gstate
 * raises an EXIT alarm.
 *
 * Mailboxes collected for one target are de-duplicated against those gathered
 * for earlier targets, then converted to session indices in target_sessions.
 * Returns the number of entries written to target_sessions.
 *
 * The working arrays are static, so the function keeps state in file-lifetime
 * storage between calls.
 */
int G_analize_groups( int num_groups, char target_groups[][MAX_GROUP_NAME], int target_sessions[] )
{
    static mailbox mbox[MAX_SESSIONS];
    static mailbox current[MAX_SESSIONS];
    static mailbox *current_ptr;
    int num_mbox;
    int num_mbox_pre;
    int num_current;
    group *grp;
    member *mbr;
    struct skiplistnode *iter;
    char proc_name[MAX_PROC_NAME];
    char private_name[MAX_PRIVATE_NAME+1];
    int found;
    int ses;
    int ret;
    int i, j, k;

    /* get the mbox */
    num_mbox = 0;
    for( i=0; i < num_groups; i++ )
    {
        if( target_groups[i][0] == '#' )
        {
            /* private group */
            ret = G_private_to_names( target_groups[i], private_name, proc_name );

            /* Illegal group */
            if( ret < 0 ) continue;

            /* this private group is not local */
            if( strcmp( My.name, proc_name ) != 0 )
continue;

            ses = Sess_get_session( private_name );

            /* we have no such session */
            if( ses < 0 ) continue;

            /* a private group maps to exactly one mailbox */
            current[0] = Sessions[ ses ].mbox;
            current_ptr = current;
            num_current = 1;
        }else{
            /* regular group */
            grp = G_get_group( target_groups[i] );
            if( grp == NULL ) continue;
            if( Gstate == GOP )
            {
                /* use the group's own mailbox array directly */
                current_ptr = grp->mbox;
                num_current = grp->num_local;
            }else if( Gstate == GTRANS ){
                /* collect the mailboxes of local members that are not new */
                current_ptr = current;
                num_current = 0;
                iter = sl_getlist( &grp->MembersList );
                mbr = (iter)?(member *)iter->data:NULL;
                for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
                {
                    if( mbr->proc_id == My.id && !Is_new_member( mbr->status ) )
                    {
                        G_private_to_names( mbr->private_name, private_name, proc_name );
                        ses = Sess_get_session( private_name );
                        if( ses < 0 ) Alarm( EXIT, "G_analize_groups: ses is %d private_name is %s\n", ses, private_name );
                        current[ num_current ] = Sessions[ ses ].mbox;
                        num_current++;
                    }
                }
            }else {
                num_current = 0; /* fool compiler warnings */
                Alarm( EXIT, "G_analize_groups: Gstate is %d\n", Gstate );
            }
        }
        /* merge current into mbox, skipping mailboxes already gathered for earlier
         * targets; entries added in this pass are not compared with each other */
        num_mbox_pre = num_mbox;
        for( j=0; j < num_current; j++ )
        {
            found = 0;
            for( k=0; k < num_mbox_pre; k++ )
            {
                if( mbox[k] == current_ptr[j] )
                {
                    found = 1;
                    break;
                }
            }
            if( !found )
            {
                mbox[num_mbox] = current_ptr[j];
                num_mbox++;
            }
        }
    }
    /* convert mbox to sessions */
    for( i=0; i < num_mbox; i++ )
        target_sessions[i] = Sess_get_session_index ( mbox[ i ] );
    return( num_mbox );
}

/*
 * G_set_mask: the visible part clears the four words of grp_mask and then
 * walks target_groups; for a private group name (leading '#') it splits the
 * name and looks up the owning daemon with Conf_proc_by_name().
 *
 * NOTE(review): the remainder of this function and the beginning of the next
 * one (apparently G_private_to_names, judging by the statements that follow)
 * are missing from this copy.  Text between a '<' and a later '>' appears to
 * have been dropped in three places below, leaving code that is not valid C.
 * Restore from the upstream file before building; nothing beyond the visible
 * statements is documented here.
 */
void G_set_mask( int num_groups, char target_groups[][MAX_GROUP_NAME], int32u *grp_mask )
{
    group *grp;
    char proc_name[MAX_PROC_NAME];
    char private_name[MAX_PRIVATE_NAME+1];
    int ret;
    int i, j;
    proc p;
    int32u temp;

    for(i=0; i<4; i++)
    {
        grp_mask[i] = 0;
    }

    for( i=0; i < num_groups; i++ )
    {
        if( target_groups[i][0] == '#' )
        {
            /* private group */
            ret = G_private_to_names( target_groups[i], private_name, proc_name );

            /* Illegal group */
            if( ret < 0 ) continue;

            Conf_proc_by_name( proc_name, &p );
            temp = 1;
            /* NOTE(review): truncated -- see the note in the header comment */
            for(j=0; jgrp_mask[j];
        }
        p = Conf_my();
        temp = 1;
        /* NOTE(review): truncated -- from here on the text appears to belong to a
         * length check on priv_name_len / proc_name_len in another function */
        for(j=0; j MAX_PRIVATE_NAME) || ( priv_name_len < 1 )
|| ( proc_name_len >= MAX_PROC_NAME ) || ( proc_name_len < 1 ) )
    {
        /* NOTE(review): the start of this function (apparently
         * G_private_to_names) and of this condition is missing from this copy;
         * only the visible tail is documented. */
        Alarm( GROUPS, "G_private_to_names: Illegal private_group_name (priv, proc)\n");
        return( ILLEGAL_GROUP );
    }
    /* start strings at actual beginning */
    pn++;
    pn[proc_name_len] = '\0';
    prvn = &name[1];
    legal_private_name = 1;
    /* a character is accepted only if it is greater than '#' and not greater
     * than '~'; anything else is replaced by '.' (so the names can be logged
     * below) and the whole name is flagged illegal */
    for( i=0; i < priv_name_len; i++ )
        if( prvn[i] <= '#' || prvn[i] > '~' )
        {
            legal_private_name = 0;
            prvn[i] = '.';
        }
    for( i=0; i < proc_name_len; i++ )
        if( pn[i] <= '#' || pn[i] > '~' )
        {
            legal_private_name = 0;
            pn[i] = '.';
        }
    if( !legal_private_name )
    {
        Alarm( GROUPS, "G_private_to_names: Illegal private_group_name characters (%s) (%s)\n", prvn, pn );
        return( ILLEGAL_GROUP );
    }
    /* copy name components including null termination */
    memcpy( private_name, prvn, priv_name_len + 1 );
    memcpy( proc_name, pn, proc_name_len + 1 );
    /* success is reported as 1; both failure paths above return ILLEGAL_GROUP */
    return( 1 );
}

/*
 * G_print: write the current contents of GroupsList to stdout with printf():
 * the number of groups, then each group's name and member count followed by
 * the private name of each member.
 */
static void G_print()
{
    group *grp;
    member *mbr;
    struct skiplistnode *giter, *iter;
    int i, j;

    printf("++++++++++++++++++++++\n");
    printf("Num of groups: %d\n", Num_groups );
    giter = sl_getlist( &GroupsList );
    grp = (giter)?(group *)giter->data:NULL;
    for( i=0; grp != NULL ; i++, grp = sl_next( &GroupsList, &giter ) )
    {
        /* indices are printed 1-based */
        printf("[%d] group %s with %d members:\n", i+1, grp->name, grp->num_members );
        iter = sl_getlist( &grp->MembersList );
        mbr = (iter)?(member *)iter->data:NULL;
        for( j=0; mbr != NULL ; j++, mbr = sl_next( &grp->MembersList, &iter ) )
        {
            printf("\t[%d] %s\n", j+1, mbr->private_name );
        }
        printf("----------------------\n");
    }
}

/*
 * G_empty_groups_bufs: dispose every link of the Groups_bufs list.  The loop
 * ends when Groups_bufs has become NULL.
 */
static void G_empty_groups_bufs()
{
    groups_buf_link *next;

    for( ; Groups_bufs; Groups_bufs = next )
    {
        /* read the successor before the current link is disposed */
        next = Groups_bufs->next;
        dispose( Groups_bufs );
    }
    return;
}

/*
 * G_get_num_local: return the num_local field of the group called group_name,
 * or 0 when G_get_group() does not find such a group.
 */
int G_get_num_local( char *group_name )
{
    group *grp = G_get_group( group_name );
    if( grp == NULL ) return 0;
    return grp->num_local;
}

/*
 * G_print_group_id: emit the proc id, time and index of g through
 * Alarm( DEBUG, ... ).  No trailing newline is written.
 */
static void G_print_group_id( group_id g )
{
    Alarm( DEBUG, "{Proc ID: %d, Time: %d, Index: %d}", g.memb_id.proc_id, g.memb_id.time, g.index );
}