/*
* The Spread Toolkit.
*
* The contents of this file are subject to the Spread Open-Source
* License, Version 1.0 (the ``License''); you may not use
* this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.spread.org/license/
*
* or in the file ``license.txt'' found in this distribution.
*
* Software distributed under the License is distributed on an AS IS basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Creators of Spread are:
* Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
*
* Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
*
* All Rights Reserved.
*
* Major Contributor(s):
* ---------------
* Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
* Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
* Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
* John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
*
*/
#define ext_prot_body
#include <string.h>
#include "prot_body.h"
#include "spread_params.h"
#include "membership.h"
#include "flow_control.h"
#include "sp_events.h"
#include "session.h"
#include "network.h"
#include "net_types.h"
#include "status.h"
#include "log.h"
#include "objects.h"
#include "memory.h"
#include "alarm.h"
#include "sess_types.h" /* for message_header */
/* Prot variables */
/* This daemon's own configuration entry; assigned from Conf_my() in Prot_init. */
static proc My;
/* Value returned by Conf_proc_by_id( My.id, ... ) in Prot_init. */
static int My_index;
/* Aru value this daemon last placed on the token, or -1 when none is
 * outstanding (maintained in Prot_handle_token, reset in Discard_packets). */
static int32 Set_aru;
/* Count of consecutive token visits on which Highest_seq == Aru; reset to 0
 * otherwise. Read by Is_token_hold() and To_hold_token(). */
static int Token_counter;
/* Receive buffer for broadcast packets: allocated in Prot_init and
 * re-allocated at the end of Prot_handle_bcast. */
static sys_scatter New_pack;
/* Receive buffer for tokens: allocated in Prot_init, read and rewritten by
 * Prot_handle_token and Answer_retrans. */
static sys_scatter New_token;
/* Always points at New_token.elements[0].buf, i.e. the header of the token being handled. */
static token_header *Token;
/* Scratch two-element scatter (header + body) used when sending a packet. */
static sys_scatter Send_pack;
/* Header of the pre-built hurry packet (proc_id = My.id, type = HURRY_TYPE). */
static packet_header *Hurry_head;
/* One-element scatter wrapping Hurry_head; unicast to the leader by Prot_new_message. */
static sys_scatter Hurry_pack;
/* Delay handed to E_delay() between sends when Wide_network is set.
 * NOTE(review): presumably { sec, usec }, i.e. 10 ms -- confirm against sp_time. */
static sp_time Wide_delay = { 0, 10000 };
/* ### Pack: 1 line */
/* Per-datagram staging area for the packets unpacked by Prot_handle_bcast. */
static packet_info Buffered_packets[ARCH_SCATTER_SIZE];
static down_queue Protocol_down_queue[2]; /* only used in spread3 */
/* Forward declarations of the file-local routines defined below. */
static void Prot_handle_bcast();
static void Prot_handle_token();
static int Answer_retrans( int *ret_new_ptr, int32 *proc_id, int16 *seg_index );
static int Send_new_packets( int num_allowed );
static int Is_token_hold();
static int To_hold_token();
static void Handle_hurry( packet_header *pack_ptr );
static void Deliver_packet( int pack_entry, int to_copy );
static void Flip_token_body( char *buf, token_header *token_ptr );
static void Deliver_reliable_packets( int32 start_seq, int num_packets );
static void Deliver_agreed_packets();
/*
 * Prot_init: one-time initialisation of the protocol layer.
 *
 * Registers the memory object pools used by this file, records this
 * daemon's identity, clears the packet and up-queue tables and the ordering
 * counters, allocates the receive buffers (New_pack, New_token), the
 * Last_token copy and the pre-built hurry packet, then initialises the
 * network layer, attaches the broadcast/token channels to the event system
 * and finally initialises flow control and membership.
 */
void Prot_init(void)
{
    int i, num_bcast, num_token;
    channel *bcast_channels;
    channel *token_channels;

    Mem_init_object( PACK_HEAD_OBJ, sizeof( packet_header ), MAX_PACKETS_IN_STRUCT, 0 );
    Mem_init_object( PACKET_BODY, sizeof( packet_body ), MAX_PACKETS_IN_STRUCT, 0 );
    Mem_init_object( TOKEN_HEAD_OBJ, sizeof( token_header ), 10, 0 );
    Mem_init_object( TOKEN_BODY_OBJ, sizeof( token_body ), 10, 0 );
    Mem_init_object( SCATTER, sizeof( scatter ), 200+MAX_PROCS_RING, 0 );

    My = Conf_my();
    My_index = Conf_proc_by_id( My.id, &My );
    GlobalStatus.my_id = My.id;
    GlobalStatus.packet_delivered = 0;

    /* The extra Up_queue slot (index MAX_PROCS_RING) is the one Deliver_packet
     * uses for single-packet reliable messages. */
    for( i=0; i < MAX_PROCS_RING+1; i++ )
        Up_queue[i].exist = 0;
    for( i=0; i < MAX_PACKETS_IN_STRUCT; i++ )
        Packets[i].exist = 0;

    Highest_seq = 0;
    Highest_fifo_seq = 0;
    My_aru = 0;
    Aru = 0;
    Set_aru = -1;
    Last_discarded = 0;
    Last_delivered = 0;

    /* Receive buffer for broadcast packets: header + body. */
    New_pack.num_elements = 2;
    New_pack.elements[0].len = sizeof(packet_header);
    New_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
    New_pack.elements[1].len = sizeof(packet_body);
    New_pack.elements[1].buf = (char *) new(PACKET_BODY);

    /* Receive buffer for tokens: header + body. */
    New_token.num_elements = 2;
    New_token.elements[0].len = sizeof(token_header);
    New_token.elements[0].buf = (char *) new(TOKEN_HEAD_OBJ);
    New_token.elements[1].len = sizeof(token_body);
    New_token.elements[1].buf = (char *) new(TOKEN_BODY_OBJ);

    /* Send_pack buffers are filled in per send; only the shape is fixed here. */
    Send_pack.num_elements = 2;
    Send_pack.elements[0].len = sizeof(packet_header);

    Token = (token_header *)New_token.elements[0].buf;

    Last_token = new(TOKEN_HEAD_OBJ);
    Last_token->type = 0;
    Last_token->seq = 0;
    Last_token->aru = 0;
    Last_token->flow_control = 0;

    /* The hurry packet is a bare packet_header, so take it from the header
     * pool (it was previously drawn from the much larger PACKET_BODY pool). */
    Hurry_pack.num_elements = 1;
    Hurry_pack.elements[0].len = sizeof(packet_header);
    Hurry_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
    Hurry_head = (packet_header *)Hurry_pack.elements[0].buf;
    Hurry_head->proc_id = My.id;
    Hurry_head->type = HURRY_TYPE;

    Net_init();

    bcast_channels = Net_bcast_channel();
    token_channels = Net_token_channel();
    Net_num_channels( &num_bcast, &num_token);

    /* Broadcast channels are attached at HIGH priority, token channels at MEDIUM. */
    for ( i = 0; i < num_bcast; i++) {
        E_attach_fd( *bcast_channels, READ_FD, Prot_handle_bcast, 0, NULL, HIGH_PRIORITY );
        bcast_channels++;
    }
    for ( i = 0; i < num_token; i++) {
        E_attach_fd( *token_channels, READ_FD, Prot_handle_token, 0, NULL, MEDIUM_PRIORITY );
        token_channels++;
    }

    FC_init( );
    Memb_init();
    Net_set_membership( Reg_membership );
}
void Prot_init_down_queues(void)
{
Protocol_down_queue[NORMAL_DOWNQUEUE].num_mess = 0 ;
Protocol_down_queue[NORMAL_DOWNQUEUE].cur_element = 0;
Protocol_down_queue[GROUPS_DOWNQUEUE].num_mess = 0 ;
Protocol_down_queue[GROUPS_DOWNQUEUE].cur_element = 0;
}
/*
 * Prot_set_down_queue: point Down_queue_ptr at the requested protocol down
 * queue. Any queue_type other than NORMAL_DOWNQUEUE or GROUPS_DOWNQUEUE is
 * reported through Alarm(EXIT, ...).
 */
