/*
 * The Spread Toolkit.
 *     
 * The contents of this file are subject to the Spread Open-Source
 * License, Version 1.0 (the ``License''); you may not use
 * this file except in compliance with the License.  You may obtain a
 * copy of the License at:
 *
 * http://www.spread.org/license/
 *
 * or in the file ``license.txt'' found in this distribution.
 *
 * Software distributed under the License is distributed on an AS IS basis, 
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
 * for the specific language governing rights and limitations under the 
 * License.
 *
 * The Creators of Spread are:
 *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
 *
 *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
 *
 *  All Rights Reserved.
 *
 * Major Contributor(s):
 * ---------------
 *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
 *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
 *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
 *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
 *
 */


#define	ext_prot_body

#include <string.h>
#include "prot_body.h"
#include "spread_params.h"
#include "membership.h"
#include "flow_control.h"
#include "sp_events.h"
#include "session.h"
#include "network.h"
#include "net_types.h"
#include "status.h"
#include "log.h"
#include "objects.h"
#include "memory.h"
#include "alarm.h"
#include "sess_types.h" /* for message_header */

/* Prot variables */
static	proc		My;
static	int		My_index;

static	int32		Set_aru;
static	int		Token_counter;

/* Used ONLY in Prot_handle_bcast, inited in Prot_init */
static	sys_scatter	New_pack;

/* Used ONLY in Prot_handle_token and grurot, inited in Prot_init */
static	sys_scatter	New_token;
static	token_header	*Token;
static	sys_scatter	Send_pack;

static	packet_header	*Hurry_head;
static	sys_scatter	Hurry_pack;
static	sp_time		Wide_delay = { 0, 10000 };

/* ### Pack: 1 line */
static	packet_info	Buffered_packets[ARCH_SCATTER_SIZE];

static	down_queue	Protocol_down_queue[2]; /* only used in spread3 */

static	void	Prot_handle_bcast();
static	void	Prot_handle_token();
static	int	Answer_retrans( int *ret_new_ptr, int32 *proc_id, int16 *seg_index );
static	int	Send_new_packets( int num_allowed );
static	int	Is_token_hold();
static	int	To_hold_token();
static	void	Handle_hurry( packet_header *pack_ptr );
static	void	Deliver_packet( int pack_entry, int to_copy );
static	void	Flip_token_body( char *buf, token_header *token_ptr );
static	void	Deliver_reliable_packets( int32 start_seq, int num_packets );
static	void	Deliver_agreed_packets();

void	Prot_init(void)
{
	int	i, num_bcast, num_token;
        channel *bcast_channels;
        channel *token_channels;

	Mem_init_object( PACK_HEAD_OBJ, sizeof( packet_header ), MAX_PACKETS_IN_STRUCT, 0 );
	Mem_init_object( PACKET_BODY, sizeof( packet_body ), MAX_PACKETS_IN_STRUCT, 0 );
	Mem_init_object( TOKEN_HEAD_OBJ, sizeof( token_header ), 10, 0 );
	Mem_init_object( TOKEN_BODY_OBJ, sizeof( token_body ), 10, 0 );
	Mem_init_object( SCATTER, sizeof( scatter ), 200+MAX_PROCS_RING, 0 );

	My = Conf_my();
	My_index = Conf_proc_by_id( My.id, &My );
	GlobalStatus.my_id = My.id;
	GlobalStatus.packet_delivered = 0;

	for( i=0; i < MAX_PROCS_RING+1; i++ )
		Up_queue[i].exist = 0;

	for( i=0; i < MAX_PACKETS_IN_STRUCT; i++ )
		Packets[i].exist = 0;

	Highest_seq 	 = 0;
	Highest_fifo_seq = 0;
	My_aru	    	 = 0;
	Aru		 = 0;
	Set_aru		 = -1;
	Last_discarded	 = 0;
	Last_delivered	 = 0;

	New_pack.num_elements = 2;
	New_pack.elements[0].len = sizeof(packet_header);
	New_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
	New_pack.elements[1].len = sizeof(packet_body);
	New_pack.elements[1].buf = (char *) new(PACKET_BODY);

	New_token.num_elements	= 2;
	New_token.elements[0].len = sizeof(token_header);
	New_token.elements[0].buf = (char *) new(TOKEN_HEAD_OBJ);
	New_token.elements[1].len = sizeof(token_body);
	New_token.elements[1].buf = (char *) new(TOKEN_BODY_OBJ);

	Send_pack.num_elements = 2;
	Send_pack.elements[0].len = sizeof(packet_header);

	Token = (token_header *)New_token.elements[0].buf;
	Last_token = new(TOKEN_HEAD_OBJ);
	Last_token->type = 0; 
	Last_token->seq = 0;
	Last_token->aru = 0;
	Last_token->flow_control = 0;

	Hurry_pack.num_elements = 1;
	Hurry_pack.elements[0].len = sizeof(packet_header);
	Hurry_pack.elements[0].buf = (char *) new(PACKET_BODY);
	Hurry_head = (packet_header *)Hurry_pack.elements[0].buf;
	Hurry_head->proc_id = My.id;
	Hurry_head->type = HURRY_TYPE;

	Net_init();

        bcast_channels = Net_bcast_channel();
        token_channels = Net_token_channel();
        Net_num_channels( &num_bcast, &num_token);
        for ( i = 0; i < num_bcast; i++) {
            E_attach_fd( *bcast_channels, READ_FD, Prot_handle_bcast, 0, NULL, HIGH_PRIORITY );
            bcast_channels++;
        }
        for ( i = 0; i < num_token; i++) {
            E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
            token_channels++;
        }

	FC_init( );
	Memb_init();

	Net_set_membership( Reg_membership );

}

