/*
 * The Spread Toolkit.
 *     
 * The contents of this file are subject to the Spread Open-Source
 * License, Version 1.0 (the ``License''); you may not use
 * this file except in compliance with the License.  You may obtain a
 * copy of the License at:
 *
 * http://www.spread.org/license/
 *
 * or in the file ``license.txt'' found in this distribution.
 *
 * Software distributed under the License is distributed on an AS IS basis, 
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
 * for the specific language governing rights and limitations under the 
 * License.
 *
 * The Creators of Spread are:
 *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
 *
 *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
 *
 *  All Rights Reserved.
 *
 * Major Contributor(s):
 * ---------------
 *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
 *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
 *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
 *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
 *
 */


#include "arch.h"
#include <string.h>
#include <stdio.h>
#include <assert.h>

#include "spread_params.h"
#include "net_types.h"
#include "protocol.h"
#include "prot_body.h"
#include "sess_types.h"
#include "sess_body.h"
#include "groups.h"
#include "objects.h"
#include "memory.h"
#include "sp_events.h"
#include "status.h"
#include "alarm.h"
#include "membership.h"
#if     ( SPREAD_PROTOCOL > 3 )
#include "queues.h"
#endif
#include "skiplist.h"
#include "alarm.h"
#include "message.h"

#ifndef NULL
#define		NULL 	0
#endif	/* NULL */

#define		ESTABLISHED_MEMBER		0
#define		NEW_MEMBER			1
#define		PARTITIONED_MEMBER		2
#define		Is_established_member( status )	( status == ESTABLISHED_MEMBER )
#define		Is_new_member( status )		( status == NEW_MEMBER )
#define		Is_partitioned_member( status )	( status == PARTITIONED_MEMBER )

typedef struct dummy_groups_buf_link {
  char buf[GROUPS_BUF_SIZE];
  int  bytes;
  struct dummy_groups_buf_link *next;
} groups_buf_link;
 
struct worklist {
  char name[MAX_GROUP_NAME];
  Skiplist *groups;
};

char *printgroup(void *vgrp) {
  group *grp = (group *)vgrp;
  return grp->name;
}

static	int		Gstate;
static	configuration	Trans_memb;
static	membership_id	Trans_memb_id;
static	configuration	Reg_memb;
static	membership_id	Reg_memb_id;
static	char		Mess_buf[MAX_MESSAGE_BODY_LEN];

static  groups_buf_link *Groups_bufs;
static  int             Num_mess_gathered;
static  int             Num_daemons_gathered;
static  message_link    Gathered; /* Groups messages */

#if 0
static	group		Work[MAX_PROCS_RING+1];
#endif
static  int             Groups_control_down_queue;

static	int		G_id_is_equal( group_id g1, group_id g2 );
static  void            G_print_group_id( group_id g );
static	group		*G_get_group( char *group_name );
static	member		*G_get_member( group *grp, char *private_group_name );
static	int		G_build_memb_buf( group *grp, message_obj *msg,
					  char buf[] );
static	int		G_build_memb_vs_buf( group *grp, message_obj *msg,
					     char buf[], int32 caused );
static	message_link	*G_build_trans_mess( group *grp );

static	void		G_stamp_groups_buf( char buf[] );
static  void            G_build_groups_msg_hdr( message_obj *msg, int groups_bytes );
static	int		G_build_groups_buf( char buf[], struct skiplistnode **iter_ptr );
static	int		G_mess_to_groups( message_link *mess_link, char *name,
					  struct worklist *work );
static	int		G_smallest_group_indices( Skiplist *work, struct worklist *indices[] );
static	void		G_compute_and_notify(void);
static	void		G_print(void);
static  void            G_empty_groups_bufs(void);

static  Skiplist        GroupsList;

static  int             G_compare(void *, void *);

static  int     G_compare(void *a, void *b)
{
  /* Takes two Group records and compares them based on their keys (name) */
  /* This will work for any record type that has a char[LENGTH] as the first
     member of the struct and is the key */
  assert(a);
  assert(b);
  return strcmp((char *)a, (char *)b);
}

int     G_member_recordcompare(void *a, void *b)
{
  int compared;
  /* Takes two Member records and compares them based on their keys (name) */
  member        *am = (member *)a, *bm = (member *)b;
  assert(a);
  assert(b);
  compared = strcmp( bm->private_name, am->private_name );
  if(compared > 0)
    return -1;
  if(compared == 0)
    return 0;
  return 1;
}
int     G_member_keycompare(void *a, void *b)
{
  int compared;
  /* Takes two Member records and compares them based on their keys (name) */
  member        *bm = (member *)b;
  char          *am = (char *)a;
  assert(a);
  assert(b);
  compared = strcmp( bm->private_name, am );
  if(compared > 0)
    return -1;
  if(compared == 0) {
    return 0;
  }
  return 1;
}
/* Take two worklist pointers and compare them by the ->groups pointers */
/* We do not actually care about the order they are stored in the skiplist
 * that is why we are using meaningless pointers as keys.
 */
static int     G_work_groups_comp(void *a, void *b)
{
        struct worklist *wA, *wB;
        assert(a);
        assert(b);
        wA = (struct worklist *)a;
        wB = (struct worklist *)b;
        
        if (wA->groups < wB->groups)
                return(-1);
        if (wA->groups == wB->groups)
                return(0);
        return(1);
}
static int     G_work_groups_keycomp(void *key, void *b)
{
        struct worklist *wB;
        Skiplist *wKey;
        assert(key);
        assert(b);
        wB = (struct worklist *)b;
        wKey = (Skiplist *)key;
        if (wKey < wB->groups)
                return(-1);
        if (wKey == wB->groups)
                return(0);
        return(1);
}
#if 0
int     G_group_revproc_comp(void *a, void *b) {
  int32 aid, bid;
  struct worklist *A=(struct worklist *)a, *B=(struct worklist *)b;
  aid=A->proc_index;
  bid=B->proc_index;
  return (aid>bid)?(-1):((aid==bid)?0:1);
}
int     G_group_revproc_keycomp(void *a, void *b) {
  int32 aid, bid;
  struct worklist *B=(struct worklist *)b;
  bid=B->proc_index;
  aid = *(int32 *)a;
  return (aid>bid)?-1:((aid==bid)?0:1);
}
#endif
void	G_init()
{
        int     ret;

	Alarm( GROUPS, "G_init: \n" );

	Num_groups = 0;
	GlobalStatus.num_groups = Num_groups;

	sl_init(&GroupsList);
	/* Key's address is the same as records address and is a null
	   terminated string, so we can use the same function to compare
	   record<->record
	   key<->record */
	sl_set_compare(&GroupsList,
		       G_compare,
		       G_compare);

	/*	Groups.next = NULL;
		Groups.prev = NULL; */

        ret = Mem_init_object(GROUP, sizeof(group), 100, 0);
        if (ret < 0)
        {
                Alarm(EXIT, "G_init: Failure to Initialize GROUP memory objects\n");
        }

        ret = Mem_init_object(MEMBER, sizeof(member), 200, 0);
        if (ret < 0)
        {
                Alarm(EXIT, "G_init: Failure to Initialize MEMBER memory objects\n");
        }
        ret = Mem_init_object(GROUPS_BUF_LINK, sizeof(groups_buf_link), 1, 1);
        if( ret < 0 )
        {
                Alarm(EXIT, "G_init: Failure to Initialize GROUPS_BUF_LINK memory objects\n");
        }
#if     ( SPREAD_PROTOCOL == 3 )
        Groups_control_down_queue = 0;
#else
        Groups_control_down_queue = init_queuesess(Groups_down_qs);
#endif

        Groups_bufs          = NULL;
        Num_mess_gathered    = 0;
        Num_daemons_gathered = 0;
	Gathered.next = NULL;

	Gstate = GOP;
	GlobalStatus.gstate = Gstate;
}