void Prot_set_down_queue( int queue_type )
{
    if( queue_type == NORMAL_DOWNQUEUE )
    {
        Down_queue_ptr = &Protocol_down_queue[NORMAL_DOWNQUEUE];
    }
    else if( queue_type == GROUPS_DOWNQUEUE )
    {
        Down_queue_ptr = &Protocol_down_queue[GROUPS_DOWNQUEUE];
    }
    else
    {
        Alarm(EXIT, "Prot_set_down_queue: Illegal queue_type (%d)\n", queue_type);
    }
}
/* Prot_Create_Local_Session: session-creation hook; intentionally a no-op in Spread3. */
void Prot_Create_Local_Session(session *new_sess)
{
    (void) new_sess;    /* unused */
}
/* Prot_Destroy_Local_Session: session-teardown hook; intentionally a no-op in Spread3. */
void Prot_Destroy_Local_Session(session *old_sess)
{
    (void) old_sess;    /* unused */
}
/* Prot_kill_session: kill-session hook; intentionally a no-op in Spread3. */
void Prot_kill_session(message_obj *msg)
{
    (void) msg;         /* unused */
}
/*
 * Prot_Create_Down_Link: allocate a down_link and stamp it with `type`.
 *
 * msg          message whose header type is logged if `type` is illegal.
 * type         packet type to store in the link; -1 means "illegal".
 * mbox         not used by this implementation.
 * cur_element  not used by this implementation.
 *
 * Returns the new link, or NULL (after disposing the link and raising a
 * PROTOCOL alarm) when the stored type equals -1. Allocation failure is
 * reported through Alarm(EXIT, ...). Only the `type` field of the link is
 * initialised here.
 */
down_link *Prot_Create_Down_Link(message_obj *msg, int type, int mbox, int cur_element)
{
    down_link *link;
    message_header *msg_head;

    link = new( DOWN_LINK );
    if( link == NULL )
    {
        Alarm(EXIT, "Prot_Create_Down_Link: Failure to allocate a Down_link\n");
    }

    link->type = type;
    if( link->type != -1 )
    {
        return(link);
    }

    /* Illegal type: release the link and report the message's own type. */
    msg_head = (message_header *)msg->elements[0].buf;
    dispose( link );
    Alarm( PROTOCOL, "Prot_Create_Down_Link: Illegal message type %d\n", msg_head->type );
    return(NULL);
}
/*
 * Prot_handle_bcast: event handler for a readable broadcast channel.
 *
 * Receives one datagram into New_pack and dispatches it by packet type:
 * status, flow-control, membership and hurry packets go to their handlers;
 * regular packets carrying a different membership id are handed to
 * Memb_handle_message as foreign. A regular packet of the current
 * membership re-arms the token-loss (and, on the leader, hurry) timers, is
 * unpacked into Buffered_packets[] -- one datagram may carry several
 * 4-byte-aligned packets -- and each unpacked packet is inserted into
 * Packets[], advancing My_aru / Highest_seq and triggering delivery.
 *
 * fd       channel to read from.
 * dummy    unused event argument.
 * dummy_p  unused event argument.
 *
 * Lengths taken from the wire are validated before they are used as copy
 * sizes or offsets, and the number of unpacked packets is capped at the
 * size of Buffered_packets[].
 */