void    Prot_init_down_queues(void)
{
        Protocol_down_queue[NORMAL_DOWNQUEUE].num_mess	= 0 ;
	Protocol_down_queue[NORMAL_DOWNQUEUE].cur_element = 0;
	Protocol_down_queue[GROUPS_DOWNQUEUE].num_mess	= 0 ;
	Protocol_down_queue[GROUPS_DOWNQUEUE].cur_element = 0;
}

void	Prot_set_down_queue( int queue_type )
{
        switch(queue_type) {
        case NORMAL_DOWNQUEUE:
                Down_queue_ptr = &Protocol_down_queue[NORMAL_DOWNQUEUE];
                break;
        case GROUPS_DOWNQUEUE:
                Down_queue_ptr = &Protocol_down_queue[GROUPS_DOWNQUEUE];
                break;
        default:
                Alarm(EXIT, "Prot_set_down_queue: Illegal queue_type (%d)\n", queue_type);
        }
}

void    Prot_Create_Local_Session(session *new_sess)
{
        /* Nothing to do for Spread3 */
        return;
}
void    Prot_Destroy_Local_Session(session *old_sess)
{
        /* Nothing to do for Spread3 */
        return;
}
void    Prot_kill_session(message_obj *msg)
{
        /* Nothing to do for Spread3 */
        return;
}
down_link       *Prot_Create_Down_Link(message_obj *msg, int type, int mbox, int cur_element)
{
        down_link       *down_ptr;
        message_header  *head_ptr;

	if ((down_ptr = new( DOWN_LINK )) == NULL )
        {
                Alarm(EXIT, "Prot_Create_Down_Link: Failure to allocate a Down_link\n");
        }
        if ( -1 == (down_ptr->type = type) )
        {
                head_ptr = (message_header *)msg->elements[0].buf;
		dispose( down_ptr );
		Alarm( PROTOCOL, "Prot_Create_Down_Link: Illegal message type %d\n", head_ptr->type );
		return(NULL);
	}
        return(down_ptr);
}

static  void	Prot_handle_bcast(channel fd, int dummy, void *dummy_p)
{
	packet_header	*pack_ptr;
	int		pack_entry;
	proc		p;
	int		received_bytes;
        int             total_bytes_processed;
        int             num_buffered_packets;
	int		i;
	int32		j;
	/* int		r1,r2; */

	received_bytes = Net_recv( fd, &New_pack );
	/* My own packet or from another monitor component */
	if( received_bytes == 0 ) return;
	/* problem in receiving */
	if( received_bytes < 0 ) return;

	pack_ptr = (packet_header *)New_pack.elements[0].buf;

	/* ### Pack, this has to move down to network.c
	 * if( pack_ptr->data_len +sizeof(packet_header) != received_bytes )
	 * {
	 *	Alarm( PRINT, "Prot_handle_bcast: received %d, should be %d\n",
	 *	received_bytes, pack_ptr->data_len+sizeof(packet_header) );
	 * 	return; 
	 * }
	 */

	if( Is_status( pack_ptr->type ) )
	{
		Stat_handle_message( &New_pack );
		return;
	}

	if( Is_fc( pack_ptr->type ) )
	{
		FC_handle_message( &New_pack );
		return;
	}

	/* delete random 
	r1 = ((-My.id)%17)+3;
	r2 = get_rand() % (r1+3 );
	if ( r2 == 0 ) return; */

	if( Is_membership( pack_ptr->type ) )
	{
		Memb_handle_message( &New_pack );
		return;
	}
	if( Is_hurry( pack_ptr->type ) )
	{
		Handle_hurry( pack_ptr );
		return;
	}
	if( !Is_regular( pack_ptr->type ) )
	{
		Alarm( PROTOCOL, "Prot_handle_bcast: Unknown packet type %d\n",
			pack_ptr->type );
		return;
	}
	if( ! Memb_is_equal( Memb_id(), pack_ptr->memb_id ) )
	{
		/* Foreign message */
		Memb_handle_message( &New_pack );
		return;
	}
        if (Memb_token_alive() ) {
                E_queue( Memb_token_loss, 0, NULL, Token_timeout );
                if( Conf_leader( Memb_active_ptr() ) == My.id ) 
                {
                        E_queue( Prot_token_hurry, 0, NULL, Hurry_timeout );
                }
        }

	/* ### Pack: next 70 lines (almost till the end of the routine) have changed */
        Buffered_packets[0].head = pack_ptr;
        Buffered_packets[0].body = (packet_body *)New_pack.elements[1].buf;
	received_bytes -= sizeof(packet_header);
        total_bytes_processed = pack_ptr->data_len;
        /* ignore any alignment padding */
        switch(total_bytes_processed % 4)
        {
        case 1:
                total_bytes_processed++;
        case 2:
                total_bytes_processed++;
        case 3:
                total_bytes_processed++;
        case 0:
                /* already aligned */
                break;
        }
        for( i = 1; received_bytes > total_bytes_processed; i++ )
        {                
        	/* copy into each of the elements after the first element*/
                Buffered_packets[i].head = (packet_header *)new(PACK_HEAD_OBJ);
                Buffered_packets[i].body = (packet_body *)new(PACKET_BODY);
                if (Buffered_packets[i].head == NULL) 
                        Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACK_HEAD_OBJ\n");
                if (Buffered_packets[i].body == NULL) 
                        Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACKET_BODY\n");
                        
                pack_ptr = (packet_header *)&New_pack.elements[1].buf[total_bytes_processed];
                memcpy( Buffered_packets[i].head, pack_ptr, sizeof( packet_header ) );
                total_bytes_processed += sizeof(packet_header);
                memcpy( Buffered_packets[i].body, &New_pack.elements[1].buf[total_bytes_processed], pack_ptr->data_len);
                total_bytes_processed += pack_ptr->data_len;
                /* ignore any alignment padding */
                switch(total_bytes_processed % 4)
                {
                case 1:
                        total_bytes_processed++;
                case 2:
                        total_bytes_processed++;
                case 3:
                        total_bytes_processed++;
                case 0:
                        /* already aligned */
                        break;
                }
        }
        num_buffered_packets = i;

        for( i = 0; i < num_buffered_packets; i++)
        {
                pack_ptr = Buffered_packets[i].head;
                 
                /* do we have this packet */
                if( pack_ptr->seq <= Aru )
                {
                        Alarm( PROTOCOL, "Prot_handle_bcast: delayed packet %d already delivered (Aru %d)\n",
                               pack_ptr->seq, Aru );
                        dispose(Buffered_packets[i].head);
                        dispose(Buffered_packets[i].body);
                        continue;
                }
                pack_entry = pack_ptr->seq & PACKET_MASK;
                if( Packets[pack_entry].exist ) 
                {
                        Alarm( PROTOCOL, "Prot_handle_bcast: packet %d already exist\n",
                               pack_ptr->seq );
                        dispose(Buffered_packets[i].head);
                        dispose(Buffered_packets[i].body);
                        continue;
                }

                Packets[pack_entry].proc_index = Conf_proc_by_id( pack_ptr->proc_id, &p );
                if( Packets[pack_entry].proc_index < 0 )
                {
                        Alarm( PROTOCOL, "Prot_handle_bcast: unknown proc %d\n", pack_ptr->proc_id );
                        dispose(Buffered_packets[i].head);
                        dispose(Buffered_packets[i].body);
                        continue;
                }
                /* insert new packet */
                Packets[pack_entry].head  = pack_ptr;
                Packets[pack_entry].body  = Buffered_packets[i].body;
                Packets[pack_entry].exist = 1;

                /* update variables */
                if( Highest_seq < pack_ptr->seq ) Highest_seq = pack_ptr->seq;
                if( pack_ptr->seq == My_aru+1 )
                {
                        for( j=pack_ptr->seq; j <= Highest_seq; j++ )
                        {
                                if( ! Packets[j & PACKET_MASK].exist ) break;
                                My_aru++;
                        }
                        Deliver_agreed_packets();
                }else Deliver_reliable_packets( pack_ptr->seq, 1 );

                Alarm( PROTOCOL, "Prot_handle_bcast: packet %d inserted\n",
                       pack_ptr->seq );
        }      /* END OF LOOP */

        GlobalStatus.packet_recv++;
	GlobalStatus.my_aru = My_aru;
	GlobalStatus.highest_seq = Highest_seq;

	/* prepare New_pack for next packet */
	New_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
	New_pack.elements[1].buf = (char *) new(PACKET_BODY);
}