void	G_handle_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
	group		*grp, *nextgroup;
	member		*mbr, *nextmember;
	struct skiplistnode *giter, *iter, *passed_iter;
        groups_buf_link     *grps_buf_link;
	message_link	*mess_link;
	down_link	*down_ptr;
        message_obj     *msg;
        message_header  *head_ptr;
	int		num_bytes;
	int		needed;
	int		ses;
	int		i;


	Alarm( GROUPS, "G_handle_reg_memb:  with (%d.%d.%d.%d, %d) id\n", 
               IP1(reg_memb_id.proc_id),IP2(reg_memb_id.proc_id),IP3(reg_memb_id.proc_id),
               IP4(reg_memb_id.proc_id), reg_memb_id.time );
        
	switch( Gstate )
	{
	    case GOP:
		Alarm( EXIT, "G_handle_reg_memb in GOP\n");

		break;

	    case GTRANS:
		/* 
		 * Save reg_memb and reg_memb_id
		 * if previous Trans_memb is equal to reg_memb then:
		 * 	for every changed group
		 *		eliminate partitioned members
		 * 		set Grp_id to (reg_memb_id, 1)
		 *		notify local members of regular membership
		 *	Shift to GOP
		 * else
		 *	for every changed group
		 *		eliminate partitioned members
		 *		set Grp_id to (reg_memb_id, -1)
		 *	Replace protocol queue, raise event thershold
		 *	Build groups message -- only local members
		 *	Send groups message
		 *	Shift to GGATHER
		 */
		Alarm( GROUPS, "G_handle_reg_memb in GTRANS\n");

		Reg_memb = reg_memb;
		Reg_memb_id = reg_memb_id;

		if( Conf_num_procs( &Trans_memb ) == Conf_num_procs( &Reg_memb ) )
		{
		  giter = sl_getlist( &GroupsList );
		  grp = (giter)?(group *)giter->data:NULL;
		  for( ; grp != NULL ; grp = nextgroup )
		    {
                        nextgroup = sl_next( &GroupsList, &giter );
			if( grp->changed )
			{
			    /* The group has changed */
			    /* eliminating partitioned members */
                            iter = sl_getlist( &grp->MembersList );
                            mbr = (iter)?(member *)iter->data:NULL;
                            for( ; mbr != NULL ; mbr = nextmember )
                            {
                                nextmember = sl_next( &grp->MembersList, &iter );
                                if( Is_partitioned_member( mbr->status ) )
				{
					/* discard this member - proc no longer in membership */
                                        sl_remove ( &grp->MembersList,mbr->private_name, dispose);
                                        grp->num_members--;
				}
			    }
                            if( grp->num_members == 0 )
			    {
				/* discard this empty group */
                                sl_destruct ( &grp->MembersList, dispose);
                                sl_remove (  &GroupsList, grp->name, dispose);
				Num_groups--;
				GlobalStatus.num_groups = Num_groups;
			    }else{
                                Alarm( GROUPS, "G_handle_reg_memb: skipping state transfer for group %s.\n", grp->name );
                                Alarm( DEBUG,  "G_handle_reg_memb: changing group_id from: " );
                                G_print_group_id( grp->grp_id );
				grp->grp_id.memb_id = Reg_memb_id;
				grp->grp_id.index   = 1;
                                grp->changed        = 0;
                                Alarm( DEBUG, " to: " );
                                G_print_group_id( grp->grp_id );
                                Alarm( DEBUG, "\n" );

				if( grp->num_local > 0 ){
                                        /* send members regular membership */
                                        msg = Message_new_message();
                                        num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );
                                        
                                        /* create the mess_link */
                                        mess_link = new( MESSAGE_LINK );
                                        Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
                                        mess_link->mess = msg;
                                        Obj_Inc_Refcount(mess_link->mess);
                                        
                                        /* notify local members */
                                        needed = 0;
                                        for( i=0; i < grp->num_local; i++ )
                                        {
                                                ses = Sess_get_session_index ( grp->mbox[i] );
                                                if( Is_memb_session( Sessions[ ses ].status ) )
                                                        Sess_write( ses, mess_link, &needed );
                                        }
                                        Message_Dec_Refcount(msg);
                                        if( !needed ) Sess_dispose_message( mess_link );
                                }
			    }
			}
		    }
		    Gstate = GOP;
		    GlobalStatus.gstate = Gstate;
		}else{
		/*
		 * else
		 *	for every changed group
		 *		eliminate partitioned members
		 *		set Grp_id to (reg_memb_id, -1)
		 *	Replace protocol queue, raise event thershold
		 *	build groups message -- only local members
		 *	Send groups message
		 *	Shift to GGATHER
		 */
		    giter = sl_getlist( &GroupsList );
		    grp = (giter)?(group *)giter->data:NULL;
		    for( ; grp != NULL ; grp = nextgroup )
		    {
                        nextgroup = sl_next( &GroupsList, &giter );
			if( grp->changed )
			{
			    /* The group has changed */
			    /* eliminating partitioned members */
                            iter = sl_getlist( &grp->MembersList );
                            mbr = (iter)?(member *)iter->data:NULL;
                            for( ; mbr != NULL ; mbr = nextmember )
                            {
                                nextmember = sl_next( &grp->MembersList, &iter );
				if( Is_partitioned_member( mbr->status ) )
				{
					/* discard this member - proc no longer in membership */
                                        sl_remove ( &grp->MembersList,mbr->private_name, dispose);
					grp->num_members--;
				}
			    }
			    if( grp->num_members == 0 )
			    {
				/* discard this empty group */
                                sl_destruct ( &grp->MembersList, dispose);
                                sl_remove (  &GroupsList, grp->name, dispose);
				Num_groups--;
				GlobalStatus.num_groups = Num_groups;
                            }
			}
		    }
		    /* raise events threshold */
		    Session_threshold = MEDIUM_PRIORITY;
		    Sess_set_active_threshold();

                    /* Replace down queue */
                    Prot_set_down_queue( GROUPS_DOWNQUEUE );

		    /* build and send Groups message */
                    /* Nowadays, we can send multiple groups messages.  No group has
                     * data in more than one.  As an optimization, only the last message is
                     * AGREED, and all previous ones are RELIABLE.  G_handle_groups uses this
                     * knowledge when parsing Groups messages. */
                    passed_iter = NULL; 
                    do { 
                            msg = Message_new_message();
                            grps_buf_link = new( GROUPS_BUF_LINK );
                            grps_buf_link->next = Groups_bufs;
                            Groups_bufs = grps_buf_link;
                            grps_buf_link->bytes = G_build_groups_buf(grps_buf_link->buf, &passed_iter);
                            G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
                            head_ptr = Message_get_message_header(msg);
                            if( passed_iter )
                                    head_ptr->type |= RELIABLE_MESS;
                            else
                                    head_ptr->type |= AGREED_MESS;
                            Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );

                            down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
                            down_ptr->mess = msg; 
                            Obj_Inc_Refcount(down_ptr->mess);
                            /* Use control queue--not normal session queues */
                            Prot_new_message( down_ptr, Groups_control_down_queue );
                            Message_Dec_Refcount(msg);
                            Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GTRANS with %d bytes\n",
                                   (passed_iter) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
                    } while( passed_iter != NULL );

		    Gstate = GGATHER;
		    GlobalStatus.gstate = Gstate;
		}
		break;

	    case GGATHER:
		Alarm( EXIT, "G_handle_reg_memb in GGATHER\n");

		break;

	    case GGT:
		/*
		 * Save reg_memb and reg_memb_id
		 * Clear all retained Groups messages
		 * Stamp own Groups message with current membership id
		 * Send group message
		 * Shift to GGATHER
		 */
		Alarm( GROUPS, "G_handle_reg_memb in GGT\n");

		Reg_memb = reg_memb;
		Reg_memb_id = reg_memb_id;

		/* Clear retained Groups messages in Gathered */
		for( i=0; i < Num_mess_gathered; i++ )
		{
			mess_link = Gathered.next;
			Gathered.next = mess_link->next;
			Sess_dispose_message( mess_link );
		}
                Num_mess_gathered    = 0;
                Num_daemons_gathered = 0;

                for( grps_buf_link = Groups_bufs; grps_buf_link; grps_buf_link = grps_buf_link->next )
                {
                        /*  Stamp own Groups message in buffer with current membership id */
                        G_stamp_groups_buf( grps_buf_link->buf );

                        /* send Groups message */
                        msg = Message_new_message();
                        G_build_groups_msg_hdr( msg, grps_buf_link->bytes );
                        head_ptr = Message_get_message_header(msg);
                        if( grps_buf_link->next )
                                head_ptr->type |= RELIABLE_MESS;
                        else
                                head_ptr->type |= AGREED_MESS;
                        Message_Buffer_to_Message_Fragments( msg, grps_buf_link->buf, grps_buf_link->bytes );
               
                        down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), 0, 0);
                        down_ptr->mess = msg;
                        Obj_Inc_Refcount(down_ptr->mess);
                        /* Use control queue--not normal session queues */
                        Prot_new_message( down_ptr, Groups_control_down_queue );
                        Message_Dec_Refcount(msg);
                        Alarm( GROUPS, "G_handle_reg_memb: (%8s) GROUPS message sent in GGT with %d bytes\n",
                               (grps_buf_link->next) ? "RELIABLE" : "AGREED", grps_buf_link->bytes );
                }
        
		Gstate = GGATHER;
		GlobalStatus.gstate = Gstate;

		break;
	}
}