static void Prot_handle_bcast(channel fd, int dummy, void *dummy_p)
{
    packet_header *pack_ptr;
    int pack_entry;
    proc p;
    int received_bytes;
    int total_bytes_processed;
    int num_buffered_packets;
    int embedded_len;
    int i;
    int32 j;
    /* int r1,r2; */

    received_bytes = Net_recv( fd, &New_pack );
    /* My own packet or from another monitor component */
    if( received_bytes == 0 ) return;
    /* problem in receiving */
    if( received_bytes < 0 ) return;

    pack_ptr = (packet_header *)New_pack.elements[0].buf;

    /* ### Pack, this has to move down to network.c
     * if( pack_ptr->data_len +sizeof(packet_header) != received_bytes )
     * {
     * Alarm( PRINT, "Prot_handle_bcast: received %d, should be %d\n",
     * received_bytes, pack_ptr->data_len+sizeof(packet_header) );
     * return;
     * }
     */

    if( Is_status( pack_ptr->type ) )
    {
        Stat_handle_message( &New_pack );
        return;
    }
    if( Is_fc( pack_ptr->type ) )
    {
        FC_handle_message( &New_pack );
        return;
    }

    /* delete random
    r1 = ((-My.id)%17)+3;
    r2 = get_rand() % (r1+3 );
    if ( r2 == 0 ) return; */

    if( Is_membership( pack_ptr->type ) )
    {
        Memb_handle_message( &New_pack );
        return;
    }
    if( Is_hurry( pack_ptr->type ) )
    {
        Handle_hurry( pack_ptr );
        return;
    }
    if( !Is_regular( pack_ptr->type ) )
    {
        Alarm( PROTOCOL, "Prot_handle_bcast: Unknown packet type %d\n",
               pack_ptr->type );
        return;
    }
    if( ! Memb_is_equal( Memb_id(), pack_ptr->memb_id ) )
    {
        /* Foreign message */
        Memb_handle_message( &New_pack );
        return;
    }

    /* Traffic from my ring: push the token-loss (and leader hurry) timers out. */
    if (Memb_token_alive() ) {
        E_queue( Memb_token_loss, 0, NULL, Token_timeout );
        if( Conf_leader( Memb_active_ptr() ) == My.id )
        {
            E_queue( Prot_token_hurry, 0, NULL, Hurry_timeout );
        }
    }

    /* The first packet's length is used as an offset into the body buffer
     * below, so reject values that cannot describe a packet body. New_pack
     * keeps its buffers and is simply reused for the next receive. */
    if( pack_ptr->data_len < 0 || pack_ptr->data_len > (int) sizeof(packet_body) )
    {
        Alarm( PRINT, "Prot_handle_bcast: illegal data_len %d in packet, dropping it\n",
               (int) pack_ptr->data_len );
        return;
    }

    /* ### Pack: next 70 lines (almost till the end of the routine) have changed */
    Buffered_packets[0].head = pack_ptr;
    Buffered_packets[0].body = (packet_body *)New_pack.elements[1].buf;

    received_bytes -= sizeof(packet_header);
    total_bytes_processed = pack_ptr->data_len;
    /* ignore any alignment padding */
    switch(total_bytes_processed % 4)
    {
    case 1:
        total_bytes_processed++;
        /* fallthrough */
    case 2:
        total_bytes_processed++;
        /* fallthrough */
    case 3:
        total_bytes_processed++;
        /* fallthrough */
    case 0:
        /* already aligned */
        break;
    }

    for( i = 1; received_bytes > total_bytes_processed; i++ )
    {
        /* Never write past the end of Buffered_packets[]. */
        if( i >= ARCH_SCATTER_SIZE )
        {
            Alarm( PRINT, "Prot_handle_bcast: too many packed packets (limit %d), ignoring the rest\n",
                   (int) ARCH_SCATTER_SIZE );
            break;
        }
        /* A further packet needs at least a complete header. */
        if( received_bytes - total_bytes_processed < (int) sizeof(packet_header) )
        {
            Alarm( PRINT, "Prot_handle_bcast: %d trailing bytes are too short for a packet header\n",
                   received_bytes - total_bytes_processed );
            break;
        }

        /* copy into each of the elements after the first element*/
        Buffered_packets[i].head = (packet_header *)new(PACK_HEAD_OBJ);
        Buffered_packets[i].body = (packet_body *)new(PACKET_BODY);
        if (Buffered_packets[i].head == NULL)
            Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACK_HEAD_OBJ\n");
        if (Buffered_packets[i].body == NULL)
            Alarm(EXIT, "Prot_handle_bcast: Memory allocation failed for PACKET_BODY\n");

        pack_ptr = (packet_header *)&New_pack.elements[1].buf[total_bytes_processed];
        memcpy( Buffered_packets[i].head, pack_ptr, sizeof( packet_header ) );
        total_bytes_processed += sizeof(packet_header);

        /* The embedded length must fit both the destination body and the
         * bytes that actually arrived. */
        embedded_len = Buffered_packets[i].head->data_len;
        if( embedded_len < 0 ||
            embedded_len > (int) sizeof(packet_body) ||
            embedded_len > received_bytes - total_bytes_processed )
        {
            Alarm( PRINT, "Prot_handle_bcast: illegal data_len %d in packed packet, ignoring the rest\n",
                   embedded_len );
            dispose( Buffered_packets[i].head );
            dispose( Buffered_packets[i].body );
            break;
        }

        memcpy( Buffered_packets[i].body, &New_pack.elements[1].buf[total_bytes_processed], embedded_len );
        total_bytes_processed += embedded_len;
        /* ignore any alignment padding */
        switch(total_bytes_processed % 4)
        {
        case 1:
            total_bytes_processed++;
            /* fallthrough */
        case 2:
            total_bytes_processed++;
            /* fallthrough */
        case 3:
            total_bytes_processed++;
            /* fallthrough */
        case 0:
            /* already aligned */
            break;
        }
    }
    num_buffered_packets = i;

    for( i = 0; i < num_buffered_packets; i++)
    {
        pack_ptr = Buffered_packets[i].head;

        /* do we have this packet */
        if( pack_ptr->seq <= Aru )
        {
            Alarm( PROTOCOL, "Prot_handle_bcast: delayed packet %d already delivered (Aru %d)\n",
                   pack_ptr->seq, Aru );
            dispose(Buffered_packets[i].head);
            dispose(Buffered_packets[i].body);
            continue;
        }
        pack_entry = pack_ptr->seq & PACKET_MASK;
        if( Packets[pack_entry].exist )
        {
            Alarm( PROTOCOL, "Prot_handle_bcast: packet %d already exist\n",
                   pack_ptr->seq );
            dispose(Buffered_packets[i].head);
            dispose(Buffered_packets[i].body);
            continue;
        }
        Packets[pack_entry].proc_index = Conf_proc_by_id( pack_ptr->proc_id, &p );
        if( Packets[pack_entry].proc_index < 0 )
        {
            Alarm( PROTOCOL, "Prot_handle_bcast: unknown proc %d\n", pack_ptr->proc_id );
            dispose(Buffered_packets[i].head);
            dispose(Buffered_packets[i].body);
            continue;
        }

        /* insert new packet */
        Packets[pack_entry].head = pack_ptr;
        Packets[pack_entry].body = Buffered_packets[i].body;
        Packets[pack_entry].exist = 1;

        /* update variables */
        if( Highest_seq < pack_ptr->seq ) Highest_seq = pack_ptr->seq;
        if( pack_ptr->seq == My_aru+1 )
        {
            /* This packet closes the gap: advance My_aru over every
             * contiguous packet already held. */
            for( j=pack_ptr->seq; j <= Highest_seq; j++ )
            {
                if( ! Packets[j & PACKET_MASK].exist ) break;
                My_aru++;
            }
            Deliver_agreed_packets();
        }else Deliver_reliable_packets( pack_ptr->seq, 1 );

        Alarm( PROTOCOL, "Prot_handle_bcast: packet %d inserted\n",
               pack_ptr->seq );
    } /* END OF LOOP */

    GlobalStatus.packet_recv++;
    GlobalStatus.my_aru = My_aru;
    GlobalStatus.highest_seq = Highest_seq;

    /* prepare New_pack for next packet */
    New_pack.elements[0].buf = (char *) new(PACK_HEAD_OBJ);
    New_pack.elements[1].buf = (char *) new(PACKET_BODY);
}
/*
 * Prot_handle_token: event handler for a readable token channel.
 *
 * Receives a token into New_token and, for a regular token of a live
 * membership:
 *   - filters duplicates via the arq/retrans counters in Token->type
 *     (the leader swallows a token whose arq differs from Last_token's;
 *     other members swallow a repeated token, or resend Last_token if the
 *     retrans counter grew);
 *   - answers retransmission requests and sends new packets within the
 *     flow-control allowance;
 *   - appends this daemon's own retransmission requests;
 *   - updates Token->aru / Set_aru, Token->seq and, on the leader, the arq;
 *   - forwards the token unless the leader decides to hold it;
 *   - re-arms the hurry / token-loss timers, recomputes Aru, rotates the
 *     token buffers and runs discard/delivery.
 * Form tokens are passed to Memb_handle_token.
 *
 * fd       channel to read from.
 * dummy    unused event argument.
 * dummy_p  unused event argument.
 */