void	Prot_handle_token(channel fd, int dummy, void *dummy_p)
{
	int		new_ptr;
	int		num_retrans, num_allowed, num_sent;
	int		flow_control;
	char		*new_rtr;
	ring_rtr	*ring_rtr_ptr;
	int32		rtr_proc_id;
	int16		rtr_seg_index;
	int32		val;
	int		retrans_allowed; /* how many of my retrans are allowed on token */
	int		i, ret;

	/*	int		r1,r2;*/

	ret = Net_recv_token( fd, &New_token );
	/* from another monitor component */
	if( ret == 0 ) return;

	/* delete random
	r1 = ((-My.id)%17)+3;
	r2 = get_rand() % (r1+3 );
	if ( r2 == 0 ) return; */

        Alarm( DEBUG, "Received Token\n");
	/* check if it is a regular token */
	if( Is_form( Token->type ) )
	{
                Alarm(PROTOCOL, "it is a Form Token.\n");
                Memb_handle_token( &New_token );
		return;
	}

	/* The Veto property for tokens - swallow this token */
	if( ! Memb_token_alive() ) {
                Alarm(PROTOCOL, "Prot_handle_token: Veto Property. Memb not alive.\n");
		return;
        }

	if( ret != sizeof(token_header) + Token->rtr_len )
	{
		Alarm( PRINT, 
		    "Prot_handle_token: recv token len is %d, should be %d\n",
		    ret,sizeof(token_header) + Token->rtr_len );
		return;
	}

	if( !Same_endian( Token->type ) ) 
		Flip_token_body( New_token.elements[1].buf, Token );

	if( Conf_leader( Memb_active_ptr() ) == My.id )
	{
		if( Get_arq(Token->type) != Get_arq(Last_token->type) )
		{
		    Alarm( PROTOCOL, 
			"Prot_handle_token: leader swallowing token %d %d %d\n", 
			 Get_arq(Token->type),Get_retrans(Token->type),Get_arq(Last_token->type) );
		    /* received double token - swallow it */
		    return; 
		}
	}else{
		if( Get_arq(Token->type) == Get_arq(Last_token->type) )
		{
		    if( Get_retrans(Token->type) > Get_retrans(Last_token->type) )
		    {
			val = Get_retrans(Token->type);
			Last_token->type = Set_retrans(Last_token->type,val);
			/* asked to send token again (almost lost) */
			Alarm( PROTOCOL,
			    "Prot_handle_token: not leader, asked to retrans %d %d\n",
				Get_arq(Token->type), val );
			Prot_token_hurry();
		    }else{
			Alarm( PROTOCOL,
			    "Prot_handle_token: not leader, swallow same token %d %d\n",
	   			Get_arq(Token->type), Get_retrans(Token->type) );
		    }
		    return;
		} else {
                        if ( Get_retrans(Token->type) > 0 ) {
                                GlobalStatus.token_hurry++;
                        }
                }
	}
	if( Highest_seq < Token->seq ) Highest_seq = Token->seq;
		
	/* Handle retransmissions */
	num_retrans = Answer_retrans( &new_ptr, &rtr_proc_id, &rtr_seg_index );
	GlobalStatus.retrans += num_retrans;

	new_rtr = New_token.elements[1].buf;

	/* Handle new packets */
	flow_control = (int) Token->flow_control;
	num_allowed = FC_allowed( flow_control, num_retrans );
	num_sent    = Send_new_packets( num_allowed );
	GlobalStatus.packet_sent += num_sent;

	/* Flow control calculations */
	Token->flow_control = Token->flow_control
				- Last_num_retrans - Last_num_sent 
				+ num_retrans      + num_sent;
	Last_num_retrans = num_retrans;
	Last_num_sent	 = num_sent;

	/* Prepare my retransmission requests */

	for( i = My_aru+1; i <= Highest_seq; i++ )
	{
		if( ! Packets[i & PACKET_MASK].exist ) break;
		My_aru++;
	}
	GlobalStatus.my_aru = My_aru;

	if( My_aru < Highest_seq )
	{
	    /* Compute how many of my retransmission requests are possible to fit */
	    retrans_allowed = ( sizeof( token_body ) - new_ptr - sizeof( ring_rtr ) ) / sizeof( int32 );
	    if( retrans_allowed > 1 )
	    {
	    	ring_rtr_ptr = (ring_rtr *)&new_rtr[new_ptr];
	    	ring_rtr_ptr->memb_id 	= Memb_id();
	    	ring_rtr_ptr->proc_id	= rtr_proc_id;
	    	ring_rtr_ptr->seg_index	= rtr_seg_index;
	    	ring_rtr_ptr->num_seq	= 0;
	    	new_ptr += sizeof(ring_rtr);
	    	for( i=My_aru+1; i <= Highest_seq && retrans_allowed > 0; i++, retrans_allowed-- )
	    	{
			if( ! Packets[i & PACKET_MASK].exist ) 
			{
				memcpy( &new_rtr[new_ptr], &i, sizeof(int32) ); 
				new_ptr += sizeof(int32);
				ring_rtr_ptr->num_seq++;
			}
	    	}
	    }
	}

	if( Memb_state() == EVS )
	{
		if( My_aru == Highest_seq )
		{
			My_aru = Last_seq;
			Memb_commit();
		}
	}

	Token->rtr_len = new_ptr;
	New_token.elements[1].len = new_ptr;

	/* Calculating Token->aru and Set_aru */
	if( ( Token->aru == Set_aru    ) ||
            ( Token->aru_last_id == My.id ) ||
	    ( Token->aru == Token->seq ) )
	{
		Token->aru = My_aru;
                Token->aru_last_id = My.id;
		if( My_aru < Highest_seq ) Set_aru = My_aru;
		else Set_aru = -1;
	}else if( Token->aru > My_aru ) {
		Token->aru = My_aru;
                Token->aru_last_id = My.id;
		Set_aru    = My_aru;
	}else{
		Set_aru    = -1;
	}
	
	Token->proc_id = My.id;
	if( Memb_state() != EVS ) Token->seq = Highest_seq;

	if( Conf_leader( Memb_active_ptr() ) == My.id )
	{
	    	val = Get_arq( Token->type );
		val = (val + 1)% 0x10;
		Token->type = Set_arq(     Token->type, val );
		Token->type = Set_retrans( Token->type, 0   );
	}

	/* Send token */
	if( ! ( Conf_leader( Memb_active_ptr() ) == My.id &&
		To_hold_token() ) )
	{
		/* sending token */
		Net_send_token( &New_token );
/* ### Bug fix for SGIs */
#ifdef	ARCH_SGI_IRIX
		Net_send_token( &New_token );
#endif  /* ARCH_SGI_IRIX */

		if( Wide_network && 
		    Conf_seg_last(Memb_active_ptr(), My.seg_index) == My.id )
		{
			/* sending again to another segment */
			Net_send_token( &New_token );
			E_delay( Wide_delay );
			Net_send_token( &New_token );
		}
		if( Get_retrans( Token->type ) > 1 )
		{
			/* problems */ 
			Net_send_token( &New_token );
			Net_send_token( &New_token );
		}
	}

	if( Conf_leader( Memb_active_ptr() ) == My.id ) 
		E_queue( Prot_token_hurry, 0, NULL, Hurry_timeout );

	E_queue( Memb_token_loss, 0, NULL, Token_timeout );


	/* calculating Aru */
	if( Token->aru > Last_token->aru )
		Aru = Last_token->aru;
	else
		Aru = Token->aru;
	if( Highest_seq == Aru ) Token_counter++;
	else Token_counter = 0;

	dispose( Last_token );
	Last_token = Token;
	New_token.elements[0].buf = (char *) new(TOKEN_HEAD_OBJ);
	New_token.elements[1].len = sizeof(token_body);
	Token = (token_header *)New_token.elements[0].buf;

	/* Deliver & discard packets */

	Discard_packets();
	Deliver_agreed_packets();
	Deliver_reliable_packets( Highest_seq-num_sent+1, num_sent );

	GlobalStatus.highest_seq = Highest_seq;
	GlobalStatus.aru = Aru;
	GlobalStatus.token_rounds++;
}