void	G_handle_trans_memb( configuration trans_memb, membership_id trans_memb_id )
{
	group		*grp, *nextgroup;
	member		*mbr, *nextmember;
	struct skiplistnode *giter, *iter;
	int		group_changed;
	message_link	*mess_link;
	int		needed;
	int		ses;
	int		i;

	Alarm( GROUPS, "G_handle_trans_memb: \n" );

	switch( Gstate )
	{
	    case GOP:
		/* 
		 * Save transitional membership
		 * For every group that has members that are not in the trans_memb do:
		 *	mark group members that are not in trans_memb as partitioned.
		 * 	notify local members with an empty transitional group mess.
		 * 	mark group as changed (index = -1)
		 * Shift to GTRANS
		 */
		Alarm( GROUPS, "G_handle_trans_memb in GOP\n");

		Trans_memb    = trans_memb;
                Trans_memb_id = trans_memb_id;
                Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
                       " {proc_id: %d "
                       " time:    %d}\n", Trans_memb_id.proc_id, Trans_memb_id.time );

		giter = sl_getlist( &GroupsList );
		grp = (giter)?(group *)giter->data:NULL;
		for( ; grp != NULL ; grp = nextgroup )
		  {
                      nextgroup = sl_next( &GroupsList, &giter );
		      group_changed = 0;
		      iter = sl_getlist( &grp->MembersList );
		      mbr = (iter)?(member *)iter->data:NULL;
		        for( ; mbr != NULL ; mbr = nextmember )
		        {
                                nextmember = sl_next( &grp->MembersList, &iter );
				if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
				{
					/* mark this member as partitioned - proc no longer in membership */
					mbr->status = PARTITIONED_MEMBER;
					group_changed = 1;
				}
                        }
			if( group_changed ) 
                        {
				if( grp->num_local > 0 )
				{
					/* send members transitional membership */
                                        mess_link = G_build_trans_mess( grp );
					needed = 0;
					for( i=0; i < grp->num_local; i++ )
					{
						ses = Sess_get_session_index ( grp->mbox[i] );
						if( Is_memb_session( Sessions[ ses ].status ) )
							Sess_write( ses, mess_link, &needed );
					}
					if( !needed ) Sess_dispose_message( mess_link );
				}
                                Alarm( DEBUG, "G_handle_trans_memb: changed group %s in GOP, change"
                                       " group id from: ", grp->name );
                                G_print_group_id( grp->grp_id );
                                grp->grp_id.memb_id = trans_memb_id;
                                grp->grp_id.index   = 1;  /* Not technically needed, but not bad, either. */
                                grp->changed        = 1;
                                Alarm( DEBUG, " to: " );
                                G_print_group_id( grp->grp_id );
                                Alarm( DEBUG, "\n" );
			}
		}

		Gstate = GTRANS;
		GlobalStatus.gstate = Gstate;

		break;

	    case GTRANS:
		Alarm( EXIT, "G_handle_trans_memb in GTRANS\n");

		break;

	    case GGATHER:
		/* 
		 * Save transitional membership
		 * For every group that has members that are not in the
		 *  trans_memb do:
		 *	discard group members that are not in trans_memb
		 * 	if group is changed then mark it as changed (index = -1) (it might be already changed, but its ok).
		 * Shift to GGT
		 *
		 *	Note: there is no need to notify local members with a transitional group mess
		 *	      becuase no message will come between the trans group memb and the next reg group memb.
		 *	Note: this cascading deletes of members that are not in transitional membership actually
		 *	      opens the door for implementation of the ERSADS97 algorithm.
		 */
		Alarm( GROUPS, "G_handle_trans_memb in GGATHER\n");

		Trans_memb    = trans_memb;
                Trans_memb_id = trans_memb_id; /* Need this because we deliver the transitional again if we complete
                                                * the state exchange during GGT. */
                Alarm( GROUPS, "G_handle_trans_memb: Received trans memb id of:"
                       " {proc_id: %d "
                       " time:    %d}\n", Trans_memb_id.proc_id, Trans_memb_id.time );

		giter = sl_getlist( &GroupsList );
		grp = (giter)?(group *)giter->data:NULL;
		for( ; grp != NULL ; grp = nextgroup )
                {
                        nextgroup = sl_next( &GroupsList, &giter );
			group_changed = 0;
			iter = sl_getlist( &grp->MembersList );
			mbr = (iter)?(member *)iter->data:NULL;
		        for( ; mbr != NULL ; mbr = nextmember )
			{
                                nextmember = sl_next( &grp->MembersList, &iter );
				if( Conf_id_in_conf( &Trans_memb, mbr->proc_id ) == -1 )
				{
				/* discard this member - proc no longer in membership */
                                    sl_remove ( &grp->MembersList,mbr->private_name, dispose);
				    grp->num_members--;
				  
                                    group_changed = 1;
				}
			}
			if( grp->num_members == 0 )
			{
				/* discard this empty group */
                                sl_destruct ( &grp->MembersList, dispose);
				sl_remove ( &GroupsList, grp->name, dispose);
				Num_groups--;
				GlobalStatus.num_groups = Num_groups;

			}else if( group_changed ) {
                                grp->changed = 1;
			}
		}

		Gstate = GGT;
		GlobalStatus.gstate = Gstate;

		break;

	    case GGT:
		Alarm( EXIT, "G_handle_trans_memb in GGT\n");

		break;
	}
}

void	G_handle_join( char *private_group_name, char *group_name )
{
	group		*grp, *new_grp;
	member		*mbr, *new_mbr;
	int		needed;
	char		*num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
	int		num_bytes;
	char		proc_name[MAX_PROC_NAME];
	char		private_name[MAX_PRIVATE_NAME+1];
	int		new_p_ind, p_ind1;
	proc		new_p, p1;
	int		ses;
	mailbox		new_mbox;
	message_link	*mess_link;
	message_header	*head_ptr;
        message_obj     *msg, *joiner_msg;
	char		*vs_ptr;     /* the virtual synchrony set */
	int		i;
	int32u          temp;

	Alarm( GROUPS, "G_handle_join: %s joins group %s\n", private_group_name, group_name );

	switch( Gstate )
	{
	    case GOP:
	    case GTRANS:

		if (Gstate == GOP) Alarm( GROUPS, "G_handle_join in GOP\n");
		if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_join in GTRANS\n");

		/*
		 * if already in group then ignore
		 * if the group is unchanged and new member is coming from alive daemon then: 
		 *    Insert to group as established
		 *    Increment Grp_id
		 *    Notify all local members of a regular membership caused by join
		 * else if group is changed and coming from alive daemon then 
		 *    Insert to group as new
		 *    Increment Grp_id
		 *    if there are local members then
		 *	build a membership with all members, and vs set with all established members
		 *	notify all local established members with that membership (caused by network)
                 *	if new member is local
		 *	    notify new member with membership and self vs set (caused by network)
		 *      notify all local members a transitional membership
		 *    mark new member as established
		 * else (if new member is coming from a partitioned daemon then)
		 *    Insert to group as partitioned 
                 *    Increment Grp_id, and mark group as changed if not already done
		 *    if there are local members then
		 *      build a membership with all members and vs set with all established members
		 *      notify all local members with that membership (caused by network)
		 *      notify all local members with transitional membership
		 *
		 * Note: remember that when delivering a regular message while in GTRANS, you should use the
		 * mbox list of the group. You should still be cautious when delivering memberships to take
		 * care of the fact that the new guy gets a different treatment.
		 */
		G_private_to_names( private_group_name, private_name, proc_name );

		new_p_ind = Conf_proc_by_name( proc_name, &new_p );
		if( new_p_ind < 0 )
		{
			Alarm( PRINT, "G_handle_join: illegal proc_name %s in private_group %s \n",
				proc_name, private_group_name );
			return;
		}
		grp = G_get_group( group_name );
		if( grp == NULL )
		{
			new_grp = new( GROUP );
			memset( new_grp->name, 0, MAX_GROUP_NAME );
			strcpy( new_grp->name, group_name );
			sl_init( &new_grp->MembersList );
			sl_set_compare( &new_grp->MembersList,
					G_member_recordcompare,
					G_member_keycompare);
                        if( Gstate == GOP) {
                                new_grp->changed        = 0;
                                new_grp->grp_id.memb_id = Reg_memb_id;
                        } else { /* Gtrans */
                                new_grp->changed        = 1;
                                new_grp->grp_id.memb_id = Trans_memb_id;
                        }
                        new_grp->grp_id.index = 0; /* 0 because we will definitely increment it */
                        Alarm( DEBUG, "G_handle_join: New group added with group id: " );
                        G_print_group_id( new_grp->grp_id );
                        Alarm( DEBUG, "\n" );

			new_grp->num_members = 0;
			new_grp->num_local = 0;

			sl_insert( &GroupsList, new_grp );
			Num_groups++; /*sl need this?*/
			GlobalStatus.num_groups = Num_groups;
			grp = new_grp;
                }
		mbr = G_get_member( grp, private_group_name );
		if( mbr != NULL )
		{
			Alarm( PRINT, "G_handle_join: %s is already in group %s\n",
				private_group_name, group_name );
			return;
		}
		/* Add a new member as ESTABLISHED (might change later on depending on the situation */
		new_mbr = new( MEMBER );
		memset( new_mbr->private_name, 0, MAX_GROUP_NAME );
		strcpy( new_mbr->private_name, private_group_name );
		new_mbr->proc_id = new_p.id;
		new_mbr->status = ESTABLISHED_MEMBER;
		new_mbr->p_ind = new_p_ind;

		sl_insert( &grp->MembersList, new_mbr );
		grp->num_members++;

		/* if member is local then add mbox */
		if( new_mbr->proc_id == My.id )
		{
			ses = Sess_get_session( private_name );
			if( ses < 0 ) Alarm( EXIT, "G_handle_join: local session does not exist\n");
			grp->mbox[ grp->num_local ] = Sessions[ ses ].mbox;
			grp->num_local++;
			new_mbox = Sessions[ ses ].mbox;
		}else new_mbox = -1;

                /* This is the meat */
		if( Gstate == GOP || ( Conf_id_in_conf( &Trans_memb, new_p.id ) != -1 ) )
		{
			/* new member is coming from alive daemon */
			if( !grp->changed )
			{
				/* group is unchanged */
				/* Increment group id */
				grp->grp_id.index++;

				/* Notify local members */
				if( grp->num_local > 0 )
				{
                                        msg = Message_new_message();
					num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
					head_ptr = Message_get_message_header(msg);

					head_ptr->type |= CAUSED_BY_JOIN ;

					/* notify all local members */
					num_vs_ptr = &Mess_buf[ num_bytes ];
					num_bytes += sizeof( int32 );
                                        temp = 1;
                                        memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */

					vs_ptr = (char *)&Mess_buf[ num_bytes ];
					memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME );
					num_bytes += MAX_GROUP_NAME;

					head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );

					mess_link = new( MESSAGE_LINK );
                                        Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
                                        mess_link->mess = msg;
                                        Obj_Inc_Refcount(mess_link->mess);
					
					needed = 0;
					for( i=0; i < grp->num_local; i++ )
					{
						ses = Sess_get_session_index ( grp->mbox[i] );
						if( Is_memb_session( Sessions[ ses ].status ) )
							Sess_write( ses, mess_link, &needed );
					}
                                        if ( !needed ) Sess_dispose_message( mess_link );
                                        Message_Dec_Refcount(msg);
				}
			}else{
				/* group is changed */
				/* mark new member as new */
				new_mbr->status = NEW_MEMBER;

				/* Increment group id */
				grp->grp_id.index++;

				if( grp->num_local > 0 )
				{
					/* build a membership with all members, and vs set with all established members */
                                        msg = Message_new_message();
					num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );

					/* notify all non-new local members with that membership (caused by network) */
                                        mess_link = new( MESSAGE_LINK );
                                        Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
                                        mess_link->mess = msg;
                                        Obj_Inc_Refcount(mess_link->mess);

					needed = 0;
					for( i=0; i < grp->num_local; i++ )
					{
						/* if new member is local we do not notify it here */
						if( grp->mbox[i] == new_mbox ) continue;

						ses = Sess_get_session_index ( grp->mbox[i] );
						if( Is_memb_session( Sessions[ ses ].status ) )
							Sess_write( ses, mess_link, &needed );
					}
					if ( !needed ) Sess_dispose_message( mess_link );
                                        Message_Dec_Refcount(msg);

					/* notify new member if local */
					if( new_mbox != -1 )
					{
						/* build a membership with all members */
                                                joiner_msg = Message_new_message();
						num_bytes = G_build_memb_buf( grp, joiner_msg, Mess_buf );
						head_ptr = Message_get_message_header(joiner_msg);
						head_ptr->type |= CAUSED_BY_NETWORK ;
						/* build a self vs set */
						num_vs_ptr = &Mess_buf[ num_bytes ];
						num_bytes += sizeof( int32 );
                                                temp = 1;
                                                memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */
						vs_ptr = (char *)&Mess_buf[ num_bytes ];
						memcpy( vs_ptr, new_mbr->private_name, MAX_GROUP_NAME );
						num_bytes += MAX_GROUP_NAME;
						head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );
						mess_link = new( MESSAGE_LINK );
                                                Message_Buffer_to_Message_Fragments( joiner_msg, Mess_buf, num_bytes );
                                                mess_link->mess = joiner_msg;
                                                Obj_Inc_Refcount(mess_link->mess);

						needed = 0;
						ses = Sess_get_session_index ( new_mbox );
						if( Is_memb_session( Sessions[ ses ].status ) )
							Sess_write( ses, mess_link, &needed );
						if ( !needed ) Sess_dispose_message( mess_link );
                                                Message_Dec_Refcount(joiner_msg);
					}
		 			/* notify all local members a transitional membership */
					mess_link = G_build_trans_mess( grp );
					needed = 0;
					for( i=0; i < grp->num_local; i++ )
					{
						ses = Sess_get_session_index ( grp->mbox[i] );
						if( Is_memb_session( Sessions[ ses ].status ) )
							Sess_write( ses, mess_link, &needed );
					}
					if( !needed ) Sess_dispose_message( mess_link );
				}
				/* Mark new member as established */
				new_mbr->status = ESTABLISHED_MEMBER;
			}
		}else{
			/* coming from a partitioned daemon */
			/* mark new member as partitioned member */
			new_mbr->status = PARTITIONED_MEMBER;
			/* 
			 * (marking group as changed  - it might be already )
			 */
			if( !grp->changed ) grp->changed = 1;
			grp->grp_id.index++; 
			if( grp->num_local > 0 )
                        {
				/* build a membership with all members, and vs set with all non-partitioned members */
                                msg = Message_new_message();
				num_bytes = G_build_memb_vs_buf( grp, msg, Mess_buf, CAUSED_BY_NETWORK );

				/* notify all local members with that membership (caused by network) */
				mess_link = new( MESSAGE_LINK );
                                Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
                                mess_link->mess = msg;
                                Obj_Inc_Refcount(mess_link->mess);

				needed = 0;
				for( i=0; i < grp->num_local; i++ )
				{
					ses = Sess_get_session_index ( grp->mbox[i] );
					if( Is_memb_session( Sessions[ ses ].status ) )
						Sess_write( ses, mess_link, &needed );
				}
				if ( !needed ) Sess_dispose_message( mess_link );
                                Message_Dec_Refcount(msg);

		 		/* notify all local members a transitional membership */
				mess_link = G_build_trans_mess( grp );

				needed = 0;
				for( i=0; i < grp->num_local; i++ )
				{
					ses = Sess_get_session_index ( grp->mbox[i] );
					if( Is_memb_session( Sessions[ ses ].status ) )
						Sess_write( ses, mess_link, &needed );
				}
				if( !needed ) Sess_dispose_message( mess_link );
                        }
                }
		/* Compute the mask */
		for(i=0; i<4; i++)
		{
		    grp->grp_mask[i] = 0;
		}
		{
		  struct skiplistnode *iter;
		  member *memp;
		  /*
		    for( mbr= &grp->members; mbr->next != NULL; mbr=mbr->next )
		    {
		    p_ind1 = Conf_proc_by_id( mbr->next->proc_id, &p1 ); */
		  for( iter = sl_getlist( &grp->MembersList ),
			 memp=(member *)iter->data;
		       iter != NULL;
		       memp = (member *)sl_next( &grp->MembersList, &iter )) {
		    p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
		    temp = 1;
		    for(i=0; i<p1.seg_index%32; i++)
		      {
			temp *= 2;
		      }
		    grp->grp_mask[p1.seg_index/32] |= temp;
		  }
		}
		Alarm(GROUPS, "G_handle_join: Mask for group %s set to %x %x %x %x\n", 
		      grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);

		break;

	    case GGATHER:
		Alarm( EXIT, "G_handle_join in GGATHER\n");

		break;

	    case GGT:
		Alarm( EXIT, "G_handle_join in GGT\n");

		break;
	}
}

