/*
* The Spread Toolkit.
*
* The contents of this file are subject to the Spread Open-Source
* License, Version 1.0 (the ``License''); you may not use
* this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.spread.org/license/
*
* or in the file ``license.txt'' found in this distribution.
*
* Software distributed under the License is distributed on an AS IS basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Creators of Spread are:
* Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
*
* Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
*
* All Rights Reserved.
*
* Major Contributor(s):
* ---------------
* Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
* Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
* Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
* John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
*
*/
#include <assert.h>
#include "arch.h"
#include "spread_params.h"
#include "network.h"
#include "net_types.h"
#include "data_link.h"
#include "sp_events.h"
#include "status.h"
#include "alarm.h"
#include "configuration.h"
/* for Memb_print_form_token() */
#include "membership.h"
static channel Bcast_channel[MAX_INTERFACES_PROC];
static channel Token_channel[MAX_INTERFACES_PROC];
static channel Send_channel;
static int Num_bcast_channels;
static int Num_token_channels;
static int Bcast_needed;
static int32 Bcast_address;
static int16 Bcast_port;
static int Num_send_needed;
static int32 Send_address[MAX_SEGMENTS];
static int16 Send_ports[MAX_SEGMENTS];
/* ### Pack: 3 lines */
/* Global in function so both Net_queue_bcast and Net_flush_bcast can access them */
static sys_scatter Queue_scat;
static int Queued_bytes = 0;
static const char align_padding[4] = "padd";
/* address for token sending - which is always needed */
static int32 Token_address;
static int16 Token_port;
static configuration Net_membership;
static int Segment_leader;
static configuration Cn;
static proc My;
static segment My_seg;
static int16 Partition[MAX_PROCS_RING];
static sp_time Partition_timeout = { 60, 0};
static int My_index;
static void Clear_partition(int dummy, void *dummy_p);
static int In_my_component( int32 proc_id );
static void Flip_pack( packet_header *pack_ptr );
static void Flip_token( token_header *token_ptr );
void Net_init()
{
proc dummy_proc;
int32u interface_addr;
int i;
bool bcast_bound = FALSE;
Cn = Conf();
My = Conf_my();
My_index = Conf_proc_by_id( My.id, &dummy_proc );
Clear_partition(0, NULL);
if( Cn.segments[My.seg_index].num_procs > 1 )
{
/* I am not allone in segment */
Bcast_needed = 1;
Bcast_address = Cn.segments[My.seg_index].bcast_address;
Bcast_port = My.port;
Alarm( NETWORK, "Net_init: Bcast needed to address (%d, %d)\n",
Bcast_address, Bcast_port );
}else{
Bcast_needed = 0;
Bcast_address = 0;
Alarm( NETWORK, "Net_init: Bcast is not needed\n" );
}
/* To receive broadcast (and possibly multicast) packets on a socket
* bound to a specific interface, we also have to bind to the broadcast
* address on the interface as well as the unicast interface. That is
* what the double bind of the bcast_address does.
*/
for ( i=0; i < My.num_if; i++)
{
if (Is_IfType_Daemon( My.ifc[i].type ) || Is_IfType_Any( My.ifc[i].type ) )
{
if (Is_IfType_Any( My.ifc[i].type ) )
interface_addr = 0;
else {
interface_addr = My.ifc[i].ip;
if (Bcast_needed && !bcast_bound) {
Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, Bcast_address );
bcast_bound = TRUE;
}
}
Bcast_channel[Num_bcast_channels++] = DL_init_channel( RECV_CHANNEL, My.port, Bcast_address, interface_addr );
Token_channel[Num_token_channels++] = DL_init_channel( RECV_CHANNEL, My.port+1, 0, interface_addr );
}
}
Send_channel = DL_init_channel( SEND_CHANNEL, My.port+2, 0, My.id );
Num_send_needed = 0;
}
void Net_set_membership( configuration memb )
{
int i;
int my_index_in_seg;
int my_next_index;
Net_membership = memb;
My_seg = Net_membership.segments[My.seg_index];
my_index_in_seg = Conf_id_in_seg( &My_seg, My.id );
if( my_index_in_seg < 0 )
Alarm( EXIT,"Net_set_membership: I am not in membership %c",
Conf_print( &Net_membership ) );
else if( my_index_in_seg == 0) {
Segment_leader = 1;
Alarm( NETWORK,"Net_set_membership: I am a Segment leader\n");
} else Segment_leader = 0;
Num_send_needed = 0;
my_next_index = -1;
for( i=0; i < Conf().num_segments; i++ )
{
if( i == My.seg_index )
{
/*
* This is my segment.
* There is no need to send to it
* but I need my_next_index to calculate
* the token send address.
*/
my_next_index = Num_send_needed;
} else if( Net_membership.segments[i].num_procs > 0 ) {
Send_address[Num_send_needed] = Net_membership.segments[i].procs[0]->id;
Send_ports [Num_send_needed] = Net_membership.segments[i].port;
Num_send_needed++;
}
}
assert(my_next_index != -1);
for( i=0; i < Num_send_needed; i++ )
Alarm( NETWORK,
"Net_set_membership: Send_addr[%d] is (%d,%d)\n",
i, Send_address[i], Send_ports[i] );
/* Calculate where to send the token */
Token_address = 0;
if( my_index_in_seg < My_seg.num_procs-1 )
{
Token_address = My_seg.procs[my_index_in_seg+1]->id;
Token_port = My_seg.port+1;
}else{
/* I am last in my segment */
if( Num_send_needed == 0 )
{
/*
* My segment is the only segment
* sending token to the first in my segment
*/
Token_address = My_seg.procs[0]->id;
Token_port = My_seg.port+1;
} else if( Num_send_needed == my_next_index ) {
/*
* My segment is the last segment
* sending token to the first in first valid segment
*/
Token_address = Send_address[0];
Token_port = Send_ports[0]+1;
} else {
/*
* There is a valid segment after mine
* sending token to the first in next valid segment
*/
Token_address = Send_address[my_next_index];
Token_port = Send_ports[my_next_index]+1;
}
}
Alarm( NETWORK, "Net_set_membership: Token_address : (%d, %d)\n",
Token_address, Token_port );
}
int Net_bcast( sys_scatter *scat )
{
packet_header *pack_ptr;
int i;
int ret;
ret = 0;
/* routing on channels if needed according to membership */
pack_ptr = (packet_header *)scat->elements[0].buf;
pack_ptr->type = Set_routed( pack_ptr->type );
pack_ptr->type = Set_endian( pack_ptr->type );
pack_ptr->transmiter_id = My.id;
for ( i=0; i< Num_send_needed; i++ )
{
ret = DL_send( Send_channel, Send_address[i], Send_ports[i], scat );
}
pack_ptr->type = Clear_routed( pack_ptr->type );
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
}
if( !Bcast_needed && (Num_send_needed == 0) )
ret = 1; /* No actual send is needed, but 'packet' can be considered 'sent' */
return( ret );
}
/* ### Pack: 2 routines */
int Net_queue_bcast( sys_scatter *scat )
{
packet_header *pack_ptr;
int new_bytes;
int i, j;
int ret;
int align_bytes, align_num_scatter;
/* This line is redundent because of static initialization to 0 */
if ( Queued_bytes == 0 ) Queue_scat.num_elements = 0;
ret = 0;
new_bytes = 0;
for ( i=0; i < scat->num_elements; i++) {
new_bytes += scat->elements[i].len;
}
/* Fix alignment of packed messages so they will each begin on a 4 byte alignment
* This is needed for Sparc, might need enhancement if other archs
* have more extensive alignement rules
*/
align_bytes = 0;
align_num_scatter = 0;
switch(Queued_bytes % 4) {
case 1:
align_bytes++;
case 2:
align_bytes++;
case 3:
align_bytes++;
align_num_scatter = 1;
case 0:
/* nothing since already aligned */
break;
}
if ( ( (Queued_bytes + new_bytes + align_bytes) > MAX_PACKET_SIZE ) ||
( (Queue_scat.num_elements + scat->num_elements + align_num_scatter) > ARCH_SCATTER_SIZE ) )
{
ret = Net_flush_bcast();
align_bytes = 0;
align_num_scatter = 0;
}
if ( Queued_bytes == 0 ) {
/* routing on channels if needed according to membership */
pack_ptr = (packet_header *)scat->elements[0].buf;
pack_ptr->type = Set_routed( pack_ptr->type );
pack_ptr->type = Set_endian( pack_ptr->type );
pack_ptr->transmiter_id = My.id;
}
if ( align_bytes > 0 )
{
Queue_scat.elements[Queue_scat.num_elements].len = align_bytes;
Queue_scat.elements[Queue_scat.num_elements].buf = (char *)align_padding;
Queued_bytes += align_bytes;
Queue_scat.num_elements += 1;
Alarm(NETWORK, "Net_queue_bcast: Inserted padding of %d bytes to message of size %d\n", align_bytes, new_bytes );
}
/* Add new packet to Queue_scat to be sent as packed packet */
for ( i=0, j=Queue_scat.num_elements; i < scat->num_elements; i++, j++) {
Queue_scat.elements[j].len = scat->elements[i].len;
Queue_scat.elements[j].buf = scat->elements[i].buf;
}
Queued_bytes += new_bytes;
Queue_scat.num_elements += scat->num_elements ;
return( ret );
}
int Net_flush_bcast(void)
{
packet_header *pack_ptr;
int i;
int ret;
if (Queued_bytes == 0 ) return( 0 );
Alarm(NETWORK, "Net_flush_bcast: Flushing with Queued_bytes = %d; num_elements in scat = %d; size of scat0,1 = %d %d\n", Queued_bytes, Queue_scat.num_elements, Queue_scat.elements[0].len, Queue_scat.elements[1].len);
ret = 0;
for ( i=0; i< Num_send_needed; i++ )
{
ret = DL_send( Send_channel, Send_address[i], Send_ports[i], &Queue_scat );
}
pack_ptr = (packet_header *)Queue_scat.elements[0].buf;
pack_ptr->type = Clear_routed( pack_ptr->type );
/* broadcasting if needed according to configuration */
if( Bcast_needed )
{
ret = DL_send( Send_channel, Bcast_address, Bcast_port, &Queue_scat );
}
if( !Bcast_needed && (Num_send_needed == 0) )
ret = 1; /* No actual send is needed, but 'packet' can be considered 'sent' */
Queue_scat.num_elements = 0;
Queued_bytes = 0;
return( ret );
}
int Net_scast( int16 seg_index, sys_scatter *scat )
{
packet_header *pack_ptr;
int ret;
bool send_not_needed_p = FALSE;
ret = 0;
pack_ptr = (packet_header *)scat->elements[0].buf;
pack_ptr->type = Set_endian( pack_ptr->type );
pack_ptr->transmiter_id = My.id;
if( seg_index == My.seg_index )
{
if( Bcast_needed )
{
ret = DL_send( Send_channel, Bcast_address, Bcast_port, scat );
} else
send_not_needed_p = TRUE;
}else{
if( Net_membership.segments[seg_index].num_procs > 0 )
{
pack_ptr->type = Set_routed( pack_ptr->type );
ret = DL_send( Send_channel,
Net_membership.segments[seg_index].procs[0]->id,
Net_membership.segments[seg_index].port,
scat );
pack_ptr->type = Clear_routed( pack_ptr->type );
} else
send_not_needed_p = TRUE;
}
if (send_not_needed_p)
ret = 1; /* notify that packet can be considered sent, even though no network send actually needed */
return( ret );
}
int Net_ucast( int32 proc_id, sys_scatter *scat )
{
packet_header *pack_ptr;
proc p;
int ret;
pack_ptr = (packet_header *)scat->elements[0].buf;
pack_ptr->type = Set_endian( pack_ptr->type );
pack_ptr->transmiter_id = My.id;
ret = Conf_proc_by_id( proc_id, &p );
if( ret < 0 )
{
Alarm( PRINT, "Net_ucast: non existing proc_id %d\n",proc_id );
return( ret );
}
ret = DL_send( Send_channel, proc_id, p.port, scat );
return( ret );
}
int Net_recv ( channel fd, sys_scatter *scat )
{
static scatter save;
packet_header *pack_ptr;
int bytes_left;
int received_bytes, body_offset;
int processed_bytes;
int pack_same_endian;
int i;
bool ch_found;
pack_ptr = (packet_header *)scat->elements[0].buf;
ch_found = FALSE;
for (i = 0 ; i < Num_bcast_channels; i++) {
if ( fd == Bcast_channel[i]) {
ch_found = TRUE;
break;
}
}
if (ch_found == FALSE) {
Alarm(EXIT, "Net_recv: Listening and received packet on un-used interface %d\n", fd);
}
received_bytes = DL_recv( fd, scat );
if( received_bytes <= 0 ) return( received_bytes );
if( received_bytes < sizeof( packet_header ) )
{
Alarm(PRINT, "Net_recv: ignoring packet of size %d, smaller than packet header size %d\n",
received_bytes, sizeof(packet_header) );
return( -1 );
}
/* Fliping packet header to my form if needed */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
if( Is_partition( pack_ptr->type ) )
{
/* Monitor : updating partition */
int16 *cur_partition;
if ( ! Conf_get_dangerous_monitor_state() ) {
Alarm( PRINT, "Net_recv: Request to set partition or kill daemons from (%d.%d.%d.%d) denied. Monitor in safe mode\n", IP1(pack_ptr->proc_id), IP2(pack_ptr->proc_id), IP3(pack_ptr->proc_id), IP4(pack_ptr->proc_id) );
return( 0 );
}
if( ! ( pack_ptr->memb_id.proc_id == 15051963 && Conf_id_in_conf( &Cn, pack_ptr->proc_id ) != -1 ) ) return( 0 );
cur_partition = (int16 *)scat->elements[1].buf;
for( i=0; i < Conf_num_procs( &Cn ); i++ )
if( Same_endian( pack_ptr->type ) ) Partition[i] = cur_partition[i];
else Partition[i] = Flip_int16( cur_partition[i] );
E_queue( Clear_partition, 0, NULL, Partition_timeout );
if( Partition[My_index] < 0 )
Alarm( EXIT, "Net_recv: Instructed to exit by monitor\n");
Alarm( PRINT , "Net_recv: Got monitor message, component %d\n",
Partition[My_index] );
return( 0 );
}
/* Monitor : droping packet from other monitor components */
if( ! ( pack_ptr->memb_id.proc_id == 15051963 || In_my_component( pack_ptr->transmiter_id ) ) )
return( 0 );
/* no need to return my own packets */
if( pack_ptr->transmiter_id == My.id )
return( 0 );
if( Bcast_needed && Is_routed( pack_ptr->type ) )
{
if( !Segment_leader ) Alarm( NETWORK,
"Net_recv: recv routed message from %d but not seg leader\n",
pack_ptr->proc_id);
/* saving scat lens for another DL_recv */
save.num_elements = scat->num_elements;
for( i=0; i < save.num_elements; i++ )
save.elements[i].len = scat->elements[i].len;
/* computing true scat lens for sending */
bytes_left = received_bytes;
i = 0;
while ( bytes_left > 0 )
{
if( bytes_left < scat->elements[i].len )
scat->elements[i].len = bytes_left;
bytes_left -= scat->elements[i].len;
i ++;
}
scat->num_elements = i;
pack_ptr->type = Clear_routed ( pack_ptr->type );
pack_ptr->transmiter_id = My.id;
/* fliping to original form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
DL_send( Send_channel, Bcast_address, Bcast_port, scat );
/* re-fliping to my form */
if( !Same_endian( pack_ptr->type ) ) Flip_pack( pack_ptr );
/* restoring scat lens for another DL_recv */
scat->num_elements = save.num_elements;
for( i=0; i < save.num_elements; i++ )
scat->elements[i].len = save.elements[i].len;
}
/*
* we clear routed anyway in order not to ask if Bcast_needed again.
* This way, if bcast is not needed we give it to the upper layer
* right away. It will always get to the upper layer with this
* bit cleared.
*/
pack_ptr->type = Clear_routed ( pack_ptr->type );
/*
* Check validity of packet size and flip every packet header
* other than first header (which is already flipped).
* If packet size is not valid, return -1, otherwise
* return size of received packet.
*/
processed_bytes = sizeof( packet_header ) + pack_ptr->data_len;
pack_same_endian = Same_endian( pack_ptr->type );
/* ignore any alignment padding */
if ( processed_bytes < received_bytes ) {
switch(processed_bytes % 4)
{
case 1:
processed_bytes++;
case 2:
processed_bytes++;
case 3:
processed_bytes++;
case 0:
/* already aligned */
break;
}
}
while( processed_bytes < received_bytes )
{
body_offset = processed_bytes - sizeof(packet_header);
pack_ptr = (packet_header *)&scat->elements[1].buf[body_offset];
/* flip contigues packet header */
if( !pack_same_endian ) {
Flip_pack( pack_ptr );
}
processed_bytes += sizeof( packet_header ) + pack_ptr->data_len;
/* ignore any alignment padding */
if ( processed_bytes < received_bytes ) {
switch(processed_bytes % 4)
{
case 1:
processed_bytes++;
case 2:
processed_bytes++;
case 3:
processed_bytes++;
case 0:
/* already aligned */
break;
}
}
}
Alarm( NETWORK, "Net_recv: Received Packet - packet length(%d), packed message length(%d)\n", received_bytes, processed_bytes);
if( processed_bytes != received_bytes ) {
Alarm( PRINT, "Net_recv: Received Packet - packet length(%d) != packed message length(%d)\n", received_bytes, processed_bytes);
return( -1 );
}
return( received_bytes );
}
int Net_send_token( sys_scatter *scat )
{
token_header *token_ptr;
int ret;
token_ptr = (token_header *)scat->elements[0].buf;
token_ptr->type = Set_endian( token_ptr->type );
token_ptr->transmiter_id = My.id;
if ( token_ptr->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
{
if ( Is_form( token_ptr->type ) )
Memb_print_form_token( scat );
Alarmp( SPLOG_FATAL, PRINT, "Net_send_token: Token too long for packet!\n");
}
ret = DL_send( Send_channel, Token_address, Token_port, scat );
return ( ret );
}
int Net_recv_token( channel fd, sys_scatter *scat )
{
token_header *token_ptr;
int ret, i;
bool ch_found;
token_ptr = (token_header *)scat->elements[0].buf;
ch_found = FALSE;
for (i = 0 ; i < Num_token_channels; i++) {
if ( fd == Token_channel[i]) {
ch_found = TRUE;
break;
}
}
if (ch_found == FALSE) {
Alarm(EXIT, "Net_recv: Listening and received packet on un-used interface %d\n", fd);
}
ret = DL_recv( fd, scat );
if( ret <= 0 ) return( ret );
/* Fliping token header to my form if needed */
if( !Same_endian( token_ptr->type ) ) Flip_token( token_ptr );
/* Monitor : droping token from other monitor components */
if( !In_my_component( token_ptr->transmiter_id ) )
return( 0 );
return ( ret );
}
int Net_ucast_token( int32 proc_id, sys_scatter *scat )
{
token_header *token_ptr;
proc p;
int ret;
token_ptr = (token_header *)scat->elements[0].buf;
token_ptr->type = Set_endian( token_ptr->type );
token_ptr->transmiter_id = My.id;
ret = Conf_proc_by_id( proc_id, &p );
if( ret < 0 )
{
Alarm( PRINT, "Net_ucast_token: non existing proc_id %d\n",
proc_id );
return( ret );
}
if ( token_ptr->rtr_len > (MAX_PACKET_SIZE - sizeof(token_header) ) )
{
Memb_print_form_token( scat );
Alarmp( SPLOG_FATAL, PRINT, "Net_ucast_token: Token too long for packet!\n");
}
ret = DL_send( Send_channel, proc_id, p.port+1, scat );
return( ret );
}
void Net_num_channels(int *num_bcast, int *num_token)
{
*num_bcast = Num_bcast_channels;
*num_token = Num_token_channels;
}
channel *Net_bcast_channel()
{
return( &(Bcast_channel[0]) );
}
channel *Net_token_channel()
{
return( &(Token_channel[0]) );
}
static void Clear_partition(int dummy, void *dummy_p)
{
int i;
for( i=0; i < Conf_num_procs( &Cn ); i++ )
Partition[i] = 0;
}
static int In_my_component( int32 proc_id )
{
int proc_index;
proc dummy_proc;
char ip[16];
proc_index = Conf_proc_by_id( proc_id, &dummy_proc );
if( proc_index < 0 )
{
Conf_id_to_str( proc_id, ip );
Alarm( PRINT, "In_my_component: unknown proc %s\n", ip );
return( 0 );
}
return( Partition[My_index] == Partition[proc_index] );
}
void Flip_pack( packet_header *pack_ptr )
{
pack_ptr->type = Flip_int32( pack_ptr->type );
pack_ptr->transmiter_id = Flip_int32( pack_ptr->transmiter_id );
pack_ptr->proc_id = Flip_int32( pack_ptr->proc_id );
pack_ptr->memb_id.proc_id = Flip_int32( pack_ptr->memb_id.proc_id );
pack_ptr->memb_id.time = Flip_int32( pack_ptr->memb_id.time );
pack_ptr->seq = Flip_int32( pack_ptr->seq );
pack_ptr->fifo_seq = Flip_int32( pack_ptr->fifo_seq );
pack_ptr->packet_index = Flip_int16( pack_ptr->packet_index );
pack_ptr->data_len = Flip_int16( pack_ptr->data_len );
}
void Flip_token( token_header *token_ptr )
{
token_ptr->type = Flip_int32( token_ptr->type );
token_ptr->transmiter_id = Flip_int32( token_ptr->transmiter_id );
token_ptr->seq = Flip_int32( token_ptr->seq );
token_ptr->proc_id = Flip_int32( token_ptr->proc_id );
token_ptr->aru = Flip_int32( token_ptr->aru );
token_ptr->aru_last_id = Flip_int32( token_ptr->aru_last_id );
token_ptr->flow_control = Flip_int16( token_ptr->flow_control );
token_ptr->rtr_len = Flip_int16( token_ptr->rtr_len );
}
syntax highlighted by Code2HTML, v. 0.9.1