void	Prot_new_message( down_link *down_ptr, int not_used_in_spread3_p )
{
	int32	leader_id;

	if( Down_queue_ptr->num_mess > 0 )
	{
		down_ptr->next = NULL;
		Down_queue_ptr->last->next = down_ptr;
		Down_queue_ptr->last = down_ptr;
	}else if( Down_queue_ptr->num_mess == 0 ){
		Down_queue_ptr->first = down_ptr;
		Down_queue_ptr->last  = down_ptr;
	}else{
		Alarm( EXIT,"fast_spread_new_message: Down_queue_ptr->num_mess is %d\n",
			Down_queue_ptr->num_mess );
	}
	Down_queue_ptr->num_mess++;
	if( Down_queue_ptr->num_mess >= WATER_MARK ) 
		Sess_block_users_level();

	if( Down_queue_ptr->num_mess == 1  && Is_token_hold() )
	{
		leader_id = Conf_leader( Memb_active_ptr() );
		if( leader_id == My.id )
		{
			Handle_hurry( Hurry_head );
		}else{
			Net_ucast( leader_id, &Hurry_pack );
		}
	}
}

/* ### Pack: this routine has changed */
static	int	Answer_retrans( int *ret_new_ptr, 
				int32 *proc_id, int16 *seg_index )
{
	int		num_retrans;
	char		*rtr;
	int		old_ptr,new_ptr;
	ring_rtr	*ring_rtr_ptr;
	int		pack_entry;
	int		bytes_to_copy;
	packet_header	*pack_ptr;
	int		i, ret;
	int32		*req_seq;

        num_retrans     = 0;
        new_ptr         = 0;
        *proc_id     = My.id;
        *seg_index   = My.seg_index;
        if( Token->rtr_len > 0 )
        {
            rtr = New_token.elements[1].buf;
            old_ptr = 0;
            while( old_ptr < Token->rtr_len )
            {
                ring_rtr_ptr = (ring_rtr *)&rtr[old_ptr];
                if( Memb_is_equal(ring_rtr_ptr->memb_id,Memb_id() ) )
                {
                    /* retransmit requests from my ring */
                    old_ptr += sizeof(ring_rtr);
                    for( i=0; i < ring_rtr_ptr->num_seq; i++ )
                    {
			req_seq = (int32 *)&rtr[old_ptr];
                        old_ptr += sizeof(int32);
                        pack_entry = *req_seq & PACKET_MASK;
                        if( *req_seq < Aru ) 
                                Alarm( EXIT, 
                "Answer_retrans: retrans of %d requested while Aru is %d\n",
                                *req_seq,Aru );

                        if( Packets[pack_entry].exist )
                        {
                                pack_ptr = Packets[pack_entry].head;
                                Send_pack.elements[0].buf = 
                                    (char *)Packets[pack_entry].head;
                                Send_pack.elements[1].buf = 
                                    (char *)Packets[pack_entry].body;
                                Send_pack.elements[1].len = 
                                    pack_ptr->data_len; 

                                if( ring_rtr_ptr->proc_id != -1 )
                                {
                                    ret = Net_ucast ( ring_rtr_ptr->proc_id, &Send_pack );
				    GlobalStatus.u_retrans++;

                                    Alarm( PROTOCOL, 
        "Answer_retrans: retransmit to proc %d\n", ring_rtr_ptr->proc_id );
                                }else if( ring_rtr_ptr->seg_index != -1 ) {
                                    ret = Net_scast ( ring_rtr_ptr->seg_index, &Send_pack );
				    GlobalStatus.s_retrans++;

                                    Alarm( PROTOCOL, 
        "Answer_retrans: retransmit to seg %d\n", ring_rtr_ptr->seg_index );
                                }else{
#if 1
                                    ret = Net_queue_bcast ( &Send_pack );
#else
                                    ret = Net_bcast ( &Send_pack );
#endif
				    if( ret > 0 ) GlobalStatus.b_retrans++;

                                    Alarm( PROTOCOL, 
        "Answer_retrans: retransmit to all\n");
                                }
				if( ret > 0 )
				{
					if( Wide_network ) E_delay( Wide_delay );
					if( Wide_network && (num_retrans % 2 == 1 )) E_delay( Wide_delay ); 
                                	num_retrans++;
				}
                        }else{
                                *proc_id = -1;
                                if( ring_rtr_ptr->seg_index != My.seg_index )
                                        *seg_index = -1;
                        }
                    }
                }else{
                    /* copy requests of other rings */
                    bytes_to_copy = sizeof(ring_rtr) + 
                                ring_rtr_ptr->num_seq * sizeof(int32);

		    if( new_ptr != old_ptr )
                    	memmove( &rtr[new_ptr], &rtr[old_ptr], bytes_to_copy);

                    old_ptr += bytes_to_copy;
                    new_ptr += bytes_to_copy;

                    Alarm( PROTOCOL, "Prot_handle_token: Coping foreign rtr\n");
                }
            }
        }
	*ret_new_ptr = new_ptr;

	ret = Net_flush_bcast();
	if( ret > 0 )
	{
		GlobalStatus.b_retrans++;
		if( Wide_network ) E_delay( Wide_delay );
		if( Wide_network && (num_retrans % 2 == 1 )) E_delay( Wide_delay ); 
		num_retrans++;
	}
	return (num_retrans);
}