void	G_handle_leave( char *private_group_name, char *group_name )
{

	char		proc_name[MAX_PROC_NAME];
	char		private_name[MAX_PRIVATE_NAME+1];
	char		departing_private_group_name[MAX_GROUP_NAME];
	int		p_ind, p_ind1;
	proc		p, p1;
	group		*grp;
	member		*mbr;
	char		*num_vs_ptr; /* num members in vs set */
	char		*vs_ptr;     /* the virtual synchrony set */
	message_link	*mess_link;
	message_header	*head_ptr;
        message_obj     *msg;
	int		num_bytes;
	int		needed;
	int		ses;
	int		i, j;
	int32u          temp;

	Alarm( GROUPS, "G_handle_leave: %s leaves group %s\n", private_group_name, group_name );

	switch( Gstate )
	{
	    case GOP:
	    case GTRANS:

		if (Gstate == GOP) Alarm( GROUPS, "G_handle_leave in GOP\n");
		if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_leave in GTRANS\n");

		/*
		 * if not already in group then ignore
		 * if this member is local, notify it and extract its mbox
		 * Extract this member from group
		 * if the group is unchanged (in GOP all groups are unchanged) then: 
		 *    Increment Grp_id
		 *    Notify all local members of a regular membership caused by leave
		 */
		G_private_to_names( private_group_name, private_name, proc_name );
		p_ind = Conf_proc_by_name( proc_name, &p );
		if( p_ind < 0 )
		{
			Alarm( PRINT, "G_handle_leave: illegal proc_name %s in private_group %s \n",
				proc_name, private_group_name );
			return;
		}
		grp = G_get_group( group_name );
		if( grp == NULL ) 
		{
			Alarm( PRINT, "G_handle_leave: group %s does not exist\n",
				group_name );
			return;
		}
		mbr = G_get_member( grp, private_group_name );
		if( mbr == NULL )
		{
			Alarm( PRINT, "G_handle_leave: member %s does not exist in group %s\n",
				private_group_name, group_name );
			return;
		}

		if( p.id == My.id )
		{
			/* notify this local member and extract its mbox from group */
                        msg = Message_new_message();
			head_ptr = Message_get_message_header(msg);
			head_ptr->type = CAUSED_BY_LEAVE;
			head_ptr->type = Set_endian( head_ptr->type );
			head_ptr->hint = Set_endian( 0 );
			memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
			head_ptr->num_groups = 0;
			head_ptr->data_len = 0;

			/* create the mess_link */
			mess_link = new( MESSAGE_LINK );
                        /* NOTE: Mess_buf contents are NOT used here. We only examine "0" bytes of it
                         * We just need a valid pointer here to prevent faults */
                        Message_Buffer_to_Message_Fragments( msg, Mess_buf, 0);
			mess_link->mess = msg;
                        Obj_Inc_Refcount(mess_link->mess);
			/* notify member */
			needed = 0;
			ses = Sess_get_session( private_name );
			if( Is_memb_session( Sessions[ ses ].status ) )
				Sess_write( ses, mess_link, &needed );
			if( !needed ) Sess_dispose_message( mess_link );

			/* extract this mbox */
			for( i=0, j=0; i < grp->num_local; i++,j++ )
			{
				if( grp->mbox[i] == Sessions[ses].mbox ) j--;
				else grp->mbox[j] = grp->mbox[i];
			}
			grp->num_local--;
                        Message_Dec_Refcount(msg);
		}

		/* extract this member from group */
		memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME );
		sl_remove( &grp->MembersList, mbr->private_name, dispose );
		grp->num_members--;
		if( grp->num_members == 0 )
		{
			/* discard this empty group */
                        sl_destruct ( &grp->MembersList, dispose);
		        sl_remove( &GroupsList, grp->name, dispose );
			Num_groups--;
			GlobalStatus.num_groups = Num_groups;
			return;
		}
		
		if( grp->changed )
		{
			if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_leave: changed group in GOP\n");
			/*
			 * If the group is changed (in GTRANS) then there is no need
			 * to increment group id or to notify the local members.
			 * They will get a group membership after the state transfer
			 * terminates.
			 */
			return;
		}

		/* Increment group id */
		grp->grp_id.index++; 

		if( grp->num_local > 0 )
		{
			/* notify all local members */
                        msg = Message_new_message();
			num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
			head_ptr = Message_get_message_header(msg);
			head_ptr->type |= CAUSED_BY_LEAVE ;

			/* notify all local members */
			num_vs_ptr = &Mess_buf[ num_bytes ];
			num_bytes += sizeof( int32 );
                        temp = 1;
                        memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */

			vs_ptr = (char *)&Mess_buf[ num_bytes ];
		        memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
			num_bytes += MAX_GROUP_NAME;

			head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );

			mess_link = new( MESSAGE_LINK );
                        Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
			mess_link->mess = msg;
                        Obj_Inc_Refcount(mess_link->mess);
			needed = 0;
			for( i=0; i < grp->num_local; i++ )
			{
				ses = Sess_get_session_index ( grp->mbox[i] );
				if( Is_memb_session( Sessions[ ses ].status ) )
					Sess_write( ses, mess_link, &needed );
			}
			if ( !needed ) Sess_dispose_message( mess_link );
                        Message_Dec_Refcount(msg);
		}
		/* Compute the mask */
		for(i=0; i<4; i++)
		{
		    grp->grp_mask[i] = 0;
		}
		{
		  struct skiplistnode *iter;
		  member *memp;
		  for( iter = sl_getlist( &grp->MembersList ),
			 memp=(member *)iter->data;
		       iter != NULL;
		       memp = (member *)sl_next( &grp->MembersList, &iter )) {
		    p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
		    temp = 1;
		    for(i=0; i<p1.seg_index%32; i++)
		      {
			temp *= 2;
		      }
		    grp->grp_mask[p1.seg_index/32] |= temp;
		  }
		}

		Alarm(GROUPS, "G_handle_leave: Mask for group %s set to %x %x %x %x\n", 
		      grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);
		break;

	    case GGATHER:
		Alarm( EXIT, "G_handle_leave in GGATHER\n");

		break;

	    case GGT:
		Alarm( EXIT, "G_handle_leave in GGT\n");

		break;
	}
}

