/*
* The Spread Toolkit.
*
* The contents of this file are subject to the Spread Open-Source
* License, Version 1.0 (the ``License''); you may not use
* this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.spread.org/license/
*
* or in the file ``license.txt'' found in this distribution.
*
* Software distributed under the License is distributed on an AS IS basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Creators of Spread are:
* Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
*
* Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
*
* All Rights Reserved.
*
* Major Contributor(s):
* ---------------
* Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
* Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
* Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
* John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
*
*/
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "message.h"
#include "memory.h"
#include "net_types.h"
#include "objects.h"
#include "prot_objs.h"
#include "alarm.h"
#include "session.h"
#include "spread_params.h"
static char Temp_buf[100000];
int32 Obj_Inc_Refcount(void *obj)
{
return(0);
}
void Message_populate_with_buffers(message_obj *msg)
{
int i;
msg->num_elements = MAX_SCATTER_ELEMENTS;
for( i=0; i < msg->num_elements; i++ )
{
msg->elements[i].len = sizeof(packet_body);
msg->elements[i].buf = (char *) new(PACKET_BODY);
if (msg->elements[i].buf == NULL) {
Alarm(EXIT, "Message_populate_with_buffers: Failed to allocate a new PACKET_BODY object\n");
return;
}
}
}
message_header *Message_get_message_header(message_obj *msg)
{
return( (message_header *)msg->elements[0].buf);
}
int Message_get_header_size()
{
return(sizeof(message_header) );
}
int Message_get_non_body_header_size()
{
return( 0 );
}
int Message_get_data_fragment_len()
{
return(sizeof(packet_body) );
}
scatter *Message_get_data_scatter(message_obj *msg)
{
return((scatter *)msg);
}
/* This is what sets which message type a message shold be sent as */
int Message_get_packet_type(int mess_type)
{
if( Is_unreliable_mess( mess_type ) ) return(RELIABLE_TYPE);
if( Is_reliable_mess( mess_type ) ) return(RELIABLE_TYPE);
if( Is_fifo_mess( mess_type ) ) return(FIFO_TYPE);
if( Is_causal_mess( mess_type ) ||
Is_agreed_mess( mess_type ) ) return(AGREED_TYPE);
if( Is_safe_mess( mess_type) ) return(SAFE_TYPE);
if( Is_join_mess( mess_type) ) return(AGREED_TYPE);
if( Is_leave_mess( mess_type) ) return(AGREED_TYPE);
if( Is_kill_mess( mess_type) ) return(AGREED_TYPE);
return(-1);
}
char *Message_get_first_data_ptr(message_obj *msg)
{
message_header *head_ptr;
head_ptr = Message_get_message_header(msg);
if (Is_reject_mess(head_ptr->type))
{
return(&(msg->elements[1].buf[0]) );
} else {
return(&(msg->elements[0].buf[sizeof(message_header)]) );
}
}
int Message_get_first_data_len(message_obj *msg)
{
message_header *head_ptr;
head_ptr = Message_get_message_header(msg);
if (Is_reject_mess(head_ptr->type))
{
/* the scat->element[0] has NOTHING in it for data here.
* just return 0 and the actual body will be sent by the
* main loop from elemnt 1..n
*/
return(0);
} else {
return(msg->elements[0].len - sizeof(message_header) );
}
}
int Message_get_data_header_size(void)
{
return( sizeof(message_header) );
}
char *Message_get_first_group(message_obj *msg)
{
message_header *head_ptr;
head_ptr = Message_get_message_header(msg);
if (Is_reject_mess(head_ptr->type))
{
return( &(msg->elements[1].buf[4]) );
} else {
return( &(msg->elements[0].buf[sizeof(message_header)]) );
}
}
char *Message_get_groups_array(message_obj *msg)
{
int groups_bytes, num_bytes, first_scat_index, groups_start_location;
int i;
message_header *head_ptr;
scatter *scat;
head_ptr = Message_get_message_header(msg);
scat = Message_get_data_scatter(msg);
if (Is_reject_mess(head_ptr->type))
{
groups_bytes = 4 + (head_ptr->num_groups * MAX_GROUP_NAME);
groups_start_location = 4;
first_scat_index = 1;
} else {
groups_bytes = sizeof(message_header) + (head_ptr->num_groups * MAX_GROUP_NAME);
groups_start_location = sizeof(message_header);
first_scat_index = 0;
}
/* if groups array didn't fit in first scat element, then copy it into Temp_buf */
if( groups_bytes > scat->elements[first_scat_index].len )
{
num_bytes = 0;
for( i=first_scat_index; i < scat->num_elements && num_bytes < groups_bytes; i++ )
{
int copy_bytes;
if (groups_bytes - num_bytes < scat->elements[i].len)
copy_bytes = groups_bytes - num_bytes;
else
copy_bytes = scat->elements[i].len;
memcpy( &Temp_buf[num_bytes], scat->elements[i].buf, copy_bytes );
num_bytes += copy_bytes;
}
assert(num_bytes == groups_bytes);
return(&Temp_buf[0]);
} else
return(&(scat->elements[first_scat_index].buf[groups_start_location]));
}
/* STORE old_type at beginning of data before groups list
* so Sess_write() can write it for reject messages */
/* FIXME: This is totally broken in Spread 4 */
void Message_add_oldtype_to_reject( message_obj *msg, int32u old_type )
{
int i;
int old_byte_location;
int new_byte_location;
scatter *scat;
message_header *head_ptr;
head_ptr = Message_get_message_header(msg);
scat = Message_get_data_scatter(msg);
for (i=scat->num_elements; i >1; i--)
{
scat->elements[i].len = scat->elements[i-1].len;
scat->elements[i].buf = scat->elements[i-1].buf;
}
scat->elements[1].buf = (char *)new( PACKET_BODY );
if (scat->elements[1].buf == NULL) {
Alarm(EXIT, "Sess_create_reject_mess: Failed to allocate a new PACKET_BODY object\n");
return;
}
scat->num_elements++;
old_byte_location = sizeof(message_header);
new_byte_location = 0;
memcpy(&scat->elements[1].buf[new_byte_location], &old_type, 4);
new_byte_location += 4;
memcpy(&scat->elements[1].buf[new_byte_location], &scat->elements[0].buf[old_byte_location], head_ptr->num_groups * MAX_GROUP_NAME );
old_byte_location += (head_ptr->num_groups * MAX_GROUP_NAME);
if ( head_ptr->num_groups < MAX_GROUPS_PER_MESSAGE )
{
new_byte_location += (head_ptr->num_groups * MAX_GROUP_NAME);
head_ptr->num_groups = head_ptr->num_groups + 1;
} else
{
/* overwrite last group name to make sure we have the rejected sender in list */
new_byte_location += ( (head_ptr->num_groups - 1) * MAX_GROUP_NAME);
}
/* Add sender to groups list */
memcpy(&scat->elements[1].buf[new_byte_location], head_ptr->private_group_name, MAX_GROUP_NAME);
new_byte_location += MAX_GROUP_NAME;
/* now copy the rest of the message body from first old scatter to new scatter 1 */
memcpy(&scat->elements[1].buf[new_byte_location], &scat->elements[0].buf[old_byte_location], scat->elements[0].len - old_byte_location);
new_byte_location += (scat->elements[0].len - old_byte_location);
scat->elements[1].len = new_byte_location;
scat->elements[0].len = sizeof(message_header);
return;
}
void Message_Dec_Refcount(message_obj *msg)
{
/* Not needed for Spread3 */
return;
}
void Message_dispose_message(message_obj *msg)
{
packet_body *body_ptr;
int i;
for( i=0; i < msg->num_elements; i++ )
{
body_ptr = (packet_body *)msg->elements[i].buf;
dispose( body_ptr );
}
dispose( msg );
}
message_obj *Message_dup_and_reset_old_message(message_obj *msg, int len)
{
message_obj *mess_dup;
int i,remain;
mess_dup = new( SCATTER );
if (mess_dup == NULL) {
Alarm(EXIT, "Message_dup_message: Failed to allocate a new SCATTER object\n");
return(NULL);
}
for( i=0, mess_dup->num_elements=0, remain=len ; remain > 0 ; i++, remain -= sizeof(packet_body) )
{
mess_dup->elements[i].buf = msg->elements[i].buf;
if( remain > sizeof(packet_body) )
{
mess_dup->elements[i].len = sizeof( packet_body );
}else{
mess_dup->elements[i].len = remain;
}
mess_dup->num_elements++;
msg->elements[i].buf = (char *)new( PACKET_BODY );
if (msg->elements[i].buf == NULL) {
Alarm(EXIT, "Message_dup_message: Failed to allocate a new PACKET_BODY object\n");
return(NULL);
}
msg->elements[i].len = sizeof( packet_body );
}
return(mess_dup);
}
message_obj *Message_copy_message(message_obj *msg)
{
int i;
message_obj *tmp_scat;
tmp_scat = new( SCATTER );
tmp_scat->num_elements = msg->num_elements;
for( i=0; i < msg->num_elements; i++ )
{
tmp_scat->elements[i].len = msg->elements[i].len;
tmp_scat->elements[i].buf = (char *)new( PACKET_BODY );
if (tmp_scat == NULL) {
Alarm(EXIT, "Message_copy_message: Failed to allocate a new PACKET_BODY object\n");
return(NULL);
}
memcpy( tmp_scat->elements[i].buf, msg->elements[i].buf, msg->elements[i].len );
}
return(tmp_scat);
}
/* At the beginning of this function msg should be a scatter of one element. The beginning
* of that buffer should contain an valid message_header structure. The buf and num_bytes fields
* passed into this function should be everything for the message that goes AFTER the message_header.
* Thus the first element is handled specially.
*/
void Message_Buffer_to_Message_Fragments( message_obj *msg, char buf[], int num_bytes )
{
scatter *scat;
int bytes_to_copy, copied_bytes;
int i, head_size;
scat = Message_get_data_scatter(msg);
head_size = Message_get_data_header_size();
scat->num_elements = (num_bytes + head_size -1)/sizeof( packet_body ) + 1;
copied_bytes = 0;
for( i=0; i < scat->num_elements ; i++ )
{
if ( 0 == i ) {
bytes_to_copy = num_bytes;
if( bytes_to_copy > ( sizeof( packet_body ) - head_size ) )
bytes_to_copy = sizeof( packet_body ) - head_size ;
memcpy( &scat->elements[i].buf[head_size], &buf[ 0 ], bytes_to_copy );
scat->elements[i].len += bytes_to_copy;
copied_bytes += bytes_to_copy;
} else {
bytes_to_copy = num_bytes - copied_bytes;
if( bytes_to_copy > sizeof( packet_body ) )
bytes_to_copy = sizeof( packet_body );
scat->elements[i].buf = (char *)new( PACKET_BODY );
memcpy( scat->elements[i].buf, &buf[ copied_bytes ], bytes_to_copy );
scat->elements[i].len = bytes_to_copy;
copied_bytes += bytes_to_copy;
}
}
return;
}
void Message_element_len_fixup(message_obj *msg)
{
int total_size, sum_size, last;
message_header *head_p;
head_p = Message_get_message_header(msg);
total_size = head_p->data_len + MAX_GROUP_NAME * head_p->num_groups + sizeof(message_header);
sum_size = sizeof(packet_body) * (msg->num_elements - 1);
last = total_size / sizeof(packet_body);
if (total_size < sizeof(packet_body))
{
assert(last == 0);
msg->elements[last].len = total_size;
}
else
{
msg->elements[0].len = sizeof(packet_body);
msg->elements[last].len = total_size - sum_size;
}
return;
}
int Message_kill_mess_fixup(message_obj *msg, int orig_len, int mbox)
{
/* Don't need to do anything for spread3 */
return(orig_len);
}
void Message_reset_current_location(struct partial_message_info *cur_msg)
{
cur_msg->cur_element = 0;
cur_msg->cur_byte = 0;
cur_msg->in_mess_head = 0;
cur_msg->total_bytes = 0;
}
void Message_set_location_begin_body(struct partial_message_info *cur_msg)
{
/* do nothing really for spread3 */
cur_msg->in_mess_head = 0;
}
void Message_add_scat_element(message_obj *msg)
{
int cur_num_elements;
assert (msg->num_elements < MAX_SCATTER_ELEMENTS);
cur_num_elements = msg->num_elements;
msg->elements[cur_num_elements].buf = (char *) new(PACKET_BODY);
if (msg->elements[cur_num_elements].buf == NULL) {
Alarm(EXIT, "Message_add_scat_element: Failed to allocate a new PACKET_BODY\n");
return;
}
msg->elements[cur_num_elements].len = sizeof(packet_body);
msg->num_elements++;
}
void Message_remove_scat_element(message_obj *msg)
{
assert ( msg->num_elements > 0 );
dispose( msg->elements[msg->num_elements - 1].buf );
msg->elements[msg->num_elements - 1].len = 0;
msg->num_elements--;
}
void Message_calculate_current_location(message_obj *msg, int len_sent, struct partial_message_info *cur_msg)
{
int i, len;
len = len_sent;
for( i=0; i < msg->num_elements; i++ )
{
if( len < msg->elements[i].len ) break;
len -= msg->elements[i].len;
}
cur_msg->cur_element = i;
cur_msg->cur_byte = len;
cur_msg->in_mess_head = 0;
}
message_obj *Message_new_message(void)
{
message_obj *new_mess;
new_mess = new( SCATTER );
if (new_mess == NULL) {
Alarm(EXIT, "Message_new_message: Failed to allocate a new SCATTER\n");
return(NULL);
}
new_mess->num_elements = 1;
new_mess->elements[0].len = sizeof(message_header);
new_mess->elements[0].buf = (char *) new(PACKET_BODY);
if (new_mess->elements[0].buf == NULL) {
Alarm(EXIT, "Message_new_message: Failed to allocate a new PACKET_BODY\n");
dispose(new_mess);
return(NULL);
}
return(new_mess);
}
message_obj *Message_create_message(int mess_type, char *sender_name)
{
message_obj *new_mess;
message_header *new_head;
new_mess = Message_new_message();
new_head = (message_header *)new_mess->elements[0].buf;
new_head->type = mess_type;
/* Setting endian to my endian on the header */
new_head->type = Set_endian( new_head->type );
new_head->hint = Set_endian( 0 );
strncpy( new_head->private_group_name, sender_name, MAX_GROUP_NAME);
new_head->num_groups = 0;
new_head->data_len = 0;
return(new_mess);
}
void Message_endian_correct_message_header(message_obj *msg)
{
message_header *head_ptr;
head_ptr = (message_header *) msg->elements[0].buf;
/* Fliping message header to my form if needed */
if( !Same_endian( head_ptr->type ) )
{
Flip_mess( head_ptr );
}
}
syntax highlighted by Code2HTML, v. 0.9.1