/* ### Pack: this routine has changed */
static	int	Send_new_packets( int num_allowed )
{
	packet_header	*pack_ptr;
	scatter	        *scat_ptr;
	int		pack_entry;
	int		num_sent;
	int		ret;

	num_sent = 0;
        while( num_sent < num_allowed )
        {
		/* check if down queue is empty */
		if( Down_queue_ptr->num_mess == 0 ) break;

		/* initialize packet_header */
                pack_ptr =  new(PACK_HEAD_OBJ);

		scat_ptr = Down_queue_ptr->first->mess;

                pack_ptr->type = Down_queue_ptr->first->type;
                pack_ptr->proc_id = My.id;
                pack_ptr->memb_id = Memb_id();
                pack_ptr->seq = Highest_seq+1;
                Highest_seq++;
                pack_ptr->fifo_seq = Highest_fifo_seq+1;
		Highest_fifo_seq++;
                pack_ptr->data_len = scat_ptr->elements[
				Down_queue_ptr->cur_element].len;

                Send_pack.elements[1].buf = scat_ptr->elements[
					Down_queue_ptr->cur_element].buf;

		Down_queue_ptr->cur_element++;
		if( Down_queue_ptr->cur_element < scat_ptr->num_elements )
		{
			/* not last packet in message */
			pack_ptr->packet_index = Down_queue_ptr->cur_element;
		}else if( Down_queue_ptr->cur_element == scat_ptr->num_elements ){
			down_link	*tmp_down;

			/* last packet in message */
			pack_ptr->packet_index = -scat_ptr->num_elements;

			tmp_down = Down_queue_ptr->first;
			Down_queue_ptr->first = Down_queue_ptr->first->next;
			Down_queue_ptr->cur_element = 0;
			Down_queue_ptr->num_mess--;
			dispose( tmp_down->mess );
			dispose( tmp_down );
			if( Down_queue_ptr->num_mess < WATER_MARK ) 
				Sess_unblock_users_level();
		}else{
			Alarm( EXIT, 
                          "Send_new_packets: error in packet index: %d %d\n",
			  Down_queue_ptr->cur_element,scat_ptr->num_elements );
		}

                Send_pack.elements[0].buf = (char *) pack_ptr;
                Send_pack.elements[1].len = pack_ptr->data_len;

#if 1
                ret = Net_queue_bcast( &Send_pack );
#else
                ret = Net_bcast( &Send_pack );
#endif
		if( ret > 0 )
		{
			if( Wide_network ) E_delay( Wide_delay ); 
			if( Wide_network && (num_sent % 2 == 1 )) E_delay( Wide_delay ); 
			num_sent++;
		}

                pack_entry = pack_ptr->seq & PACKET_MASK;
                if( Packets[pack_entry].exist ) 
                    Alarm( EXIT, 
                        "Send_new_packets: created packet %d already exist %d\n",
                        pack_ptr->seq, Packets[pack_entry].exist );

                /* insert new created packet */
                Packets[pack_entry].head       = pack_ptr;
                Packets[pack_entry].body       = (packet_body *)Send_pack.elements[1].buf;
                Packets[pack_entry].exist      = 1;
                Packets[pack_entry].proc_index = My_index;
                Alarm( PROTOCOL, 
			"Send_new_packets: packet %d sent and inserted \n",
			 pack_ptr->seq );
        }
	ret = Net_flush_bcast();
	if( ret > 0 )
	{
		if( Wide_network ) E_delay( Wide_delay ); 
		if( Wide_network && (num_sent % 2 == 1 )) E_delay( Wide_delay ); 
		num_sent++;
	}
	return ( num_sent );
}
 