void Prot_handle_token(channel fd, int dummy, void *dummy_p)
{
    int new_ptr;
    int num_retrans, num_allowed, num_sent;
    int flow_control;
    char *new_rtr;
    ring_rtr *ring_rtr_ptr;
    int32 rtr_proc_id;
    int16 rtr_seg_index;
    int32 val;
    int retrans_allowed; /* how many of my retrans are allowed on token */
    int i, ret;
    /* int r1,r2;*/

    ret = Net_recv_token( fd, &New_token );
    /* from another monitor component */
    if( ret == 0 ) return;
    /* problem in receiving: the token buffer holds nothing valid to parse */
    if( ret < 0 ) return;

    /* delete random
    r1 = ((-My.id)%17)+3;
    r2 = get_rand() % (r1+3 );
    if ( r2 == 0 ) return; */

    Alarm( DEBUG, "Received Token\n");

    /* check if it is a regular token */
    if( Is_form( Token->type ) )
    {
        Alarm(PROTOCOL, "it is a Form Token.\n");
        Memb_handle_token( &New_token );
        return;
    }

    /* The Veto property for tokens - swallow this token */
    if( ! Memb_token_alive() ) {
        Alarm(PROTOCOL, "Prot_handle_token: Veto Property. Memb not alive.\n");
        return;
    }

    if( ret != sizeof(token_header) + Token->rtr_len )
    {
        /* sizeof yields size_t; cast so the argument matches %d */
        Alarm( PRINT,
               "Prot_handle_token: recv token len is %d, should be %d\n",
               ret, (int)( sizeof(token_header) + Token->rtr_len ) );
        return;
    }

    if( !Same_endian( Token->type ) )
        Flip_token_body( New_token.elements[1].buf, Token );

    if( Conf_leader( Memb_active_ptr() ) == My.id )
    {
        if( Get_arq(Token->type) != Get_arq(Last_token->type) )
        {
            Alarm( PROTOCOL,
                   "Prot_handle_token: leader swallowing token %d %d %d\n",
                   Get_arq(Token->type),Get_retrans(Token->type),Get_arq(Last_token->type) );
            /* received double token - swallow it */
            return;
        }
    }else{
        if( Get_arq(Token->type) == Get_arq(Last_token->type) )
        {
            if( Get_retrans(Token->type) > Get_retrans(Last_token->type) )
            {
                val = Get_retrans(Token->type);
                Last_token->type = Set_retrans(Last_token->type,val);
                /* asked to send token again (almost lost) */
                Alarm( PROTOCOL,
                       "Prot_handle_token: not leader, asked to retrans %d %d\n",
                       Get_arq(Token->type), val );
                Prot_token_hurry();
            }else{
                Alarm( PROTOCOL,
                       "Prot_handle_token: not leader, swallow same token %d %d\n",
                       Get_arq(Token->type), Get_retrans(Token->type) );
            }
            return;
        } else {
            if ( Get_retrans(Token->type) > 0 ) {
                GlobalStatus.token_hurry++;
            }
        }
    }

    if( Highest_seq < Token->seq ) Highest_seq = Token->seq;

    /* Handle retransmissions */
    num_retrans = Answer_retrans( &new_ptr, &rtr_proc_id, &rtr_seg_index );
    GlobalStatus.retrans += num_retrans;
    new_rtr = New_token.elements[1].buf;

    /* Handle new packets */
    flow_control = (int) Token->flow_control;
    num_allowed = FC_allowed( flow_control, num_retrans );
    num_sent = Send_new_packets( num_allowed );
    GlobalStatus.packet_sent += num_sent;

    /* Flow control calculations: replace my previous round's contribution
     * with this round's. */
    Token->flow_control = Token->flow_control
                          - Last_num_retrans - Last_num_sent
                          + num_retrans + num_sent;
    Last_num_retrans = num_retrans;
    Last_num_sent = num_sent;

    /* Prepare my retransmission requests */
    for( i = My_aru+1; i <= Highest_seq; i++ )
    {
        if( ! Packets[i & PACKET_MASK].exist ) break;
        My_aru++;
    }
    GlobalStatus.my_aru = My_aru;

    if( My_aru < Highest_seq )
    {
        /* Compute how many of my retransmission requests are possible to fit */
        retrans_allowed = ( sizeof( token_body ) - new_ptr - sizeof( ring_rtr ) ) / sizeof( int32 );
        if( retrans_allowed > 1 )
        {
            ring_rtr_ptr = (ring_rtr *)&new_rtr[new_ptr];
            ring_rtr_ptr->memb_id = Memb_id();
            ring_rtr_ptr->proc_id = rtr_proc_id;
            ring_rtr_ptr->seg_index = rtr_seg_index;
            ring_rtr_ptr->num_seq = 0;
            new_ptr += sizeof(ring_rtr);
            for( i=My_aru+1; i <= Highest_seq && retrans_allowed > 0; i++, retrans_allowed-- )
            {
                if( ! Packets[i & PACKET_MASK].exist )
                {
                    memcpy( &new_rtr[new_ptr], &i, sizeof(int32) );
                    new_ptr += sizeof(int32);
                    ring_rtr_ptr->num_seq++;
                }
            }
        }
    }

    if( Memb_state() == EVS )
    {
        if( My_aru == Highest_seq )
        {
            My_aru = Last_seq;
            Memb_commit();
        }
    }

    Token->rtr_len = new_ptr;
    New_token.elements[1].len = new_ptr;

    /* Calculating Token->aru and Set_aru */
    if( ( Token->aru == Set_aru ) ||
        ( Token->aru_last_id == My.id ) ||
        ( Token->aru == Token->seq ) )
    {
        Token->aru = My_aru;
        Token->aru_last_id = My.id;
        if( My_aru < Highest_seq ) Set_aru = My_aru;
        else Set_aru = -1;
    }else if( Token->aru > My_aru ) {
        Token->aru = My_aru;
        Token->aru_last_id = My.id;
        Set_aru = My_aru;
    }else{
        Set_aru = -1;
    }

    Token->proc_id = My.id;
    if( Memb_state() != EVS ) Token->seq = Highest_seq;

    if( Conf_leader( Memb_active_ptr() ) == My.id )
    {
        /* The leader advances the 4-bit arq counter and clears retrans. */
        val = Get_arq( Token->type );
        val = (val + 1)% 0x10;
        Token->type = Set_arq( Token->type, val );
        Token->type = Set_retrans( Token->type, 0 );
    }

    /* Send token */
    if( ! ( Conf_leader( Memb_active_ptr() ) == My.id &&
            To_hold_token() ) )
    {
        /* sending token */
        Net_send_token( &New_token );
        /* ### Bug fix for SGIs */
#ifdef ARCH_SGI_IRIX
        Net_send_token( &New_token );
#endif /* ARCH_SGI_IRIX */
        if( Wide_network &&
            Conf_seg_last(Memb_active_ptr(), My.seg_index) == My.id )
        {
            /* sending again to another segment */
            Net_send_token( &New_token );
            E_delay( Wide_delay );
            Net_send_token( &New_token );
        }
        if( Get_retrans( Token->type ) > 1 )
        {
            /* problems */
            Net_send_token( &New_token );
            Net_send_token( &New_token );
        }
    }

    if( Conf_leader( Memb_active_ptr() ) == My.id )
        E_queue( Prot_token_hurry, 0, NULL, Hurry_timeout );
    E_queue( Memb_token_loss, 0, NULL, Token_timeout );

    /* calculating Aru: the smaller of this token's aru and the previous one's */
    if( Token->aru > Last_token->aru )
        Aru = Last_token->aru;
    else
        Aru = Token->aru;

    if( Highest_seq == Aru ) Token_counter++;
    else Token_counter = 0;

    /* The header just handled becomes Last_token; a fresh header is
     * allocated for the next receive and the body length is restored. */
    dispose( Last_token );
    Last_token = Token;
    New_token.elements[0].buf = (char *) new(TOKEN_HEAD_OBJ);
    New_token.elements[1].len = sizeof(token_body);
    Token = (token_header *)New_token.elements[0].buf;

    /* Deliver & discard packets */
    Discard_packets();
    Deliver_agreed_packets();
    Deliver_reliable_packets( Highest_seq-num_sent+1, num_sent );

    GlobalStatus.highest_seq = Highest_seq;
    GlobalStatus.aru = Aru;
    GlobalStatus.token_rounds++;
}
/*
 * Prot_new_message: append a down_link to the tail of the current down
 * queue (Down_queue_ptr).
 *
 * down_ptr               link to enqueue; its `next` field is reset here.
 * not_used_in_spread3_p  ignored.
 *
 * Blocks users once the queue length reaches WATER_MARK. When the queue
 * goes from empty to one message while the token is being held, the leader
 * is asked to release it: directly through Handle_hurry() if this daemon is
 * the leader, otherwise by unicasting the hurry packet to the leader.
 * A negative queue length is reported through Alarm(EXIT, ...).
 */