void	G_handle_kill( char *private_group_name )
{
	char		proc_name[MAX_PROC_NAME];
	char		private_name[MAX_PRIVATE_NAME+1];
	char		departing_private_group_name[MAX_GROUP_NAME];
	int		p_ind, p_ind1;
	proc		p, p1;
	group		*grp, *nextgroup;
	member		*mbr;
	char		*num_vs_ptr; /* num members in vs set */
	char		*vs_ptr;     /* the virtual synchrony set */
	message_link	*mess_link;
	message_header	*head_ptr;
        message_obj     *msg;
	int		num_bytes;
	int		needed;
	int		ses = -1;       /* Fool compiler */
	int		i, j;
	int32u          temp;
	struct skiplistnode *giter;

	Alarm( GROUPS, "G_handle_kill: %s is killed\n", private_group_name );

	switch( Gstate )
	{
	    case GOP:
	    case GTRANS:

		if (Gstate == GOP) Alarm( GROUPS, "G_handle_kill in GOP\n");
		if (Gstate == GTRANS) Alarm( GROUPS, "G_handle_kill in GTRANS\n");

		/*
		 * for every group this guy is a member of
		 *    Extract this member from group
		 *    if the group is unchanged (in GOP all groups are unchanged) then: 
		 *        Increment Grp_id
		 *        Notify all local members of a regular membership caused by disconnet
		 */
		G_private_to_names( private_group_name, private_name, proc_name );
		p_ind = Conf_proc_by_name( proc_name, &p );
		if( p_ind < 0 )
		{
			Alarm( PRINT, "G_handle_kill: illegal proc_name %s in private_group %s \n",
				proc_name, private_group_name );
			return;
		}

		if( p.id == My.id ) ses = Sess_get_session( private_name );
	       
		giter = sl_getlist( &GroupsList );
		grp = (giter)?(group *)giter->data:NULL;
		for( ; grp != NULL ; grp = nextgroup)
		{
		  /* This is confusing... get the nextgroup so that if we
		     choose to remove it it doesn't screw up the iterator.
		     Then next time through use this "next" value */
                        nextgroup = sl_next( &GroupsList, &giter );
			mbr = G_get_member( grp, private_group_name );
			if( mbr == NULL ) continue; /* no such member in that group */

			/* Extract this member from group */
			if( p.id == My.id )
			{
				/* extract the mbox if local member */
				for( i=0, j=0; i < grp->num_local; i++, j++ )
				{
					if( grp->mbox[i] == Sessions[ses].mbox ) j--;
					else grp->mbox[j] = grp->mbox[i];
				}
				grp->num_local--;
			}
			memcpy( departing_private_group_name, mbr->private_name, MAX_GROUP_NAME );
			sl_remove( &grp->MembersList, mbr->private_name, dispose );
			grp->num_members--;
			if( grp->num_members == 0 )
			{
                                sl_destruct ( &grp->MembersList, dispose);
                                sl_remove( &GroupsList, grp->name, dispose );
				Num_groups--;
				GlobalStatus.num_groups = Num_groups;
				continue;
			}
		
			if( grp->changed )
			{
			    if( Gstate != GTRANS ) Alarm( EXIT, "G_handle_kill: changed group in GOP\n");
			    /*
			     * If the group is changed (in GTRANS) then there is no need
			     * to increment group id or to notify the local members.
			     * They will get a group membership after the state transfer
			     * terminates.
			     */
			    continue;
			}

			/* Increment group id */
			grp->grp_id.index++; 

		        /* Compute the mask */
		        for(i=0; i<4; i++)
		        {
		            grp->grp_mask[i] = 0;
		        }
			{
			  struct skiplistnode *iter;
			  member *memp;
			  for( iter = sl_getlist( &grp->MembersList ),
				 memp=(member *)iter->data;
			       iter != NULL;
			       memp = (member *)sl_next( &grp->MembersList, &iter )) {
			    p_ind1 = Conf_proc_by_id( memp->proc_id, &p1 );
		            temp = 1;
		            for(i=0; i<p1.seg_index%32; i++)
			      {
			        temp *= 2;
			      }
		            grp->grp_mask[p1.seg_index/32] |= temp;
			  }
			}

  		        Alarm(GROUPS, "G_handle_kill: Mask for group %s set to %x %x %x %x\n", 
		              grp->name, grp->grp_mask[3], grp->grp_mask[2], grp->grp_mask[1], grp->grp_mask[0]);

			if( grp->num_local > 0 )
			{
				/* notify all local members */
                                msg = Message_new_message();
				num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
				head_ptr = Message_get_message_header(msg);

				head_ptr->type |= CAUSED_BY_DISCONNECT ;

				num_vs_ptr = &Mess_buf[ num_bytes ];
				num_bytes += sizeof( int32 );
                                temp = 1;
                                memcpy( num_vs_ptr, &temp, sizeof( int32 ) ); /* *num_vs_ptr = 1; */

				vs_ptr = (char *)&Mess_buf[ num_bytes ];
				memcpy( vs_ptr, departing_private_group_name, MAX_GROUP_NAME );
				num_bytes += MAX_GROUP_NAME;

				head_ptr->data_len += ( sizeof(int32) + MAX_GROUP_NAME );

				mess_link = new( MESSAGE_LINK );
                                Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes );
				mess_link->mess = msg;
                                Obj_Inc_Refcount(mess_link->mess);
				needed = 0;
				for( i=0; i < grp->num_local; i++ )
				{
                                        int temp_ses;

			 		temp_ses = Sess_get_session_index ( grp->mbox[i] );
					if( Is_memb_session( Sessions[ temp_ses ].status ) )
						Sess_write( temp_ses, mess_link, &needed );
				}
				if ( !needed ) Sess_dispose_message( mess_link );
                                Message_Dec_Refcount(msg);
			}
		}
		break;

	    case GGATHER:
		Alarm( EXIT, "G_handle_kill in GGATHER\n");

		break;

	    case GGT:
		Alarm( EXIT, "G_handle_kill in GGT\n");

		break;
	}
}

void	G_handle_groups( message_link *mess_link )
{
	char		*memb_id_ptr;
	membership_id	temp_memb_id;
        message_obj     *msg;
	message_header	*head_ptr;

	Alarm( GROUPS, "G_handle_groups: \n" );

	switch( Gstate )
	{
	    case GOP:

		Alarm( EXIT, "G_handle_groups in GOP\n");

		break;

	    case GTRANS:

		Alarm( EXIT, "G_handle_groups in GTRANS\n");

		break;

	    case GGATHER:
	    case GGT:

		if (Gstate == GGATHER) Alarm( GROUPS, "G_handle_groups in GGATHER\n");
		if (Gstate == GGT) Alarm( GROUPS, "G_handle_groups in GGT\n");

		msg = mess_link->mess;
                Obj_Inc_Refcount(msg);
		head_ptr = Message_get_message_header(msg);
		memb_id_ptr = Message_get_first_data_ptr(msg);
		memcpy( &temp_memb_id, memb_id_ptr, sizeof( membership_id ) );
		if( !Same_endian( head_ptr->type ) )
		{
			/* Flip membership id */
			temp_memb_id.proc_id = Flip_int32( temp_memb_id.proc_id );
			temp_memb_id.time    = Flip_int32( temp_memb_id.time    );
		}
		if( ! Memb_is_equal( temp_memb_id, Reg_memb_id ) )
		{
                        Alarm( GROUPS, 
                               "G_handle_groups: GROUPS message received from bad memb id proc %d, time %d, daemon %s.\n",
                               temp_memb_id.proc_id, temp_memb_id.time, head_ptr->private_group_name );
			Sess_dispose_message( mess_link );
                        Message_Dec_Refcount(msg);
			return;
		}

		mess_link->next = Gathered.next;
		Gathered.next = mess_link;
		Num_mess_gathered++;
                /* The last Groups message a daemon sends is AGREED. */
                if( Is_agreed_mess( head_ptr->type ) ) Num_daemons_gathered++;
                Alarm( GROUPS, "G_handle_groups: GROUPS message received from %s - msgs %d, daemons %d\n", 
                       head_ptr->private_group_name, Num_mess_gathered, Num_daemons_gathered );
		if( Num_daemons_gathered != Conf_num_procs( &Reg_memb ) )
                {
                        Message_Dec_Refcount(msg);
                        return;
                }
                Alarm( GROUPS, "G_handle_groups: Last GROUPS message received - msgs %d, daemons %d\n",
                       Num_mess_gathered, Num_daemons_gathered );
		/* Replace protocol queue */
		Prot_set_down_queue( NORMAL_DOWNQUEUE );

		/* lower events threshold */
		Session_threshold = LOW_PRIORITY;
		Sess_set_active_threshold();

		/* 
		 * Compute new groups membership and notify members of
		 * groups that have changed 
		 */
		G_compute_and_notify();

		if( Gstate == GGATHER )
		{
			Gstate = GOP;
			GlobalStatus.gstate = Gstate;
		}else{
			Gstate = GOP;
                        /* We do want to deliver a transitional signal to any
                         * groups that are going to get a CAUSED_BY_NETWORK
                         * after our Reg_memb is delivered. */
			G_handle_trans_memb( Trans_memb, Trans_memb_id );
		}
                
                Message_Dec_Refcount(msg);
		break;
	}
}