static	void	Deliver_packet( int pack_entry, int to_copy )
{	
	int		proc_index;
	up_queue	*up_ptr;
	packet_header	*pack_ptr;
	message_link	*mess_link;
	int		index;

	pack_ptr = Packets[pack_entry].head;

	if( Is_reliable( pack_ptr->type ) &&
	    pack_ptr->packet_index == -1 ) 
	{
		/*
		 * for reliable single-packets message : deliver regardless
		 * of what is already in the queue.
		 */
		proc_index = MAX_PROCS_RING;
	}else{
		proc_index = Packets[pack_entry].proc_index;
	}

	up_ptr = &Up_queue[proc_index];

	if( up_ptr->exist == 0 )
	{ 
		/* no message for proc - need to create one */
		up_ptr->mess = new( SCATTER );
		up_ptr->mess->num_elements = 0;
		up_ptr->exist = 1;
	}

	/* validity check */
	index = pack_ptr->packet_index;
	if( index < 0 ) index = -index;
	if( up_ptr->mess->num_elements+1 != index )
	{
		Alarm( EXIT, "Deliver_packet: sequence error: sec is %d, should be %d\n",
			pack_ptr->packet_index,
			up_ptr->mess->num_elements+1 );
	}

	/* chain this packet */
	up_ptr->mess->num_elements++;
	up_ptr->mess->elements[index-1].len = Packets[pack_entry].head->data_len;
	up_ptr->mess->elements[index-1].buf = (char *)Packets[pack_entry].body;
	if( to_copy)
	{
		/* 
		 * copy the packet.
		 *
		 * Note, that the original packet space was delivered
		 * and a new space was left in Packets. This is done to
		 * guarantee that in case of a configuration change, the
		 * original packet space in down_queue will be the same as
		 * the one in the up_queue for messages that were partially sent.
		 */
		Packets[pack_entry].body = new(PACKET_BODY);
		memcpy( Packets[pack_entry].body, up_ptr->mess->elements[index-1].buf, 
			Packets[pack_entry].head->data_len );
	}
	Packets[pack_entry].exist = 2;

	GlobalStatus.packet_delivered++;

	if( pack_ptr->packet_index < 0 )
	{
		/* end of message */
		/* Push up big_scatter. i.e. up_ptr->mess */
		mess_link = new(MESSAGE_LINK);
		mess_link->mess = up_ptr->mess;
		up_ptr->exist = 0;
		Sess_deliver_message( mess_link );
	}
}