void Prot_new_message( down_link *down_ptr, int not_used_in_spread3_p )
{
    int32 leader_id;

    /* The new link is always the tail, whichever branch runs below; this
     * avoids leaving an uninitialised `next` in the queue. */
    down_ptr->next = NULL;

    if( Down_queue_ptr->num_mess > 0 )
    {
        Down_queue_ptr->last->next = down_ptr;
        Down_queue_ptr->last = down_ptr;
    }else if( Down_queue_ptr->num_mess == 0 ){
        Down_queue_ptr->first = down_ptr;
        Down_queue_ptr->last = down_ptr;
    }else{
        Alarm( EXIT,"Prot_new_message: Down_queue_ptr->num_mess is %d\n",
               Down_queue_ptr->num_mess );
    }
    Down_queue_ptr->num_mess++;

    if( Down_queue_ptr->num_mess >= WATER_MARK )
        Sess_block_users_level();

    if( Down_queue_ptr->num_mess == 1 && Is_token_hold() )
    {
        leader_id = Conf_leader( Memb_active_ptr() );
        if( leader_id == My.id )
        {
            Handle_hurry( Hurry_head );
        }else{
            Net_ucast( leader_id, &Hurry_pack );
        }
    }
}
/* ### Pack: this routine has changed */
/*
 * Answer_retrans: process the retransmission-request area of the token
 * currently held in New_token.
 *
 * Requests whose memb_id equals this ring's Memb_id() are answered from
 * Packets[]: by unicast when the request names a proc_id, by segment cast
 * when it names only a seg_index, otherwise by (queued) broadcast.
 * Requests of other memberships are kept, compacted to the front of the
 * token body.
 *
 * ret_new_ptr  out: number of bytes of foreign requests retained at the
 *              start of the token body.
 * proc_id      out: starts as My.id; set to -1 when a requested packet is
 *              not held locally.
 * seg_index    out: starts as My.seg_index; set to -1 when such a missing
 *              packet was requested from a different segment.
 *
 * Returns the number of retransmissions counted (sends that returned > 0,
 * plus one if the final Net_flush_bcast() returned > 0).
 */
static int Answer_retrans( int *ret_new_ptr,
int32 *proc_id, int16 *seg_index )
{
int num_retrans;
char *rtr;
int old_ptr,new_ptr;
ring_rtr *ring_rtr_ptr;
int pack_entry;
int bytes_to_copy;
packet_header *pack_ptr;
int i, ret;
int32 *req_seq;
num_retrans = 0;
new_ptr = 0;
*proc_id = My.id;
*seg_index = My.seg_index;
if( Token->rtr_len > 0 )
{
rtr = New_token.elements[1].buf;
/* old_ptr: read offset into the request area; new_ptr: write offset for kept requests */
old_ptr = 0;
while( old_ptr < Token->rtr_len )
{
ring_rtr_ptr = (ring_rtr *)&rtr[old_ptr];
if( Memb_is_equal(ring_rtr_ptr->memb_id,Memb_id() ) )
{
/* retransmit requests from my ring */
old_ptr += sizeof(ring_rtr);
for( i=0; i < ring_rtr_ptr->num_seq; i++ )
{
req_seq = (int32 *)&rtr[old_ptr];
old_ptr += sizeof(int32);
pack_entry = *req_seq & PACKET_MASK;
/* a request below the global Aru is treated as fatal */
if( *req_seq < Aru )
Alarm( EXIT,
"Answer_retrans: retrans of %d requested while Aru is %d\n",
*req_seq,Aru );
if( Packets[pack_entry].exist )
{
pack_ptr = Packets[pack_entry].head;
Send_pack.elements[0].buf =
(char *)Packets[pack_entry].head;
Send_pack.elements[1].buf =
(char *)Packets[pack_entry].body;
Send_pack.elements[1].len =
pack_ptr->data_len;
/* narrowest scope first: single proc, then segment, then everyone */
if( ring_rtr_ptr->proc_id != -1 )
{
ret = Net_ucast ( ring_rtr_ptr->proc_id, &Send_pack );
GlobalStatus.u_retrans++;
Alarm( PROTOCOL,
"Answer_retrans: retransmit to proc %d\n", ring_rtr_ptr->proc_id );
}else if( ring_rtr_ptr->seg_index != -1 ) {
ret = Net_scast ( ring_rtr_ptr->seg_index, &Send_pack );
GlobalStatus.s_retrans++;
Alarm( PROTOCOL,
"Answer_retrans: retransmit to seg %d\n", ring_rtr_ptr->seg_index );
}else{
#if 1
ret = Net_queue_bcast ( &Send_pack );
#else
ret = Net_bcast ( &Send_pack );
#endif
if( ret > 0 ) GlobalStatus.b_retrans++;
Alarm( PROTOCOL,
"Answer_retrans: retransmit to all\n");
}
if( ret > 0 )
{
/* on a wide network, pace the sends (an extra delay after every second one) */
if( Wide_network ) E_delay( Wide_delay );
if( Wide_network && (num_retrans % 2 == 1 )) E_delay( Wide_delay );
num_retrans++;
}
}else{
/* I do not hold the packet either: widen the scope of my own request */
*proc_id = -1;
if( ring_rtr_ptr->seg_index != My.seg_index )
*seg_index = -1;
}
}
}else{
/* copy requests of other rings */
bytes_to_copy = sizeof(ring_rtr) +
ring_rtr_ptr->num_seq * sizeof(int32);
/* regions may overlap when compacting towards the front, hence memmove */
if( new_ptr != old_ptr )
memmove( &rtr[new_ptr], &rtr[old_ptr], bytes_to_copy);
old_ptr += bytes_to_copy;
new_ptr += bytes_to_copy;
Alarm( PROTOCOL, "Prot_handle_token: Coping foreign rtr\n");
}
}
}
*ret_new_ptr = new_ptr;
/* push out anything still queued by Net_queue_bcast */
ret = Net_flush_bcast();
if( ret > 0 )
{
GlobalStatus.b_retrans++;
if( Wide_network ) E_delay( Wide_delay );
if( Wide_network && (num_retrans % 2 == 1 )) E_delay( Wide_delay );
num_retrans++;
}
return (num_retrans);
}
/* ### Pack: this routine has changed */
/*
 * Send_new_packets: turn queued down-messages into packets and send them.
 *
 * Takes elements from the message at the head of Down_queue_ptr, one
 * element per packet, assigns each the next sequence and fifo sequence
 * numbers, queues it for broadcast and records it in Packets[]. Stops when
 * num_allowed sends have been counted or the down queue is empty.
 *
 * num_allowed  upper bound on the number of counted sends in this call.
 *
 * Returns the number of sends counted (Net_queue_bcast calls that returned
 * > 0, plus one if the final Net_flush_bcast() returned > 0).
 */