static	void		G_compute_and_notify()
{
	group		*grp, *new_grp, *orig_grp;
	member		*mbr;
	int		changed;
	int		ret;
	int		vs_bytes;
	char		*num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
        int32           num_vs;
	int		num_exist;
	struct worklist *indices[MAX_PROCS_RING];
	int		num_bytes;
	message_link	*mess_link;
	message_header	*head_ptr;
        message_obj     *msg;
	int		needed;
	char		proc_name[MAX_PROC_NAME];
	char		private_name[MAX_PRIVATE_NAME+1];
	int		ses;
	int		i;
	Skiplist        work;
	
	Alarm( GROUPS, "G_compute_and_notify:\n");
	/* Compute groups structure in Work from gathered messages and clear messages */

	sl_init(&work);
	sl_set_compare(&work, G_work_groups_comp, G_work_groups_keycomp);

	for( i=0; i < Num_mess_gathered; i++ )
	{
	        struct worklist *tp;
		tp = (struct worklist *)Mem_alloc(sizeof(struct worklist));
		tp->groups=NULL;
		mess_link = Gathered.next;
		Gathered.next = mess_link->next;
		ret = G_mess_to_groups( mess_link, tp->name, tp );
                if( ret < 0 )
                        Alarm( EXIT, "G_compute_and_notify: G_mess_to_groups errored %d\n",
                               ret );
                Sess_dispose_message( mess_link );
		if ( !sl_insert(&work, tp) )
                {
                        Alarm(EXIT, "G_compute_and_notify: Failed to insert worklist (%s) into work\n", tp->name);
                }
	}
	/* 
	 * for every sorted group name:
	 * 	Join the member lists to one list in Groups with a vs set.
	 *	If the group has changed (*) 
	 *		Set new gid 
	 *		notify all local members: non-new get vs set, new get self.
	 *		cancel new mark.
	 *	dispose of this group is all of Work.
	 *
	 * Note: group has changed unless all of this hold:
	 *	- everyone has the same gid
	 *	- gid is not changed (-1)
	 */
	
	for( num_exist = G_smallest_group_indices( &work, indices ) ;
	     num_exist > 0 ; 
	     num_exist = G_smallest_group_indices( &work, indices ) )
        {
	    struct skiplistnode *top_iter;
	    group *this_group;
	    /* prepare vs set */
	    vs_bytes = 0;
	    num_vs_ptr = &Temp_buf[0];
	    vs_bytes+= sizeof( int32 );
            num_vs = 0;
	    
	    changed = 0;
	    orig_grp = NULL;
	    assert( NULL != (this_group = (group *)(sl_getlist(indices[0]->groups)->data)) );
            orig_grp = sl_find( &GroupsList, this_group->name, &top_iter);
	    	    
	    if( orig_grp == NULL )
            {
			new_grp = new( GROUP );
			memset( new_grp->name, 0, MAX_GROUP_NAME );
			strcpy( new_grp->name, this_group->name );
	
			new_grp->grp_id = this_group->grp_id;

			new_grp->num_members = 0;
			sl_init( &new_grp->MembersList );
			sl_set_compare( &new_grp->MembersList,
					G_member_recordcompare,
					G_member_keycompare);
			new_grp->num_local = 0;

			sl_insert( &GroupsList, new_grp );
			Num_groups++;
			GlobalStatus.num_groups = Num_groups;
			orig_grp = new_grp;
            }else{
		  /* free members but keep local mbox */
		  sl_remove_all( &orig_grp->MembersList, dispose );
		  orig_grp->num_members = 0;
            }

            for( i=0 ; i < num_exist; i++ )
            {
		  group *currentgroup;
		  currentgroup =
		    (group *)sl_getlist(indices[i]->groups)->data;
		  if( G_id_is_equal( orig_grp->grp_id, currentgroup->grp_id ) )
                  {
			  struct skiplistnode *iter;
			  Skiplist *currentmembers;
			  currentmembers = &currentgroup->MembersList;
			  iter = sl_getlist(currentmembers);
                          assert(iter != NULL); /* memberlist in Groups message should never be empty */
			  for( mbr = iter->data;
			       mbr != NULL;
			       mbr = sl_next(currentmembers, &iter))
                          {
                                        /* add this non-new member to vs */
                                        memcpy( &Temp_buf[vs_bytes], mbr->private_name, MAX_GROUP_NAME );
                                        vs_bytes += MAX_GROUP_NAME;
                                        num_vs++;	
                          }
                  }else{
			  /* not the same grp_id */
                          changed = 1;
                  }
                  /* in any way, mbr points here to the last member */
                  /* chain these members */

		  sl_concat(&orig_grp->MembersList,
			    &currentgroup->MembersList);
		  orig_grp->num_members = orig_grp->MembersList.size;

			/* free this Work group */
                  sl_destruct(&currentgroup->MembersList, dispose);
		  sl_remove(indices[i]->groups, currentgroup, dispose);
            }

            memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = current count; */

            /* now our orig_grp is almost updated */
            grp = orig_grp;
            
            if( grp->changed ) changed = 1;
            
            if( !changed ) continue;

            /* the group has changed */
            Alarm( GROUPS, "G_compute_and_notify: completed group %s.\n", grp->name );
            Alarm( DEBUG,  "G_compute_and_notify: changing group id from: " );
            G_print_group_id( grp->grp_id );
            grp->grp_id.memb_id = Reg_memb_id;
            grp->grp_id.index   = 1;
            grp->changed        = 0;
            Alarm( DEBUG, " to: " );
            G_print_group_id( grp->grp_id );
            Alarm( DEBUG, "\n" );
            
            if( grp->num_local > 0 )
            {
                    struct skiplistnode *iter;
                    msg = Message_new_message();
                    num_bytes = G_build_memb_buf( grp, msg, Mess_buf );
                    head_ptr = Message_get_message_header(msg);
                    
                    head_ptr->type |= CAUSED_BY_NETWORK ;
                    
                    /* notify non-new local members */
                    memcpy( &Mess_buf[num_bytes], Temp_buf, vs_bytes );
                    head_ptr->data_len += vs_bytes;
                    
                    mess_link = new( MESSAGE_LINK );
                    Message_Buffer_to_Message_Fragments( msg, Mess_buf, num_bytes + vs_bytes);
                    mess_link->mess = msg;
                    Obj_Inc_Refcount(mess_link->mess);
                    needed = 0;
                    iter = sl_getlist(&grp->MembersList);
                    for( mbr = iter->data;
			     mbr != NULL;
			     mbr = sl_next(&grp->MembersList, &iter))
                    {
                            if( Is_new_member( mbr->status ) ) continue;
                            if( mbr->proc_id != My.id ) continue;
                            
                            G_private_to_names( mbr->private_name, private_name, proc_name );
                            ses = Sess_get_session( private_name );
                            if( ses < 0 ) Alarm( EXIT, "G_compute_and_notify: no session for %s\n", private_name);
                            
                            if( Is_memb_session( Sessions[ ses ].status ) )
                                    Sess_write( ses, mess_link, &needed );
                    }
                    if( !needed ) Sess_dispose_message( mess_link );
                    Message_Dec_Refcount(msg);
            }
	}
        Num_mess_gathered    = 0;
        Num_daemons_gathered = 0;

        /* We're going back to GOP... destroy our groups messages. */
        G_empty_groups_bufs();

        /* Finish freeing the memory in our worklists */
	{
		struct worklist     *worklist;
		struct skiplistnode *iter;
                
		iter = sl_getlist(&work);       
		worklist = (iter)?iter->data:NULL;      
                
		while( worklist != NULL ) {     
			assert( worklist->groups->size == 0 );  
			dispose( worklist->groups );    
			worklist = sl_next(&work, &iter);       
		}       
	}       
        
        sl_destruct( &work, dispose );
        
	G_print();
}

static	int		G_smallest_group_indices( Skiplist *work, struct worklist *indices[] )
{
	/* 
	 * this function searches the Work structure for the smallest
	 * alphabetically ordered group name. It stores  
	 * all of the occurences of that group in the indices array, 
	 * and returns the number of occurences.
	 */
	int	num_exist;
	int	cmp;
	struct worklist *worklist;
	Skiplist *groups;
	struct skiplistnode *iter;
	
	iter = sl_getlist(work);
	worklist = (iter)?iter->data:NULL;
	num_exist = 0;
	if(!worklist) {
                return 0;
	}
        /* set indices[0] to first worklist with any groups */
        do {
                if ( worklist->groups->size == 0 )
                {
                        worklist = sl_next(work, &iter);
                } else {
                        indices[0] = worklist;
                        num_exist = 1;
                        break;
                }
        } while ( worklist != NULL );

	if(!worklist) {
                /* All worklist groups are empty (no daemons have any alive groups) */
                return 0;
	}
        
        worklist = sl_next( work, &iter );
        /* Check rest of worklists for any with earlier groups or the same first group as indices[0] */
        while ( worklist != NULL )
	{
                group *first, *current;
                
                groups = worklist->groups;
		if( groups->size == 0 ) 
                {
                        worklist = sl_next(work, &iter);
                        continue;
                }
		first = (group *)(sl_getlist(indices[0]->groups)->data);
		current = (group *)(sl_getlist(groups)->data);
		cmp = strcmp( first->name, current->name );
		if( cmp == 0 )
                {
                        indices[num_exist] = worklist;
                        num_exist++;
                }else if( cmp > 0 ){
                        indices[0] = worklist;
                        num_exist = 1;
                }
		worklist = sl_next(work, &iter);
	}
	return( num_exist );
}