static	void	Deliver_reliable_packets( int32 start_seq, int num_packets )
{
	int		pack_entry;
	int		end_seq;
	int		i;

	if( Memb_state() == EVS ) return;
	end_seq = start_seq+num_packets-1;
	if( start_seq <= Last_delivered ) start_seq = Last_delivered + 1;
	for( i = start_seq; i <= end_seq  ; i++ )
	{
		pack_entry = i & PACKET_MASK;

		if( Packets[pack_entry].exist == 1 )
		{
			if( Is_reliable( Packets[pack_entry].head->type ) &&
			    Packets[pack_entry].head->packet_index == -1 )
			{
				Deliver_packet( pack_entry, 1 );
				Alarm( PROTOCOL, "Deliver_reliable_packets: packet %d was delivered\n", i );
			}
		}
	}
}

static	void	Deliver_agreed_packets()
{
	/* deliver all non-safe packets that are ordered and not delivered */

	int		pack_entry;
	int		i;

	if( My_aru <= Last_delivered ) return;
	if( Memb_state() == EVS ) return;

	for( i = Last_delivered+1; i <= My_aru; i++ )
	{
		pack_entry = i & PACKET_MASK;

		if( Packets[pack_entry].exist == 1 ) 
		{
			if( !Is_safe( Packets[pack_entry].head->type ) )
			{
				Deliver_packet( pack_entry, 1 );
				Alarm( PROTOCOL, "Deliver_agreed_packets: packet %d was delivered\n", i );
				Last_delivered++;
			}else return;
		}else if( Packets[pack_entry].exist == 2 ){
			/* This is possible only for reliable delivery prior to agreed */
			Last_delivered++;
		}else Alarm( EXIT, "Deliver_agreed_packets: Error, exist is %d\n", Packets[pack_entry].exist );
	}

}

void	Discard_packets()
{
	int		pack_entry;
	packet_body	*body_ptr;
	up_queue	*up_ptr;
	int		proc_index;
	int		i;

    if( Aru <= Last_discarded ) return;
    if( Memb_state() == EVS )
    {
	int		found_hole;
	membership_id	reg_memb_id;

	if( Aru != Last_seq ) return;

        /* Deliver packets that must be delivered before the transitional signal.
         * Those up to the Aru for my old ring were delivered in Read_form2().
         * So, it remains to deliver all packets up to the first hole or the first
         * SAFE message. */
        Alarmp( SPLOG_INFO, PROTOCOL,
                "Discard_packets: delivering messages after old ring Aru before transitional\n" );

        for( i = Last_discarded+1; i <= Highest_seq; i++ )
        {
            pack_entry = i & PACKET_MASK;
	    if( ! Packets[pack_entry].exist )
		Alarmp( SPLOG_FATAL, PROTOCOL, "Discard_packets: (EVS before transitional) packet %d not exist\n", i);
	    if( Packets[pack_entry].exist == 3 )
	    {
		Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: Found first Hole in %d\n", i);
                break;
	    }
            if( Is_safe( Packets[pack_entry].head->type ) ) {
                Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: Found first SAFE message in %d", i);
                break;
            }
            /* should deliver packet or dispose the body if it was delivered already */
            if( Packets[pack_entry].exist == 1 ){
                Deliver_packet( pack_entry, 0 );
            } else {
                dispose( Packets[pack_entry].body );
            }
            /* dispose packet header in any case */
            dispose( Packets[pack_entry].head );
            Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: delivering %d in EVS\n",i);
	    Packets[pack_entry].exist = 0;
            Last_discarded = i;
        }

	/* calculate and deliver transitional membership */
        Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: Delivering transitional membership\n" );
	Memb_transitional();
	Sess_deliver_trans_memb( Trans_membership, Memb_trans_id() );

	/* deliver all remaining packets for EVS */
	found_hole = 0;
	for( i = Last_discarded+1; i <= Highest_seq; i++ )
	{
	    pack_entry = i & PACKET_MASK;
	    if( ! Packets[pack_entry].exist )
		Alarm( EXIT, "Discard_packets: (EVS after transitional) packet %d not exist\n", i);
	    if( Packets[pack_entry].exist == 3 )
	    {
		/* 
		 * There is a hole! 
		 * from here, we need to check if the proc_id of the packet
		 * is in commited membership. 
		 */
		found_hole = 1;
		Alarm( PROTOCOL, "Discard_packets: Found a Hole in %d \n",i);

	    }else if( (!found_hole) || 
   (Conf_id_in_conf( &Commit_membership, Packets[pack_entry].head->proc_id ) != -1) ){
		/* should deliver packet or dispose the body if it was delivered already */
		if( Packets[pack_entry].exist == 1 ){
			 Deliver_packet( pack_entry, 0 );
		}else{
			 dispose( Packets[pack_entry].body );
		}
		/* dispose packet header in any case */
		dispose( Packets[pack_entry].head );
		Alarm( PROTOCOL, "Discard_packets: delivering %d in EVS\n",i);
	    }else{
		/* should not deliver packet */
		dispose( Packets[pack_entry].head );
		dispose(   Packets[pack_entry].body );
		Alarm( PROTOCOL, "Discard_packets: Due to hole, not delivering %d \n",i);
	    }
	    Packets[pack_entry].exist = 0;
	}

	/* check up_queue and down_queue */
	if( Down_queue_ptr->num_mess > 0 )
	{
		Down_queue_ptr->cur_element = 0;
	}

	for( proc_index=0; proc_index < MAX_PROCS_RING; proc_index++ )
	{
		if( Up_queue[proc_index].exist )
		{
			if( proc_index != My_index )
			{
			    /* 
			     * dispose only packets that are not mine 
			     * my packets will stay in Down_queue if the message is not
			     * ready to be delivered (because not fully sent yet) 
			     * so we need not to dispose them! 
			     */
			    up_ptr = &Up_queue[proc_index];
			    for( i=0; i < up_ptr->mess->num_elements; i++ )
			    {
				body_ptr = (packet_body *)up_ptr->mess->elements[i].buf;
				dispose( body_ptr );
			    }
			}
			dispose( Up_queue[proc_index].mess );
			Up_queue[proc_index].exist = 0;
		}
	}

	/* calculate and deliver regular membership */
	Memb_regular();
	Log_membership();
	reg_memb_id = Memb_id();
	Sess_deliver_reg_memb( Reg_membership, reg_memb_id );


	/* set variables for next membership */
	Last_token->aru	 = 0;
	Highest_seq 	 = 0;
	Highest_fifo_seq = 0;
	My_aru	    	 = 0;
	Aru		 = 0;
	Set_aru		 = -1;
	Last_discarded	 = 0;
	Last_delivered	 = 0;
	GlobalStatus.my_aru	 = My_aru;

	Token_counter 	= 0;

    }else{

	for( i = Last_discarded+1; i <= Aru; i++ )
	{
	    pack_entry = i & PACKET_MASK;
	    if( ! Packets[pack_entry].exist )
		Alarm( EXIT, "Discard_packets: (NOT EVS) packet %d not exist\n",i);

	    /* should deliver packet or dispose the body if it was delivered already */
	    if( Packets[pack_entry].exist == 1 ) Deliver_packet( pack_entry, 0 );
	    else dispose( Packets[pack_entry].body );
	    /* dispose packet header in any case */
	    dispose( Packets[pack_entry].head );
	    Packets[pack_entry].exist = 0;
	}
	Alarm( PROTOCOL, "Discard_packets: packets %d-%d were discarded\n",
			Last_discarded+1, Aru );

	Last_discarded = Aru;
	if( Last_delivered < Last_discarded ) Last_delivered = Last_discarded;
    }
}