static int Send_new_packets( int num_allowed )
{
packet_header *pack_ptr;
scatter *scat_ptr;
int pack_entry;
int num_sent;
int ret;
num_sent = 0;
while( num_sent < num_allowed )
{
/* check if down queue is empty */
if( Down_queue_ptr->num_mess == 0 ) break;
/* initialize packet_header */
pack_ptr = new(PACK_HEAD_OBJ);
scat_ptr = Down_queue_ptr->first->mess;
pack_ptr->type = Down_queue_ptr->first->type;
pack_ptr->proc_id = My.id;
pack_ptr->memb_id = Memb_id();
pack_ptr->seq = Highest_seq+1;
Highest_seq++;
pack_ptr->fifo_seq = Highest_fifo_seq+1;
Highest_fifo_seq++;
/* the packet body is the message element itself, not a copy */
pack_ptr->data_len = scat_ptr->elements[
Down_queue_ptr->cur_element].len;
Send_pack.elements[1].buf = scat_ptr->elements[
Down_queue_ptr->cur_element].buf;
Down_queue_ptr->cur_element++;
if( Down_queue_ptr->cur_element < scat_ptr->num_elements )
{
/* not last packet in message */
pack_ptr->packet_index = Down_queue_ptr->cur_element;
}else if( Down_queue_ptr->cur_element == scat_ptr->num_elements ){
down_link *tmp_down;
/* last packet in message */
/* a negative packet_index marks the final packet (see Deliver_packet) */
pack_ptr->packet_index = -scat_ptr->num_elements;
/* unlink the finished message; scat_ptr must not be used after the dispose below */
tmp_down = Down_queue_ptr->first;
Down_queue_ptr->first = Down_queue_ptr->first->next;
Down_queue_ptr->cur_element = 0;
Down_queue_ptr->num_mess--;
dispose( tmp_down->mess );
dispose( tmp_down );
if( Down_queue_ptr->num_mess < WATER_MARK )
Sess_unblock_users_level();
}else{
Alarm( EXIT,
"Send_new_packets: error in packet index: %d %d\n",
Down_queue_ptr->cur_element,scat_ptr->num_elements );
}
Send_pack.elements[0].buf = (char *) pack_ptr;
Send_pack.elements[1].len = pack_ptr->data_len;
#if 1
ret = Net_queue_bcast( &Send_pack );
#else
ret = Net_bcast( &Send_pack );
#endif
if( ret > 0 )
{
/* on a wide network, pace the sends (an extra delay after every second one) */
if( Wide_network ) E_delay( Wide_delay );
if( Wide_network && (num_sent % 2 == 1 )) E_delay( Wide_delay );
num_sent++;
}
pack_entry = pack_ptr->seq & PACKET_MASK;
if( Packets[pack_entry].exist )
Alarm( EXIT,
"Send_new_packets: created packet %d already exist %d\n",
pack_ptr->seq, Packets[pack_entry].exist );
/* insert new created packet */
Packets[pack_entry].head = pack_ptr;
Packets[pack_entry].body = (packet_body *)Send_pack.elements[1].buf;
Packets[pack_entry].exist = 1;
Packets[pack_entry].proc_index = My_index;
Alarm( PROTOCOL,
"Send_new_packets: packet %d sent and inserted \n",
pack_ptr->seq );
}
/* push out anything still queued by Net_queue_bcast */
ret = Net_flush_bcast();
if( ret > 0 )
{
if( Wide_network ) E_delay( Wide_delay );
if( Wide_network && (num_sent % 2 == 1 )) E_delay( Wide_delay );
num_sent++;
}
return ( num_sent );
}
static void Deliver_packet( int pack_entry, int to_copy )
{
int proc_index;
up_queue *up_ptr;
packet_header *pack_ptr;
message_link *mess_link;
int index;
pack_ptr = Packets[pack_entry].head;
if( Is_reliable( pack_ptr->type ) &&
pack_ptr->packet_index == -1 )
{
/*
* for reliable single-packets message : deliver regardless
* of what is already in the queue.
*/
proc_index = MAX_PROCS_RING;
}else{
proc_index = Packets[pack_entry].proc_index;
}
up_ptr = &Up_queue[proc_index];
if( up_ptr->exist == 0 )
{
/* no message for proc - need to create one */
up_ptr->mess = new( SCATTER );
up_ptr->mess->num_elements = 0;
up_ptr->exist = 1;
}
/* validity check */
index = pack_ptr->packet_index;
if( index < 0 ) index = -index;
if( up_ptr->mess->num_elements+1 != index )
{
Alarm( EXIT, "Deliver_packet: sequence error: sec is %d, should be %d\n",
pack_ptr->packet_index,
up_ptr->mess->num_elements+1 );
}
/* chain this packet */
up_ptr->mess->num_elements++;
up_ptr->mess->elements[index-1].len = Packets[pack_entry].head->data_len;
up_ptr->mess->elements[index-1].buf = (char *)Packets[pack_entry].body;
if( to_copy)
{
/*
* copy the packet.
*
* Note, that the original packet space was delivered
* and a new space was left in Packets. This is done to
* guarantee that in case of a configuration change, the
* original packet space in down_queue will be the same as
* the one in the up_queue for messages that were partially sent.
*/
Packets[pack_entry].body = new(PACKET_BODY);
memcpy( Packets[pack_entry].body, up_ptr->mess->elements[index-1].buf,
Packets[pack_entry].head->data_len );
}
Packets[pack_entry].exist = 2;
GlobalStatus.packet_delivered++;
if( pack_ptr->packet_index < 0 )
{
/* end of message */
/* Push up big_scatter. i.e. up_ptr->mess */
mess_link = new(MESSAGE_LINK);
mess_link->mess = up_ptr->mess;
up_ptr->exist = 0;
Sess_deliver_message( mess_link );
}
}
/*
 * Deliver_reliable_packets: deliver, ahead of ordering, every undelivered
 * single-packet reliable message in a range of sequence numbers.
 *
 * start_seq    first sequence number of the range (raised to
 *              Last_delivered + 1 if it lies at or below Last_delivered).
 * num_packets  length of the range.
 *
 * Does nothing while the membership state is EVS.
 */