static	int		G_id_is_equal( group_id g1, group_id g2 )
{
	if( g1.index == g2.index && Memb_is_equal( g1.memb_id, g2.memb_id ) )
		return( 1 );
	else	return( 0 );
}

static	group		*G_get_group( char *group_name )
{
	struct skiplistnode *iter;

	return sl_find( &GroupsList, group_name, &iter );
}

static	member		*G_get_member( group *grp, char *private_group_name )
{
	struct skiplistnode *iter;

	return sl_find( &grp->MembersList, private_group_name, &iter );
}

static	message_link	*G_build_trans_mess( group *grp )
{
	/* 
	 * This routine builds a ready-to-be-sent transitional message signal 
	 * to the members of the process group grp 
	 */

	message_link	*mess_link;
	scatter	        *scat;
	message_header	*head_ptr;
	char		*gid_ptr;

	mess_link = new( MESSAGE_LINK );
	mess_link->mess = Message_create_message(TRANSITION_MESS, grp->name);

	scat = Message_get_data_scatter(mess_link->mess);
	scat->elements[0].len = Message_get_data_header_size() +
				sizeof( group_id );
	head_ptr = Message_get_message_header(mess_link->mess);
	gid_ptr = Message_get_first_data_ptr(mess_link->mess );

	head_ptr->data_len = sizeof( group_id );
	memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );

	return( mess_link );
}

static	int		G_build_memb_buf( group *grp, message_obj *msg, char buf[])
{
	int		num_bytes;
	message_header	*head_ptr;
	char		*gid_ptr;
	member		*mbr;
	struct skiplistnode *iter;
	char		*memb_ptr;


	head_ptr = Message_get_message_header(msg);
	head_ptr->type = REG_MEMB_MESS;
	head_ptr->type = Set_endian( head_ptr->type );
	head_ptr->hint = Set_endian( 0 );
	memcpy( head_ptr->private_group_name, grp->name, MAX_GROUP_NAME );
	head_ptr->num_groups = grp->num_members;
	head_ptr->data_len = sizeof( group_id );

        num_bytes = 0;
	iter = sl_getlist( &grp->MembersList );
	mbr = (iter)?(member *)iter->data:NULL;
	for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
	{
		memb_ptr = (char *)&buf[num_bytes];
		num_bytes += MAX_GROUP_NAME;
		memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
	}

	gid_ptr = &buf[num_bytes];
	num_bytes += sizeof( group_id );
	memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );

	return( num_bytes );
}


static	int		G_build_memb_vs_buf( group *grp, message_obj *msg, char buf[], int32 caused )
{
/* 
 * This routine builds the memb buffer message, including a virtual synchrony
 * (failure atomicity) part with a set which contains only the established members
 * in the group membership.
 * 
 * Note that in leave and disconnect we provide the member that left or
 * got disconnected in the vs_set. Therefore, caused will always be CAUSED_BY_NETWORK.
 */

	int		num_bytes;
	message_header	*head_ptr;
	char		*num_vs_ptr; /* num members in virtual-synchrony/failure-atomicity set */
	struct skiplistnode *iter;
        member          *mbr;
	char		*membs_ptr;
        int32           num_vs;

	num_bytes = G_build_memb_buf( grp, msg, buf);
	head_ptr = Message_get_message_header(msg);

	head_ptr->type = head_ptr->type | caused;

	num_vs_ptr = &buf[num_bytes];
	num_bytes += sizeof( int32 );
        head_ptr->data_len += sizeof( int32 );
        num_vs = 0;

        iter = sl_getlist( &grp->MembersList );
	mbr = (iter)?(member *)iter->data:NULL;
	for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
        {
		if( Is_established_member( mbr->status ) )
		{
			membs_ptr = (char *)&buf[num_bytes];
			memcpy( membs_ptr, mbr->private_name, MAX_GROUP_NAME );
			num_vs++ ;
			num_bytes += MAX_GROUP_NAME;
			head_ptr->data_len += MAX_GROUP_NAME;
		}
	}
        memcpy( num_vs_ptr, &num_vs, sizeof( int32 ) ); /* *num_vs_ptr = total count; */

	return( num_bytes );
}

static	void		G_stamp_groups_buf( char buf[] )
{
	char	*memb_id_ptr;
	memb_id_ptr = buf;
	memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );
}

/* This function used to be called G_refresh_groups_msg. */
static  void            G_build_groups_msg_hdr( message_obj *msg, int groups_bytes )
{
	message_header	*head_ptr;

	head_ptr = Message_get_message_header(msg);
	head_ptr->type = GROUPS_MESS;
	head_ptr->type = Set_endian( head_ptr->type );
	head_ptr->hint = Set_endian( 0 );
	memset(head_ptr->private_group_name, 0, MAX_GROUP_NAME);
	strcpy( head_ptr->private_group_name, My.name );
	head_ptr->num_groups = 0;
	head_ptr->data_len = groups_bytes;
}

/* This function guarantees that each group's data appears in only one buffer in
 * a sequence, and that the sorted order is preserved from the GroupsList. */
static	int		G_build_groups_buf(char buf[], struct skiplistnode **iter_ptr)
{
	int		num_bytes;
	char		*memb_id_ptr;
	group		*grp;
	char		*gid_ptr;
	member		*mbr;
	struct skiplistnode *giter, *iter;
	char		*num_memb_ptr;
        int16           num_memb;
	char		*memb_ptr;
        int     size_for_this_group;

	num_bytes = 0;

	memb_id_ptr = &buf[num_bytes];
	num_bytes += sizeof( membership_id );
	memcpy( memb_id_ptr, &Reg_memb_id, sizeof( membership_id ) );

        giter = (*iter_ptr) ? (*iter_ptr) : (sl_getlist( &GroupsList ));

	grp = (giter)?(group *)giter->data:NULL;
	for( ; grp != NULL ; grp = sl_next( &GroupsList, &giter ) )
	{
		if( grp->num_local == 0 ) continue;
		
                size_for_this_group = MAX_GROUP_NAME + sizeof(group_id) + sizeof(int16) +
                        (grp->num_local * MAX_GROUP_NAME);
                /* This requires that the number of local group members be limited. */
                if( size_for_this_group > GROUPS_BUF_SIZE - num_bytes )  break;

		memcpy( &buf[num_bytes], grp->name, MAX_GROUP_NAME );
		num_bytes += MAX_GROUP_NAME;
		
		gid_ptr = &buf[num_bytes];
		num_bytes += sizeof( group_id );
		memcpy( gid_ptr, &grp->grp_id, sizeof(group_id) );

		num_memb_ptr = &buf[num_bytes];
		num_bytes += sizeof( int16 );
		num_memb  = 0;

		iter = sl_getlist( &grp->MembersList );
		mbr = (iter)?(member *)iter->data:NULL;
		for( ; mbr != NULL ; mbr = sl_next( &grp->MembersList, &iter ) )
		{
			/* collect local members */
			if( mbr->proc_id != My.id ) continue;
			memb_ptr = (char *)&buf[num_bytes];
			num_bytes += MAX_GROUP_NAME;
			memcpy( memb_ptr, mbr->private_name, MAX_GROUP_NAME );
			num_memb++;
		}
                memcpy(num_memb_ptr, &num_memb, sizeof( int16 ) );
                if( num_memb != grp->num_local )
                        Alarm( EXIT, "G_build_groups_buf: group %s has %d %d members\n",
                               grp->name, num_memb, grp->num_local );
                
        }
        *iter_ptr = giter; 
	return( num_bytes );
}