static	int	Is_token_hold()
{
	if( ( Memb_state() == OP || 
	      ( Memb_state() == GATHER && Memb_token_alive() ) )&&
	    Get_retrans(Last_token->type) <= 1			&&
	    Aru == Highest_seq && Token_counter > 1 ) return ( 1 );
	else return( 0 );
}

static	int	To_hold_token()
{
	if( ( Memb_state() == OP || 
	      ( Memb_state() == GATHER && Memb_token_alive() ) )&&
	    Get_retrans(Last_token->type) <= 1			&&
	    Aru == Highest_seq && Token_counter > 1 ) return ( 1 );
	else return( 0 );
}

static	void	Handle_hurry( packet_header *pack_ptr )
{
	if( Conf_leader( Memb_active_ptr() ) == My.id &&
	    Is_token_hold() ) 
	{
	    if( Conf_id_in_conf( Memb_active_ptr(), pack_ptr->proc_id ) != -1 )
	    {
		Alarm( PROTOCOL, "Handle_hurry: sending token now\n");
		Prot_token_hurry();
	    }
	}
}

void	Prot_token_hurry()
{
	/* asked to send token again (almost lost) */
			
	sys_scatter	retrans_token;
	int32		val;

	retrans_token.num_elements = 1;
	retrans_token.elements[0].len = sizeof(token_header);
	retrans_token.elements[0].buf = (char *)Last_token;
	Last_token->rtr_len=0;
	if( Conf_leader( Memb_active_ptr() ) == My.id ) 
	{
		val = Get_retrans(Last_token->type);
		val++;
		Last_token->type = Set_retrans( Last_token->type, val );
		E_queue( Prot_token_hurry, 0, NULL, Hurry_timeout );
		GlobalStatus.token_hurry++;
	}
	/* sending token */
	Net_send_token( &retrans_token );
	if( Wide_network && 
	    Conf_seg_last(Memb_active_ptr(), My.seg_index) == My.id )
	{
		/* sending again to another segment */
		Net_send_token( &retrans_token );
	}
	if( Get_retrans( Last_token->type ) > 1 )
	{
		/* problems */ 
		Net_send_token( &retrans_token );
		Net_send_token( &retrans_token );
	}

	Alarm( PROTOCOL, "Prot_token_hurry: retransmiting token %d %d\n",
		Get_arq(Last_token->type), Get_retrans(Last_token->type) );
}

void	Flip_token_body( char *buf, token_header *token_ptr )
{
/*
 * This routine can not be called twice for the same buffer because
 * of ring_rtr_ptr->num_seq.
 */
	ring_rtr	*ring_rtr_ptr;
	int32		*req_seq;
	char		*rtr;
	int		ptr;
	int		i;

	if( token_ptr->rtr_len <= 0 ) return;

	rtr = buf;
	ptr = 0;
	while( ptr < token_ptr->rtr_len )
	{
	    ring_rtr_ptr = (ring_rtr *)&rtr[ptr];

	    ring_rtr_ptr->memb_id.proc_id = Flip_int32( ring_rtr_ptr->memb_id.proc_id );
	    ring_rtr_ptr->memb_id.time    = Flip_int32( ring_rtr_ptr->memb_id.time );
	    ring_rtr_ptr->proc_id	  = Flip_int32( ring_rtr_ptr->proc_id );
	    ring_rtr_ptr->seg_index	  = Flip_int16( ring_rtr_ptr->seg_index );
	    ring_rtr_ptr->num_seq	  = Flip_int16( ring_rtr_ptr->num_seq );

	    ptr += sizeof(ring_rtr);
	    for( i=0; i < ring_rtr_ptr->num_seq; i++ )
	    {
		req_seq = (int32 *)&rtr[ptr];
		*req_seq = Flip_int32( *req_seq );
		ptr += sizeof(int32);
	    }	
	}
}


syntax highlighted by Code2HTML, v. 0.9.1