static void Deliver_reliable_packets( int32 start_seq, int num_packets )
{
    int entry;
    int final_seq;
    int seq;

    if( Memb_state() == EVS ) return;

    final_seq = start_seq+num_packets-1;
    if( start_seq <= Last_delivered ) start_seq = Last_delivered + 1;

    for( seq = start_seq; seq <= final_seq ; seq++ )
    {
        entry = seq & PACKET_MASK;

        /* only packets that are present and not yet delivered */
        if( Packets[entry].exist != 1 ) continue;
        /* only reliable messages that consist of a single packet */
        if( ! Is_reliable( Packets[entry].head->type ) ) continue;
        if( Packets[entry].head->packet_index != -1 ) continue;

        Deliver_packet( entry, 1 );
        Alarm( PROTOCOL, "Deliver_reliable_packets: packet %d was delivered\n", seq );
    }
}
static void Deliver_agreed_packets()
{
/* deliver all non-safe packets that are ordered and not delivered */
int pack_entry;
int i;
if( My_aru <= Last_delivered ) return;
if( Memb_state() == EVS ) return;
for( i = Last_delivered+1; i <= My_aru; i++ )
{
pack_entry = i & PACKET_MASK;
if( Packets[pack_entry].exist == 1 )
{
if( !Is_safe( Packets[pack_entry].head->type ) )
{
Deliver_packet( pack_entry, 1 );
Alarm( PROTOCOL, "Deliver_agreed_packets: packet %d was delivered\n", i );
Last_delivered++;
}else return;
}else if( Packets[pack_entry].exist == 2 ){
/* This is possible only for reliable delivery prior to agreed */
Last_delivered++;
}else Alarm( EXIT, "Deliver_agreed_packets: Error, exist is %d\n", Packets[pack_entry].exist );
}
}
/*
 * Discard_packets: release packets that every ring member is known to hold
 * (sequence numbers up to Aru), delivering any that were not delivered yet.
 *
 * Outside EVS state: packets Last_discarded+1 .. Aru are delivered if still
 * undelivered, their headers (and already-delivered bodies) are disposed,
 * and Last_discarded / Last_delivered are advanced.
 *
 * In EVS state (only once Aru == Last_seq): packets up to the first hole or
 * first SAFE packet are delivered, the transitional membership is
 * delivered, the remaining packets are delivered or dropped, partially
 * assembled messages in the up queues are cleaned up, the regular
 * membership is delivered and all ordering counters are reset for the new
 * membership.
 *
 * Returns immediately when Aru <= Last_discarded.
 */
void Discard_packets()
{
int pack_entry;
packet_body *body_ptr;
up_queue *up_ptr;
int proc_index;
int i;
if( Aru <= Last_discarded ) return;
if( Memb_state() == EVS )
{
int found_hole;
membership_id reg_memb_id;
if( Aru != Last_seq ) return;
/* Deliver packets that must be delivered before the transitional signal.
* Those up to the Aru for my old ring were delivered in Read_form2().
* So, it remains to deliver all packets up to the first hole or the first
* SAFE message. */
Alarmp( SPLOG_INFO, PROTOCOL,
"Discard_packets: delivering messages after old ring Aru before transitional\n" );
for( i = Last_discarded+1; i <= Highest_seq; i++ )
{
pack_entry = i & PACKET_MASK;
if( ! Packets[pack_entry].exist )
Alarmp( SPLOG_FATAL, PROTOCOL, "Discard_packets: (EVS before transitional) packet %d not exist\n", i);
/* exist == 3 is how a hole is represented in Packets[] here */
if( Packets[pack_entry].exist == 3 )
{
Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: Found first Hole in %d\n", i);
break;
}
if( Is_safe( Packets[pack_entry].head->type ) ) {
Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: Found first SAFE message in %d", i);
break;
}
/* should deliver packet or dispose the body if it was delivered already */
if( Packets[pack_entry].exist == 1 ){
Deliver_packet( pack_entry, 0 );
} else {
dispose( Packets[pack_entry].body );
}
/* dispose packet header in any case */
dispose( Packets[pack_entry].head );
Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: delivering %d in EVS\n",i);
Packets[pack_entry].exist = 0;
Last_discarded = i;
}
/* calculate and deliver transitional membership */
Alarmp( SPLOG_INFO, PROTOCOL, "Discard_packets: Delivering transitional membership\n" );
Memb_transitional();
Sess_deliver_trans_memb( Trans_membership, Memb_trans_id() );
/* deliver all remaining packets for EVS */
found_hole = 0;
for( i = Last_discarded+1; i <= Highest_seq; i++ )
{
pack_entry = i & PACKET_MASK;
if( ! Packets[pack_entry].exist )
Alarm( EXIT, "Discard_packets: (EVS after transitional) packet %d not exist\n", i);
if( Packets[pack_entry].exist == 3 )
{
/*
* There is a hole!
* from here, we need to check if the proc_id of the packet
* is in commited membership.
*/
found_hole = 1;
Alarm( PROTOCOL, "Discard_packets: Found a Hole in %d \n",i);
}else if( (!found_hole) ||
(Conf_id_in_conf( &Commit_membership, Packets[pack_entry].head->proc_id ) != -1) ){
/* should deliver packet or dispose the body if it was delivered already */
if( Packets[pack_entry].exist == 1 ){
Deliver_packet( pack_entry, 0 );
}else{
dispose( Packets[pack_entry].body );
}
/* dispose packet header in any case */
dispose( Packets[pack_entry].head );
Alarm( PROTOCOL, "Discard_packets: delivering %d in EVS\n",i);
}else{
/* should not deliver packet */
dispose( Packets[pack_entry].head );
dispose( Packets[pack_entry].body );
Alarm( PROTOCOL, "Discard_packets: Due to hole, not delivering %d \n",i);
}
Packets[pack_entry].exist = 0;
}
/* check up_queue and down_queue */
/* a partially sent message at the head of the down queue restarts from its first element */
if( Down_queue_ptr->num_mess > 0 )
{
Down_queue_ptr->cur_element = 0;
}
for( proc_index=0; proc_index < MAX_PROCS_RING; proc_index++ )
{
if( Up_queue[proc_index].exist )
{
if( proc_index != My_index )
{
/*
* dispose only packets that are not mine
* my packets will stay in Down_queue if the message is not
* ready to be delivered (because not fully sent yet)
* so we need not to dispose them!
*/
up_ptr = &Up_queue[proc_index];
for( i=0; i < up_ptr->mess->num_elements; i++ )
{
body_ptr = (packet_body *)up_ptr->mess->elements[i].buf;
dispose( body_ptr );
}
}
dispose( Up_queue[proc_index].mess );
Up_queue[proc_index].exist = 0;
}
}
/* calculate and deliver regular membership */
Memb_regular();
Log_membership();
reg_memb_id = Memb_id();
Sess_deliver_reg_memb( Reg_membership, reg_memb_id );
/* set variables for next membership */
Last_token->aru = 0;
Highest_seq = 0;
Highest_fifo_seq = 0;
My_aru = 0;
Aru = 0;
Set_aru = -1;
Last_discarded = 0;
Last_delivered = 0;
GlobalStatus.my_aru = My_aru;
Token_counter = 0;
}else{
for( i = Last_discarded+1; i <= Aru; i++ )
{
pack_entry = i & PACKET_MASK;
if( ! Packets[pack_entry].exist )
Alarm( EXIT, "Discard_packets: (NOT EVS) packet %d not exist\n",i);
/* should deliver packet or dispose the body if it was delivered already */
if( Packets[pack_entry].exist == 1 ) Deliver_packet( pack_entry, 0 );
else dispose( Packets[pack_entry].body );
/* dispose packet header in any case */
dispose( Packets[pack_entry].head );
Packets[pack_entry].exist = 0;
}
Alarm( PROTOCOL, "Discard_packets: packets %d-%d were discarded\n",
Last_discarded+1, Aru );
Last_discarded = Aru;
/* everything discarded has necessarily been delivered */
if( Last_delivered < Last_discarded ) Last_delivered = Last_discarded;
}
}
/*
 * Is_token_hold: returns 1 when the conditions under which the token is
 * held are met, 0 otherwise. All of the following must be true:
 *   - membership state is OP, or GATHER with the token still alive;
 *   - the retrans counter of Last_token is at most 1;
 *   - Aru equals Highest_seq;
 *   - Token_counter is greater than 1.
 */