static	int		G_mess_to_groups( message_link *mess_link, char *name, struct worklist *work )
{
	/* the function returns 0 for success or -1 if an error occured */

        message_obj     *msg;
        scatter         *scat;
	message_header	*head_ptr;
	proc		p;
	int		num_bytes, total_bytes;
	group		*grp;
	char		*gid_ptr;
	member		*mbr;
	char		*num_memb_ptr;
        int16           num_memb;
	int		i;

	total_bytes = 0;
	msg = mess_link->mess;
        scat = Message_get_data_scatter(msg);
	for( i=0; i < scat->num_elements ; i++ )
	{
		memcpy( &Temp_buf[total_bytes], scat->elements[i].buf, scat->elements[i].len );
		total_bytes += scat->elements[i].len;
	}

	num_bytes = 0;
	head_ptr = Message_get_message_header(msg);
        num_bytes += Message_get_data_header_size();
	if (0 > Conf_proc_by_name( head_ptr->private_group_name , &p ) )
        {
                Alarm( PRINT, "G_mess_to_groups: Groups message from someone (%s) not in conf\n", head_ptr->private_group_name);
                return( -1 );
        }
	work->groups = (Skiplist *)Mem_alloc(sizeof(Skiplist));
	sl_init(work->groups);
	sl_set_compare(work->groups,
		       G_compare,
		       G_compare);
	
	memcpy( name, head_ptr->private_group_name, MAX_GROUP_NAME );

	num_bytes += sizeof( membership_id );

	for( ; num_bytes < total_bytes; )
	{
		/* creating a new group */
		grp = new( GROUP );

		memcpy( grp->name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
		num_bytes += MAX_GROUP_NAME;

		sl_append( work->groups, grp );
		sl_init( &grp->MembersList );
		sl_set_compare( &grp->MembersList,
				G_member_recordcompare,
				G_member_keycompare);

		gid_ptr = &Temp_buf[num_bytes];
		num_bytes += sizeof( group_id );
		memcpy( &grp->grp_id, gid_ptr, sizeof(group_id) );

		num_memb_ptr = &Temp_buf[num_bytes];
		num_bytes += sizeof( int16 );
                memcpy( &num_memb, num_memb_ptr, sizeof( int16 ) );

		if( !Same_endian( head_ptr->type ) )
		{
			/* Flip group id */
			grp->grp_id.memb_id.proc_id = Flip_int32( grp->grp_id.memb_id.proc_id );
			grp->grp_id.memb_id.time    = Flip_int32( grp->grp_id.memb_id.time );
			grp->grp_id.index    	    = Flip_int32( grp->grp_id.index );

			/* flip other parts of the message */
			num_memb  = Flip_int16( num_memb  );
		}
		/* creating members */
		for( i=0; i < num_memb; i++ )
		{
			mbr = new( MEMBER );
			
			mbr->proc_id = p.id;
			mbr->status = ESTABLISHED_MEMBER;
			memcpy( mbr->private_name, &Temp_buf[num_bytes], MAX_GROUP_NAME );
			num_bytes += MAX_GROUP_NAME;

			sl_append( &grp->MembersList, mbr );
		}

		grp->num_members = num_memb;
                memcpy( num_memb_ptr, &num_memb, sizeof( int16 ) );
	}
	return( 0 );
}

int	G_analize_groups( int num_groups, char target_groups[][MAX_GROUP_NAME], int target_sessions[] )
{
static	mailbox mbox[MAX_SESSIONS];
static	mailbox	current[MAX_SESSIONS];
static	mailbox *current_ptr;
	int	num_mbox;
	int	num_mbox_pre;
	int	num_current;
	group	*grp;
	member 	*mbr;
	struct skiplistnode *iter;
	char	proc_name[MAX_PROC_NAME];
	char	private_name[MAX_PRIVATE_NAME+1];
	int	found;
	int	ses;
	int	ret;
	int	i, j, k;
	
	/* get the mbox */
	num_mbox = 0;
	for( i=0; i < num_groups; i++ )
	{
		if( target_groups[i][0] == '#' )
		{
			/* private group */
			ret = G_private_to_names( target_groups[i], private_name, proc_name );

			/* Illegal group */
			if( ret < 0 ) continue;

			/* this private group is not local */
			if( strcmp( My.name, proc_name ) != 0 ) continue;

			ses = Sess_get_session( private_name );

			/* we have no such session */
			if( ses < 0 ) continue;

			current[0] = Sessions[ ses ].mbox;
			current_ptr = current;
			num_current = 1;
		}else{
			/* regular group */
			grp = G_get_group( target_groups[i] );
			if( grp == NULL ) continue;
			if( Gstate == GOP )
			{
				current_ptr = grp->mbox;
				num_current = grp->num_local;
			}else if( Gstate == GTRANS ){
				current_ptr = current;
				num_current = 0;
				iter = sl_getlist( &grp->MembersList );
				mbr = (iter)?(member *)iter->data:NULL;
				for( ; mbr != NULL ;
				     mbr = sl_next( &grp->MembersList, &iter ) )
				{
					if( mbr->proc_id == My.id && !Is_new_member( mbr->status ) )
					{
						G_private_to_names( mbr->private_name, private_name, proc_name );
						ses = Sess_get_session( private_name );
						if( ses < 0 ) Alarm( EXIT, 
							"G_analize_groups: ses is %d private_name is %s\n", 
							ses, private_name );
						current[ num_current ] = Sessions[ ses ].mbox;
						num_current++;
					}
				}
			}else {
                                num_current = 0; /* fool compiler warnings */
                                Alarm( EXIT, "G_analize_groups: Gstate is %d\n", Gstate );
                        }
		}
		num_mbox_pre = num_mbox;
		for( j=0; j < num_current; j++ )
		{
			found = 0;
			for( k=0; k < num_mbox_pre; k++ )
			{
				if( mbox[k] == current_ptr[j] )
				{
					found = 1;
					break;
				}
			}
			if( !found )
			{
				mbox[num_mbox] = current_ptr[j];
				num_mbox++;
			}
		}
	}

	/* convert mbox to sessions */
	for( i=0; i < num_mbox; i++ ) target_sessions[i] = Sess_get_session_index ( mbox[ i ] );
	return( num_mbox );
}


void G_set_mask( int num_groups, char target_groups[][MAX_GROUP_NAME], int32u *grp_mask )
{
	group	*grp;
	char	proc_name[MAX_PROC_NAME];
	char	private_name[MAX_PRIVATE_NAME+1];
	int	ret;
	int	i, j;
	proc    p;
	int32u  temp;


	for(i=0; i<4; i++)
        {
	   grp_mask[i] = 0;
	}

	for( i=0; i < num_groups; i++ )
	{
		if( target_groups[i][0] == '#' )
		{
			/* private group */
			ret = G_private_to_names( target_groups[i], private_name, proc_name );

			/* Illegal group */
			if( ret < 0 ) continue;

		        Conf_proc_by_name( proc_name, &p ); 
		        temp = 1;
		        for(j=0; j<p.seg_index%32; j++)
		        {
			    temp *= 2;
		        }
		        grp_mask[p.seg_index/32] |= temp;

		}else{
			/* regular group */
			grp = G_get_group( target_groups[i] );
			if( grp == NULL )
			{
			    p = Conf_my();
		            temp = 1;
		            for(j=0; j<p.seg_index%32; j++)
		            {
			        temp *= 2;
		            }
		            grp_mask[p.seg_index/32] |= temp;
			}
			else if(( Gstate == GOP )||(Gstate == GTRANS))
			{
		            for(j=0; j<4; j++)
		            {
			        grp_mask[j] |= grp->grp_mask[j]; 
		            }
			    p = Conf_my();
		            temp = 1;
		            for(j=0; j<p.seg_index%32; j++)
		            {
			        temp *= 2;
		            }
		            grp_mask[p.seg_index/32] |= temp;

			}else Alarm( EXIT, "G_set_mask: Gstate is %d\n", Gstate );
		}
	}
}



int	G_private_to_names( char *private_group_name, char *private_name, char *proc_name )
{
	char	name[MAX_GROUP_NAME];
	char	*pn, *prvn;
	unsigned int	priv_name_len, proc_name_len;
	int	i,legal_private_name;

        memcpy(name, private_group_name, MAX_GROUP_NAME );
        proc_name_len = 0; /* gcc not smart enough to detect that proc_name_len is always initialized when used */

        pn = strchr(&name[1], '#');
        if (pn != NULL)
        {
                pn[0] = '\0';
                proc_name_len = strlen( &(pn[1]));
        }
        priv_name_len = strlen( &(name[1]));
        if ( (pn == NULL) || (name[0] != '#' ) || 
             ( priv_name_len > MAX_PRIVATE_NAME) ||
             ( priv_name_len < 1 ) ||
             ( proc_name_len >= MAX_PROC_NAME ) ||
             ( proc_name_len < 1 ) )
        {
                Alarm( GROUPS, "G_private_to_names: Illegal private_group_name (priv, proc)\n");
                return( ILLEGAL_GROUP );
        }
        /* start strings at actual beginning */
        pn++;
        pn[proc_name_len] = '\0';
        prvn = &name[1];
        legal_private_name = 1;
        for( i=0; i < priv_name_len; i++ )
                if( prvn[i] <= '#' ||
                    prvn[i] >  '~' ) 
                {
                        legal_private_name = 0;
                        prvn[i] = '.';
                }
        for( i=0; i < proc_name_len; i++ )
                if( pn[i] <= '#' ||
                    pn[i] >  '~' ) 
                {
                        legal_private_name = 0;
                        pn[i] = '.';
                }
        if( !legal_private_name )
        {
                Alarm( GROUPS, "G_private_to_names: Illegal private_group_name characters (%s) (%s)\n", prvn, pn );
                return( ILLEGAL_GROUP );
        }
        /* copy name components including null termination */
        memcpy( private_name, prvn, priv_name_len + 1 );
        memcpy( proc_name, pn, proc_name_len + 1 );
	return( 1 );
}

static	void	G_print()
{
	group	*grp;
	member	*mbr;
	struct skiplistnode *giter, *iter;
	int	i, j;

	printf("++++++++++++++++++++++\n");
	printf("Num of groups: %d\n", Num_groups );
	giter = sl_getlist( &GroupsList );
	grp = (giter)?(group *)giter->data:NULL;
	for( i=0; grp != NULL ; i++, grp = sl_next( &GroupsList, &giter ) )
	{
		printf("[%d] group %s with %d members:\n", i+1, grp->name, grp->num_members );
		iter = sl_getlist( &grp->MembersList );
		mbr = (iter)?(member *)iter->data:NULL;
		for( j=0; mbr != NULL ;
		     j++, mbr = sl_next( &grp->MembersList, &iter ) )
		{
			printf("\t[%d] %s\n", j+1, mbr->private_name );
		}
		printf("----------------------\n");
	}
}

static  void  G_empty_groups_bufs()
{
        groups_buf_link *next;

        for( ; Groups_bufs;  Groups_bufs = next )
        {
                next = Groups_bufs->next;
                dispose( Groups_bufs );
        }
        return;
}

int     G_get_num_local( char *group_name )
{
        group *grp = G_get_group( group_name );
        if( grp == NULL ) return 0;
        return grp->num_local;
}

static  void  G_print_group_id( group_id g )
{
        Alarm( DEBUG, "{Proc ID: %d, Time: %d, Index: %d}",
               g.memb_id.proc_id, g.memb_id.time, g.index );
}


syntax highlighted by Code2HTML, v. 0.9.1