static int Is_token_hold()
{
    int state_allows;

    state_allows = ( Memb_state() == OP ) ||
                   ( Memb_state() == GATHER && Memb_token_alive() );
    if( !state_allows ) return( 0 );

    if( Get_retrans(Last_token->type) > 1 ) return( 0 );
    if( Aru != Highest_seq ) return( 0 );
    if( Token_counter <= 1 ) return( 0 );

    return( 1 );
}
/*
 * To_hold_token: returns 1 when the token should be held rather than
 * forwarded, 0 otherwise. Holding requires an OP state (or GATHER with a
 * live token), a Last_token retrans counter of at most 1, Aru equal to
 * Highest_seq, and Token_counter above 1.
 */
static int To_hold_token()
{
    int quiet_ring;

    if( Memb_state() != OP )
    {
        /* the only other acceptable state is GATHER with the token alive */
        if( !( Memb_state() == GATHER && Memb_token_alive() ) ) return( 0 );
    }

    quiet_ring = Get_retrans(Last_token->type) <= 1 &&
                 Aru == Highest_seq &&
                 Token_counter > 1;

    return( quiet_ring ? 1 : 0 );
}
/*
 * Handle_hurry: react to a hurry request.
 *
 * pack_ptr  header of the hurry packet; only its proc_id is examined.
 *
 * If this daemon is the leader, is currently holding the token, and the
 * requester belongs to the active membership, the token is sent right away
 * through Prot_token_hurry(). Otherwise the request is ignored.
 */
static void Handle_hurry( packet_header *pack_ptr )
{
    if( Conf_leader( Memb_active_ptr() ) != My.id ) return;
    if( !Is_token_hold() ) return;

    /* ignore requests from processes outside the active membership */
    if( Conf_id_in_conf( Memb_active_ptr(), pack_ptr->proc_id ) == -1 ) return;

    Alarm( PROTOCOL, "Handle_hurry: sending token now\n");
    Prot_token_hurry();
}
/*
 * Prot_token_hurry: resend the last token (header only, with an empty
 * retransmission-request area).
 *
 * On the leader the retrans counter in Last_token->type is incremented,
 * the hurry timer is re-armed and the token_hurry statistic is bumped.
 * The token is sent once, once more when this daemon is the last of its
 * segment on a wide network, and twice more when the retrans counter
 * exceeds 1.
 */
void Prot_token_hurry()
{
    sys_scatter resend;
    int32 retrans_count;

    /* asked to send token again (almost lost): wrap Last_token, header only */
    resend.num_elements = 1;
    resend.elements[0].len = sizeof(token_header);
    resend.elements[0].buf = (char *)Last_token;
    Last_token->rtr_len=0;

    if( Conf_leader( Memb_active_ptr() ) == My.id )
    {
        retrans_count = Get_retrans(Last_token->type);
        retrans_count++;
        Last_token->type = Set_retrans( Last_token->type, retrans_count );
        E_queue( Prot_token_hurry, 0, NULL, Hurry_timeout );
        GlobalStatus.token_hurry++;
    }

    /* sending token */
    Net_send_token( &resend );

    /* sending again to another segment */
    if( Wide_network &&
        Conf_seg_last(Memb_active_ptr(), My.seg_index) == My.id )
    {
        Net_send_token( &resend );
    }

    /* problems: repeated retransmissions, so send two extra copies */
    if( Get_retrans( Last_token->type ) > 1 )
    {
        Net_send_token( &resend );
        Net_send_token( &resend );
    }

    Alarm( PROTOCOL, "Prot_token_hurry: retransmiting token %d %d\n",
           Get_arq(Last_token->type), Get_retrans(Last_token->type) );
}
/*
 * Flip_token_body: byte-swap, in place, every retransmission request
 * stored in a token body.
 *
 * buf        start of the token body.
 * token_ptr  token header; its rtr_len gives the number of bytes in buf
 *            occupied by requests.
 *
 * Each request is a ring_rtr record followed by num_seq int32 sequence
 * numbers. The routine must not be run twice on the same buffer: the
 * (flipped) num_seq field drives the walk over the sequence numbers.
 */
void Flip_token_body( char *buf, token_header *token_ptr )
{
    int offset;
    int k;

    if( token_ptr->rtr_len <= 0 ) return;

    offset = 0;
    while( offset < token_ptr->rtr_len )
    {
        ring_rtr *request = (ring_rtr *)&buf[offset];
        int32 *seq_ptr;

        /* fixed part of the request */
        request->memb_id.proc_id = Flip_int32( request->memb_id.proc_id );
        request->memb_id.time = Flip_int32( request->memb_id.time );
        request->proc_id = Flip_int32( request->proc_id );
        request->seg_index = Flip_int16( request->seg_index );
        request->num_seq = Flip_int16( request->num_seq );
        offset += sizeof(ring_rtr);

        /* variable part: num_seq requested sequence numbers */
        for( k=0; k < request->num_seq; k++ )
        {
            seq_ptr = (int32 *)&buf[offset];
            *seq_ptr = Flip_int32( *seq_ptr );
            offset += sizeof(int32);
        }
    }
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */