/*
* The Spread Toolkit.
*
* The contents of this file are subject to the Spread Open-Source
* License, Version 1.0 (the ``License''); you may not use
* this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.spread.org/license/
*
* or in the file ``license.txt'' found in this distribution.
*
* Software distributed under the License is distributed on an AS IS basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Creators of Spread are:
* Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
*
* Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
*
* All Rights Reserved.
*
* Major Contributor(s):
* ---------------
* Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
* Theo Schlossnagle jesus@omniti.com - Perl, skiplists, autoconf.
* Dan Schoenblum dansch@cnds.jhu.edu - Java interface.
* John Schultz jschultz@cnds.jhu.edu - contribution to process group membership.
*
*/
#include "arch.h"
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#ifndef ARCH_PC_WIN95
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/un.h>
#include <signal.h>
#include <sys/ioctl.h>
#else /* ARCH_PC_WIN95 */
#include <winsock.h>
#define ioctl ioctlsocket
#endif /* ARCH_PC_WIN95 */
#include "spread_params.h"
#include "net_types.h"
#include "sp_events.h"
#include "objects.h"
#include "memory.h"
#include "session.h"
#include "sess_types.h"
#define ext_sess_body
#include "sess_body.h"
#undef ext_sess_body
#include "groups.h"
#include "log.h"
#include "status.h"
#include "alarm.h"
#include "objects.h"
#include "prot_body.h"
#if ( SPREAD_PROTOCOL > 3 )
#include "queues.h"
#endif
#include "message.h"
#include "acm.h"
static sp_time Badger_timeout = { 0, 100000 };
static message_obj New_mess;
static int Accept_inet_mbox_num;
static mailbox Accept_inet_mbox[MAX_INTERFACES_PROC];
static mailbox Accept_unix_mbox;
static int Protocol_threshold;
#define SESSION_FD_HASH_SIZE 256
static session *Sessions_hash_head[SESSION_FD_HASH_SIZE];
static session *Sessions_head;
static session *Sessions_tail;
static session *Sessions_free;
static void Sess_attach_accept(void);
static void Sess_detach_accept(void);
static void Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p);
static void Sess_accept( mailbox mbox, int domain, void *dummy );
static void Sess_accept_continue( mailbox, int, void * );
static void Sess_read( mailbox mbox, int domain, void *dummy );
static void Sess_badger( mailbox mbox, void *dummy );
static void Sess_kill( mailbox mbox );
static void Sess_handle_join( message_link *mess_link );
static void Sess_handle_leave( message_link *mess_link );
static void Sess_handle_kill( message_link *mess_link );
static void Sess_deliver_reject( message_obj *msg );
static void Sess_create_reject_message ( message_obj *msg );
static int Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] );
#define ACTIVATE_PORT_REUSE(mbox) do { \
int on = 1; \
if (setsockopt(mbox, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on)) < 0) \
Alarm( EXIT, "Sess_init: Error setting SO_REUSEADDR socket option\n" ); \
} while (0)
int Sess_get_session_index (int mbox)
{
session *tmp;
unsigned char *c = (unsigned char *) &mbox;
unsigned int i;
i = c[0] ^ c[1] ^ c[2] ^ c[3];
Alarm( NONE, "Sess_get_session_index: mbox %d hashed to %u\n", mbox, i);
for (tmp = Sessions_hash_head[i]; tmp; tmp = tmp->hash_next)
if (tmp->mbox == mbox)
return (tmp - Sessions);
return -1;
}
static void Sess_hash_session (session *ses)
{
unsigned int i;
unsigned char *c;
c = (unsigned char *) &ses->mbox;
i = c[0] ^ c[1] ^ c[2] ^ c[3];
ses->hash_next = Sessions_hash_head[i];
Sessions_hash_head[i] = ses;
}
static void Sess_unhash_session (session *ses)
{
unsigned int i;
unsigned char *c;
session *tmp;
c = (char *) &ses->mbox;
i = c[0] ^ c[1] ^ c[2] ^ c[3];
tmp = Sessions_hash_head[i];
if (tmp == ses)
{
Sessions_hash_head[i] = ses->hash_next;
ses->hash_next = NULL;
return;
}
for ( ; tmp->hash_next != ses; tmp = tmp->hash_next);
tmp->hash_next = ses->hash_next;
ses->hash_next = NULL;
}
static session *Sess_get_free_session (void)
{
session *ses;
if ((ses = Sessions_free) == NULL)
{
Alarm (EXIT, "Sess_get_free_session: BUG ! No free sessions !\n");
}
Sessions_free = Sessions_free->sort_next;
return ses;
}
static void Sess_free_session (session *ses)
{
ses->sort_next = Sessions_free;
Sessions_free = ses;
}
static int Sess_insert_new_session (session *where, session *template)
{
session *new_ses;
new_ses = Sess_get_free_session();
memmove(new_ses, template, sizeof (*template));
if (!where)
{
/* Ok, we insert a session at the end of the list... */
new_ses->sort_next = NULL;
if (!Sessions_tail)
{
/* List is empty */
new_ses->sort_prev = NULL;
Sessions_head = Sessions_tail = new_ses;
}
else
{
new_ses->sort_prev = Sessions_tail;
Sessions_tail->sort_next = new_ses;
Sessions_tail = new_ses;
}
}
else
{
/* Ok, we insert a session in the middle of the list, just
* before where... */
new_ses->sort_next = where;
new_ses->sort_prev = where->sort_prev;
where->sort_prev = new_ses;
if (!new_ses->sort_prev)
{
/* new_ses is new head */
Sessions_head = new_ses;
}
else
{
new_ses->sort_prev->sort_next = new_ses;
}
}
return(new_ses - Sessions);
}
static void Sess_remove_session (session *ses)
{
if (!ses->sort_prev && !ses->sort_next)
{
/* Last session */
Sessions_head = Sessions_tail = NULL;
Sess_free_session(ses);
return;
}
if (!ses->sort_prev)
{
/* Head */
Sessions_head = ses->sort_next;
ses->sort_next->sort_prev = NULL;
Sess_free_session(ses);
return;
}
if (!ses->sort_next)
{
/* Tail */
Sessions_tail = ses->sort_prev;
ses->sort_prev->sort_next = NULL;
Sess_free_session(ses);
return;
}
/* All troubled cases are above ;-) */
ses->sort_next->sort_prev = ses->sort_prev;
ses->sort_prev->sort_next = ses->sort_next;
Sess_free_session(ses);
}
static void Sess_init_sessions (void)
{
int i;
for (i = 0; i < SESSION_FD_HASH_SIZE; i++)
Sessions_hash_head[i] = NULL;
Sessions_free = Sessions_head = Sessions_tail = NULL;
for (i = 0; i < MAX_SESSIONS; i++)
Sess_free_session( &Sessions[i] );
}
int count_bits_set( int32u field, int first_index, int last_index)
{
int i;
int count = 0;
int32u bitfield;
assert(first_index >= 0 && first_index < 32);
assert(last_index >= 0 && last_index <=32);
assert(last_index >= first_index);
bitfield = 0x1 << first_index;
for ( i=0; i < (last_index - first_index); i++, bitfield <<=1)
{
if (field & bitfield )
count++;
}
return(count);
}
void Sess_init()
{
struct sockaddr_in inet_addr;
int16 port;
int ret, i;
mailbox mbox;
#ifndef ARCH_PC_WIN95
struct sockaddr_un unix_addr;
char name[80];
signal( SIGPIPE, SIG_IGN );
#endif /* ARCH_PC_WIN95 */
ret = Mem_init_object( MESSAGE_LINK, sizeof(message_link), 1000, 0);
if (ret < 0)
{
Alarm(EXIT, "Sess_init: Failure to Initialize MESSAGE_LINK memory objects\n");
}
ret = Mem_init_object( DOWN_LINK, sizeof(down_link), 200, 0);
if (ret < 0)
{
Alarm(EXIT, "Sess_Init: Failure to Initialize DOWN_LINK memory objects\n");
}
Sess_init_sessions ();
Num_sessions = 0;
GlobalStatus.num_sessions = Num_sessions;
GlobalStatus.message_delivered = 0;
My = Conf_my();
port = My.port;
Session_threshold = LOW_PRIORITY;
/* Initializing the protocol */
Protocol_threshold = LOW_PRIORITY;
Prot_init_down_queues();
Prot_set_down_queue( NORMAL_DOWNQUEUE );
Prot_init();
/* Initiation of the INET socket */
memset(&inet_addr.sin_zero, 0, sizeof(inet_addr.sin_zero));
inet_addr.sin_family = AF_INET;
inet_addr.sin_port = htons(port);
Accept_inet_mbox_num = 0;
/* Bind to all interfaces specified in config file */
for ( i=0; i < My.num_if; i++)
{
if (Is_IfType_Client(My.ifc[i].type) || Is_IfType_Any(My.ifc[i].type) )
{
port_reuse type;
if( (mbox = socket( AF_INET, SOCK_STREAM, 0 ) ) == -1)
Alarm( EXIT, "Sess_init: INET sock error\n" );
type = Conf_get_port_reuse_type();
if (type == port_reuse_on)
ACTIVATE_PORT_REUSE(mbox);
if (Is_IfType_Any(My.ifc[i].type) )
inet_addr.sin_addr.s_addr = INADDR_ANY;
else
{
if (type == port_reuse_auto)
ACTIVATE_PORT_REUSE(mbox);
inet_addr.sin_addr.s_addr = htonl(My.ifc[i].ip);
}
if( bind( mbox, (struct sockaddr *)&inet_addr, sizeof(inet_addr) ) == -1)
{
Alarm( PRINT, "Sess_init: INET unable to bind to port %d, already running \n" ,port );
exit(0);
}
inet_addr.sin_addr.s_addr = ntohl(inet_addr.sin_addr.s_addr);
Alarm( SESSION, "Sess_init: INET bind for port %d interface %d.%d.%d.%d ok\n", port,
IP1(inet_addr.sin_addr.s_addr), IP2(inet_addr.sin_addr.s_addr),
IP3(inet_addr.sin_addr.s_addr), IP4(inet_addr.sin_addr.s_addr) );
if( listen( mbox, 25 ) < 0 )
Alarm( EXIT, "Sess_init: INET unable to listen\n" );
Accept_inet_mbox[Accept_inet_mbox_num] = mbox;
Accept_inet_mbox_num++;
Alarm( SESSION, "Sess_init: INET went ok on mailbox %d\n", mbox );
}
}
#ifndef ARCH_PC_WIN95
/* Initiation of the UNIX socket */
if( (mbox = socket( AF_UNIX, SOCK_STREAM, 0 ) ) == -1)
Alarm( EXIT, "Sess_init: UNIX sock error\n" );
unix_addr.sun_family = AF_UNIX;
sprintf( name, "%s/spread.sock", _PATH_SPREAD_PIDDIR );
strcpy( unix_addr.sun_path, name );
unlink( name );
if( bind( mbox, (struct sockaddr *)&unix_addr, sizeof(unix_addr) ) == -1)
{
Alarm( PRINT, "Sess_init: UNIX unable to bind to name %s, already running \n" , name );
exit(0);
}
Alarm( SESSION, "Sess_init: UNIX bind for name %s ok\n", name );
chmod( name, 0666 );
if( listen( mbox, 5 ) < 0 )
Alarm( EXIT, "Sess_init: UNIX unable to listen\n" );
Accept_unix_mbox = mbox;
Alarm( SESSION, "Sess_init: UNIX went ok on mailbox %d\n", mbox );
#endif /* ARCH_PC_WIN95 */
Sess_attach_accept();
Message_populate_with_buffers(&New_mess);
G_init();
Alarm( SESSION, "Sess_init: ended ok\n" );
}
void Sess_set_active_threshold()
{
/* This function is used only by the session (and groups) layer */
if( Protocol_threshold > Session_threshold )
E_set_active_threshold( Protocol_threshold );
else E_set_active_threshold( Session_threshold );
}
void Sess_block_user(int xxx)
{
Alarm(EXIT,"Sess_block_user: NOT IMPLEMENTED!\n");
}
void Sess_unblock_user(int xxx)
{
Alarm(EXIT,"Sess_unblock_user: NOT IMPLEMENTED!\n");
}
void Sess_block_users_level()
{
/* This function is used only by lower layers (protocol) */
if( Protocol_threshold < MEDIUM_PRIORITY )
{
Protocol_threshold = MEDIUM_PRIORITY;
Sess_set_active_threshold();
}
}
void Sess_unblock_users_level()
{
/* This function is used only by lower layers (protocol) */
if( Protocol_threshold > LOW_PRIORITY )
{
Protocol_threshold = LOW_PRIORITY;
Sess_set_active_threshold();
}
}
static void Sess_attach_accept()
{
int i;
for ( i=0; i < Accept_inet_mbox_num; i++)
{
E_attach_fd( Accept_inet_mbox[i], READ_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
E_attach_fd( Accept_inet_mbox[i], EXCEPT_FD, Sess_accept, AF_INET, NULL, LOW_PRIORITY );
}
#ifndef ARCH_PC_WIN95
E_attach_fd( Accept_unix_mbox, READ_FD, Sess_accept, AF_UNIX, NULL, LOW_PRIORITY );
#endif /* ARCH_PC_WIN95 */
}
static void Sess_detach_accept()
{
int i;
for (i=0; i < Accept_inet_mbox_num; i++)
{
E_detach_fd( Accept_inet_mbox[i], READ_FD );
E_detach_fd( Accept_inet_mbox[i], EXCEPT_FD );
}
#ifndef ARCH_PC_WIN95
E_detach_fd( Accept_unix_mbox, READ_FD );
#endif /* ARCH_PC_WIN95 */
}
void Sess_accept_continue2(int d1, void *d2)
{
Sess_accept_continue(0,0,NULL);
}
static void Sess_accept( mailbox mbox, int domain, void *dummy )
{
struct sockaddr_in inet_addr;
socklen_t inet_len;
sockopt_len_t onlen;
sp_time accept_delay;
char response;
int ret;
int i;
int32 on;
if( domain == AF_INET )
{
inet_len = sizeof(inet_addr);
Sessions[MAX_SESSIONS].mbox = accept( mbox, (struct sockaddr *)&inet_addr, &inet_len );
Sessions[MAX_SESSIONS].type = AF_INET;
/*
* sender's machine ip address is: htonl(inet_addr.sin_addr.s_addr)
* sender's assigned port is : htons(inet_addr.sin_port)
*/
Sessions[MAX_SESSIONS].address = htonl(inet_addr.sin_addr.s_addr);
}else if( domain == AF_UNIX ){
/* no need for return values for AF_UNIX on the accept */
Sessions[MAX_SESSIONS].mbox = accept( mbox, 0, 0 );
Sessions[MAX_SESSIONS].type = AF_UNIX;
Sessions[MAX_SESSIONS].address = 0;
}else Alarm( EXIT, "Sess_accept: Unknown domain %d on mailbox %d\n", domain, mbox );
if( Sessions[MAX_SESSIONS].mbox < 0 )
{
Alarm( SESSION, "Sess_accept: accept failed for domain %d\n", domain );
return;
}
if( Num_sessions == MAX_SESSIONS )
{
response = REJECT_QUOTA;
send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
close( Sessions[MAX_SESSIONS].mbox );
Alarm( SESSION, "Sess_accept: rejecting session due to quota\n" );
return;
}
if ( ( (i = Sess_get_session_index(Sessions[MAX_SESSIONS].mbox)) != -1)
&& (Sessions[i].mbox == Sessions[MAX_SESSIONS].mbox)
&& (Is_op_session( Sessions[i].status )) )
{
/* This is impossible as the mbox must have been closed to be returned by accept */
Alarm(EXIT, "Sess_accept: BUG! Accepted new FD %d that is currently in use(ses %d).\n", Sessions[i].mbox, i);
}
for( i=10; i <= 200; i+=5 )
{
on = 1024*i;
onlen = sizeof(on);
ret = setsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_SNDBUF, (void *)&on, onlen);
if (ret < 0 ) break;
ret = setsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_RCVBUF, (void *)&on, onlen);
if (ret < 0 ) break;
ret= getsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_SNDBUF, (void *)&on, &onlen );
if( on < i*1024 ) break;
Alarm( NONE, "Sess_accept: set sndbuf %d, ret is %d\n", on, ret );
onlen = sizeof(on);
ret= getsockopt( Sessions[MAX_SESSIONS].mbox, SOL_SOCKET, SO_RCVBUF, (void *)&on, &onlen );
if( on < i*1024 ) break;
Alarm( NONE, "Sess_accept: set rcvbuf %d, ret is %d\n", on, ret );
}
Alarm( SESSION, "Sess_accept: set sndbuf/rcvbuf to %d\n", 1024*(i-5) );
if ( domain == AF_INET ) {
on = 1;
ret = setsockopt( Sessions[MAX_SESSIONS].mbox, IPPROTO_TCP, TCP_NODELAY, (void *)&on, 4);
if (ret < 0)
Alarm(PRINT, "Setting TCP_NODELAY failed with error: %s\n", sock_strerror(sock_errno));
else
Alarm( SESSION, "Setting TCP_NODELAY on socket %d\n", Sessions[MAX_SESSIONS].mbox );
}
/* delaying for the private name to be written */
Sess_detach_accept();
E_attach_fd( Sessions[MAX_SESSIONS].mbox, READ_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
E_attach_fd( Sessions[MAX_SESSIONS].mbox, EXCEPT_FD, Sess_accept_continue, 0, NULL, LOW_PRIORITY );
accept_delay.sec = 1;
accept_delay.usec= 0;
E_queue( Sess_accept_continue2, 1, NULL, accept_delay );
}
void Sess_accept_continue(mailbox d1, int d2, void *d3)
{
char response;
int legal_private_name;
int unique_private_name;
int name_len;
int ioctl_cmd;
char version[3];
char conn[2];
char priv_user_name[MAX_PRIVATE_NAME];
int ret, i, sess_location, rnum;
char *allowed_auth_list;
unsigned char list_len;
session *tmp_ses;
E_dequeue( Sess_accept_continue2, 1, NULL );
E_detach_fd( Sessions[MAX_SESSIONS].mbox, READ_FD );
E_detach_fd( Sessions[MAX_SESSIONS].mbox, EXCEPT_FD );
Sess_attach_accept();
/* set file descriptor to non blocking */
ioctl_cmd = 1;
ret = ioctl( Sessions[MAX_SESSIONS].mbox, FIONBIO, &ioctl_cmd);
/*
* connect message looks like:
*
* byte - version of lib
* byte - subversion of lib
* (optional) byte - patchversion of lib (only if version.subversion > 3.14)
* byte - 1/0 with or without groups
* byte - len of name
* len bytes - name
*
*/
/* version checking 3.01 is minimal */
version[0] = version[1] = version[2] = 0;
ret = recv( Sessions[MAX_SESSIONS].mbox, version, 2, 0 );
if( ret < 0 )
{
Alarm( SESSION, "Sess_accept_continue: reading version.subversion failed on mailbox %d\n",
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
if ( version[0]*10000 + version[1]*100 + version[2] > 31400 )
{
ret = recv( Sessions[MAX_SESSIONS].mbox, &version[2], 1, 0 );
if( ret < 0 )
{
Alarm( SESSION, "Sess_accept_continue: reading patch_version failed on mailbox %d\n",
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
}
if( version[0]*10000 + version[1]*100 + version[2] < 30100 )
{
response = REJECT_VERSION;
send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
Alarm( SESSION, "Sess_accept_continue: version %d.%d.%d is not supported\n",
version[0], version[1], version[2] );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
Sessions[MAX_SESSIONS].lib_version[0] = version[0];
Sessions[MAX_SESSIONS].lib_version[1] = version[1];
Sessions[MAX_SESSIONS].lib_version[2] = version[2];
ret = recv( Sessions[MAX_SESSIONS].mbox, conn, 2, 0 );
if( ret < 0 )
{
Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}else if( ret < 2 ){
response = REJECT_NO_NAME;
send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
if( ((int)conn[0] % 2) == 1 ) Sessions[MAX_SESSIONS].status = Set_memb_session( Sessions[MAX_SESSIONS].status );
else Sessions[MAX_SESSIONS].status = Clear_memb_session( Sessions[MAX_SESSIONS].status );
Sessions[MAX_SESSIONS].priority = (int)conn[0] / 16 ;
name_len = (int)conn[1];
if( name_len > MAX_PRIVATE_NAME || name_len < 0 )
{
response = REJECT_ILLEGAL_NAME;
send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
Alarm( SESSION, "Sess_accept_continue: len %d of private name does not fit (ret = %d) on mailbox %d\n",
name_len, ret,
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
/* reading private name */
for( i=0; i < MAX_PRIVATE_NAME+1; i++ )
Sessions[MAX_SESSIONS].name[i] = 0;
if (name_len == 0)
{
/* Assign a random user name to this user. Currently this is a random 4 digit number followed
* by the mbox of the users connection.
*/
char newname[MAX_PRIVATE_NAME];
unique_private_name = 0;
while ( !unique_private_name )
{
memset(newname, '\0', MAX_PRIVATE_NAME);
rnum = (int) (9999.0*get_rand()/(RAND_MAX+1.0));
snprintf(newname, MAX_PRIVATE_NAME, "r%u-%u", rnum, Sessions[MAX_SESSIONS].mbox);
memcpy( Sessions[MAX_SESSIONS].name, newname, MAX_PRIVATE_NAME);
/* checking if private name is unique */
for( unique_private_name=1, tmp_ses = Sessions_head; tmp_ses; tmp_ses = tmp_ses->sort_next )
{
ret = strcmp( Sessions[MAX_SESSIONS].name, tmp_ses->name );
if( ret <= 0 )
{
if( ret == 0 ) unique_private_name = 0;
break;
}
}
}
} else {
/* recive user name from client and validate it */
ret = recv( Sessions[MAX_SESSIONS].mbox, priv_user_name, MAX_PRIVATE_NAME, 0 );
if( ret < 0 )
{
Alarm( SESSION, "Sess_accept_continue: reading private name failed on mailbox %d\n",
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}else if( ret != name_len )
{
response = REJECT_ILLEGAL_NAME;
send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
Alarm( SESSION, "Sess_accept_continue: len %d of private name does not fit (ret = %d) on mailbox %d\n",
name_len, ret, Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
memcpy( Sessions[MAX_SESSIONS].name, priv_user_name, name_len );
/* checking if private name is legal */
for( legal_private_name=1, i=0; i < name_len; i++ )
if( Sessions[MAX_SESSIONS].name[i] <= '#' ||
Sessions[MAX_SESSIONS].name[i] > '~' ) legal_private_name = 0;
if( !legal_private_name )
{
response = REJECT_ILLEGAL_NAME;
send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
Alarm( SESSION, "Sess_accept_continue: illegal private name %s on mailbox %d\n",
Sessions[MAX_SESSIONS].name,
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
/* checking if private name is unique */
for( unique_private_name=1, tmp_ses = Sessions_head; tmp_ses; tmp_ses = tmp_ses->sort_next )
{
ret = strcmp( Sessions[MAX_SESSIONS].name, tmp_ses->name );
if( ret <= 0 )
{
if( ret == 0 ) unique_private_name = 0;
break;
}
}
if( !unique_private_name )
{
response = REJECT_NOT_UNIQUE;
send( Sessions[MAX_SESSIONS].mbox, &response, sizeof(response), 0 );
Alarm( SESSION, "Sess_accept_continue: non unique private name %s on mailbox %d\n",
Sessions[MAX_SESSIONS].name,
Sessions[MAX_SESSIONS].mbox );
close( Sessions[MAX_SESSIONS].mbox );
return;
}
}
/* set file descriptor back to blocking */
ioctl_cmd = 0;
ret = ioctl( Sessions[MAX_SESSIONS].mbox, FIONBIO, &ioctl_cmd);
/* Insert the new session just before the point we already
* found while checking unique private name... */
sess_location = Sess_insert_new_session (tmp_ses, &Sessions[MAX_SESSIONS]);
Num_sessions++;
GlobalStatus.num_sessions = Num_sessions;
Sessions[sess_location].status = Set_preauth_session( Sessions[sess_location].status );
Sess_hash_session (&Sessions[sess_location]);
/* OLD client library without authentication/authorization code */
if ( version[0]*10000 + version[1]*100 + version[2] < 31600 )
{
Acm_acp_fill_ops( &(Sessions[sess_location].acp_ops) );
/* If IP access control is enabled, then check it.
*/
if ( Acm_auth_query_allowed("IP") )
{
void (*auth_open)(struct session_auth_info *);
struct session_auth_info *sess_auth_p;
sess_auth_p = Acm_auth_create_sess_info_forIP(Sessions[sess_location].mbox);
auth_open = Acm_auth_get_auth_client_connection_byname("IP");
auth_open(sess_auth_p);
return;
}
/* If no IP authentication enabled, then try to use NULL */
if ( Acm_auth_query_allowed("NULL") )
Sess_session_authorized(sess_location);
else
Sess_session_denied(sess_location);
return;
}
allowed_auth_list = Acm_auth_get_allowed_list();
list_len = strlen(allowed_auth_list);
send( Sessions[sess_location].mbox, &list_len, 1, 0 );
send( Sessions[sess_location].mbox, allowed_auth_list, list_len, 0 );
/* If no AllowedAuthMethods are declared, reject all sessions.
* To maintain old behaivor of allowing all, just add the NULL
* method to the allowed list and then all clients will be allowed.
*/
if (list_len == 0)
{
Sess_session_denied(sess_location);
return;
}
/* Now wait for client reply */
E_attach_fd( Sessions[sess_location].mbox, READ_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
E_attach_fd( Sessions[sess_location].mbox, EXCEPT_FD, Sess_recv_client_auth, 0, NULL, LOW_PRIORITY );
}
static void Sess_recv_client_auth(mailbox mbox, int dummy, void *dummy_p)
{
int ret, i, ioctl_cmd, ses;
char auth_name[MAX_AUTH_NAME * MAX_AUTH_METHODS];
void (*auth_open)(struct session_auth_info *);
struct session_auth_info *sess_auth_p;
ses = Sess_get_session_index(mbox);
if( ses < 0 || ses >= MAX_SESSIONS ) {
Alarm( PRINT, "Sess_recv_client_auth: Illegal mbox %d for receiving client auth. Cannot deny or allow\n", mbox);
return;
}
if (!Is_preauth_session(Sessions[ses].status) )
{
Alarm( EXIT, "Sess_recv_client_auth: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
}
E_detach_fd(mbox, READ_FD);
E_detach_fd(mbox, EXCEPT_FD);
/* set file descriptor to non blocking */
ioctl_cmd = 1;
ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
/* FIXME: Support partial reads by storing the portion read so far and requeuing */
ret = recv( mbox, auth_name, MAX_AUTH_NAME * MAX_AUTH_METHODS, 0 );
if( ret < 0 )
{
Alarm( SESSION, "Sess_recv_client_auth: reading auth string failed on mailbox %d\n", mbox );
Sess_session_denied(ses);
return;
}
if( ret < (MAX_AUTH_NAME * MAX_AUTH_METHODS) )
{
Alarm( SESSION, "Sess_recv_client_auth: reading auth string SHORT on mailbox %d\n", mbox );
Sess_session_denied(ses);
return;
}
/* set file descriptor back to blocking */
ioctl_cmd = 0;
ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
i = 0;
while ( auth_name[i * MAX_AUTH_NAME] != '\0')
{
Alarm( SESSION, "Sess_recv_client_auth: Client requested %s type authentication\n", &auth_name[i * MAX_AUTH_NAME]);
if ( !Acm_auth_query_allowed(&auth_name[i * MAX_AUTH_NAME]) )
{
Alarm( SESSION, "Sess_recv_client_auth: received non-allowed auth method %s, closing session %d on mailbox %d\n", &auth_name[i * MAX_AUTH_NAME], ses, mbox);
Sess_session_denied(ses);
return;
}
i++;
}
/* Register default permit all ops access control policy. */
Acm_acp_fill_ops( &(Sessions[ses].acp_ops) );
sess_auth_p = Acm_auth_create_sess_info(mbox, auth_name);
if ( NULL == sess_auth_p )
{
Alarm( SESSION, "Sess_recv_client_auth: no valid auth_methods set or received: auth method %s, closing session %d on mailbox %d\n", auth_name, ses, mbox);
Sess_session_denied(ses);
return;
}
auth_open = Acm_auth_get_auth_client_connection( sess_auth_p->required_auth_methods[0] );
if (auth_open == NULL)
{
Alarm(PRINT, "Sess_recv_client_auth: Illegal auth_method_id (%d) tried\n", sess_auth_p->required_auth_methods[0]);
dispose( sess_auth_p );
Sess_session_denied( ses );
return;
}
auth_open( sess_auth_p );
}
void Sess_session_report_auth_result(struct session_auth_info *sess_auth_h, int authenticated_p )
{
int ses, authid, num_auths;
int permit_count, decision, i;
void (*auth_open)(struct session_auth_info *);
ses = Sess_get_session_index(sess_auth_h->mbox);
if( ses < 0 || ses >= MAX_SESSIONS ) {
Alarm( PRINT, "Sess_session_report_auth_result: Illegal mbox %d for authentication. Cannot deny or allow\n", sess_auth_h->mbox);
dispose( sess_auth_h );
return;
}
num_auths = sess_auth_h->num_required_auths;
/* finished another method. See if entire set of checks is complete */
sess_auth_h->required_auth_results[sess_auth_h->completed_required_auths] = authenticated_p;
sess_auth_h->completed_required_auths++;
if (sess_auth_h->completed_required_auths < num_auths)
{
authid = sess_auth_h->required_auth_methods[sess_auth_h->completed_required_auths];
auth_open = Acm_auth_get_auth_client_connection(authid);
if (auth_open == NULL)
{
Alarm(PRINT, "Sess_session_report_auth_result: Illegal auth_method_id (%d) tried\n", authid);
dispose( sess_auth_h );
Sess_session_denied( ses );
return;
}
auth_open(sess_auth_h);
return;
}
permit_count = 0;
for (i = 0; i < num_auths; i++)
if ( sess_auth_h->required_auth_results[i] ) permit_count++;
dispose( sess_auth_h );
if ( permit_count < num_auths )
{
/* session is denied if any authentication method fails */
Sess_session_denied( ses );
return;
}
decision = Sessions[ses].acp_ops.open_connection(Sessions[ses].name);
if (decision != ACM_ACCESS_ALLOWED)
{
Sess_session_denied( ses );
return;
}
/* NOTE: Here and in the backwards compat support in Sess_accept_cont are the only places
* that should authorize connections
*/
Sess_session_authorized( ses );
}
void Sess_session_denied(int ses)
{
char response;
if (!Is_preauth_session(Sessions[ses].status) )
{
Alarm( EXIT, "Sess_session_denied: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
}
response = REJECT_AUTH;
send( Sessions[ses].mbox, &response, sizeof(response), 0 );
Alarm( SESSION, "Sess_session_denied: Authorization denied for %s on mailbox %d\n",
Sessions[ses].name,
Sessions[ses].mbox );
close( Sessions[ses].mbox );
Sess_unhash_session (&Sessions[ses]);
Sess_remove_session (&Sessions[ses]);
Num_sessions--;
GlobalStatus.num_sessions = Num_sessions;
return;
}
void Sess_session_authorized(int ses)
{
char ip[16];
char response;
unsigned int name_len;
char private_group_name[MAX_GROUP_NAME];
if (!Is_preauth_session(Sessions[ses].status) )
{
Alarm( EXIT, "Sess_session_authorized: BUG! Session is already authorized (status 0x%x)\n", Sessions[ses].status);
}
/*
* accept message looks like:
*
* byte - ACCEPT_SESSION code
* byte - version of spread
* byte - subversion of spread
* (optional) byte - patch version of spread (only if library is 3.15.0 or greater)
* byte - len of name
* len bytes - name
*
*/
Sessions[ses].num_mess = 0;
response = ACCEPT_SESSION;
send( Sessions[ses].mbox, &response, 1, 0 );
response = SP_MAJOR_VERSION;
send( Sessions[ses].mbox, &response, 1, 0 );
response = SP_MINOR_VERSION;
send( Sessions[ses].mbox, &response, 1, 0 );
if (Sessions[ses].lib_version[0]*10000 + Sessions[ses].lib_version[1]*100 + Sessions[ses].lib_version[2] >= 31500)
{
response = SP_PATCH_VERSION;
send( Sessions[ses].mbox, &response, 1, 0 );
}
sprintf(private_group_name, "#%s#%s", Sessions[ses].name, My.name );
name_len = strlen( private_group_name );
/* sending the len of the private group in one byte */
response = name_len;
send( Sessions[ses].mbox, &response, 1, 0 );
/* sending the private group name */
send( Sessions[ses].mbox, private_group_name, name_len, 0 );
E_attach_fd( Sessions[ses].mbox, READ_FD, Sess_read, Sessions[ses].type, NULL,
LOW_PRIORITY );
E_attach_fd( Sessions[ses].mbox, EXCEPT_FD, Sess_read, Sessions[ses].type, NULL,
LOW_PRIORITY );
Sessions[ses].status = Set_op_session( Sessions[ses].status );
Sessions[ses].status = Clear_preauth_session( Sessions[ses].status );
Prot_Create_Local_Session(&Sessions[ses]);
Message_reset_current_location(&(Sessions[ses].read) );
Message_reset_current_location(&(Sessions[ses].write) );
Sessions[ses].read.in_mess_head = 1;
Log_sess_connect( Sessions[ses].mbox, Sessions[ses].address,
Sessions[ses].name );
Conf_id_to_str( Sessions[ses].address, ip );
Alarm( SESSION, "Sess_session_authorized: Accepting from %s with private name %s on mailbox %d\n",
ip,
Sessions[ses].name,
Sessions[ses].mbox );
}
static int Sess_validate_read_header( mailbox mbox, int ses, int head_size, message_header *head_ptr)
{
char private_name[MAX_PRIVATE_NAME+1];
char proc_name[MAX_PROC_NAME];
int ret, type_bits, memb_bits;
/* Disallow more then one message type being set or more then one Join/Leave/Kill being set */
if ( ( (type_bits = count_bits_set( head_ptr->type, 0, 6)) > 1) ||
( (memb_bits = count_bits_set( head_ptr->type, 16, 19)) > 1) ||
( (type_bits + memb_bits) != 1 ) )
{
Alarm( SESSION, "Sess_validate_read_header: Message has illegal type field 0x%x\n", head_ptr->type);
return(-1);
}
head_ptr->private_group_name[MAX_GROUP_NAME -1] = '\0';
ret = G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
if( ret < 0 )
{
Alarm( SESSION, "Sess_validate_read_header: Message has illegal private_group_name (priv, proc)\n");
return(-1);
}
if( strncmp( proc_name, My.name, MAX_PROC_NAME ) != 0 )
{
Alarm( SESSION, "Sess_validate_read_header: proc name %s is not my name %s\n",
proc_name, My.name );
return(-1);
}
if (strncmp(private_name , Sessions[ses].name, MAX_PRIVATE_NAME) )
{
Alarm( PRINT, "Sess_validate_read_header: Session %s trying to make session %s do something\n",
private_name, Sessions[ses].name );
return(-1);
}
if ( (head_ptr->num_groups < 0) || (head_ptr->num_groups > MAX_GROUPS_PER_MESSAGE) )
{
Alarm( SESSION, "Sess_validate_read_header: Message has negative or too large num_groups field\n", head_ptr->num_groups);
return(-1);
}
if ( head_ptr->hint & ~( 0x80000080 | 0x00ffff00 ) )
{
Alarm( SESSION, "Sess_validate_read_header: Message has illegal hint field 0x%x\n", head_ptr->hint);
return(-1);
}
if ( (head_ptr->data_len < 0 ) || ( head_ptr->data_len > MAX_MESSAGE_BODY_LEN) )
{
Alarm( SESSION, "Sess_validate_read_header: Message has negative or too large data_len %d\n", head_ptr->data_len);
return(-1);
}
if ( (head_ptr->data_len + MAX_GROUP_NAME * head_ptr->num_groups) > (MAX_MESSAGE_BODY_LEN - head_size ) )
{
Alarm( SESSION, "Sess_validate_read_header: Message + Groups is too large (%d + %d = %d). MAX size is: %d\n",
head_ptr->data_len, MAX_GROUP_NAME * head_ptr->num_groups,
head_ptr->data_len + MAX_GROUP_NAME * head_ptr->num_groups,
MAX_MESSAGE_BODY_LEN - head_size );
return(-1);
}
/* Passed all checks, so valid */
return( 0 );
}
static void Sess_read( mailbox mbox, int dummy, void *d2 )
{
message_header *head_ptr, *msg_head;
message_obj *msg;
scatter *scat;
down_link *down_ptr;
message_link *mess_link;
int packet_index, byte_index, to_read;
int len, remain, ret;
int head_size, data_frag_len;
int ses, ioctl_cmd;
char *head_cbuf;
#if 0
#ifndef ARCH_SCATTER_NONE
static struct msghdr msgh;
#endif /* ARCH_SCATTER_NONE */
#endif /* 0 */
#if 0
/* we currently don't use recvmsg */
#ifndef ARCH_SCATTER_NONE
msgh.msg_name = (caddr_t) 0;
msgh.msg_namelen = 0;
msgh.msg_iov = (struct iovec *)scat->elements;
msgh.msg_iovlen = scat->num_elements;
#endif /* ARCH_SCATTER_NONE */
#ifdef ARCH_SCATTER_CONTROL
msgh.msg_control = (caddr_t) 0;
msgh.msg_controllen = 0;
#endif /* ARCH_SCATTER_CONTROL */
#ifdef ARCH_SCATTER_ACCRIGHTS
msgh.msg_accrights = (caddr_t) 0;
msgh.msg_accrightslen = 0;
#endif /* ARCH_SCATTER_ACCRIGHTS */
#endif /* 0 */
ses = Sess_get_session_index(mbox);
if( ses < 0 || ses >= MAX_SESSIONS ) {
Alarm( PRINT, "Sess_read: Illegal mbox %d for read\n", mbox);
return;
}
if (Sessions[ses].read_mess == NULL)
Sessions[ses].read_mess = Message_new_message();
msg = Sessions[ses].read_mess;
head_ptr = Message_get_message_header(msg);
head_cbuf = (char *) head_ptr;
head_size = Message_get_header_size();
/* set file descriptor to non blocking */
ioctl_cmd = 1;
ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
if ( Sessions[ses].read.in_mess_head == 1 )
{
/* read up to size of message_header */
len = Sessions[ses].read.cur_byte;
remain = sizeof(message_header) - len;
ret = recv( mbox, (char *) &head_cbuf[len], remain, 0 );
if( ret == remain )
{
Sessions[ses].read.cur_byte += ret;
Message_set_location_begin_body(&(Sessions[ses].read) );
} else if (ret > 0 ) {
Sessions[ses].read.cur_byte += ret;
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
return;
} else {
/* error reading */
if ( (ret == -1) && ( (sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK) ) ) {
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
return;
}
Alarm( SESSION, "Sess_read: failed receiving header on session %d: ret %d: error: %s \n", mbox, ret, sock_strerror(sock_errno) );
Sess_kill( mbox );
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
return;
}
/* When we get here we have a complete header */
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
/* Fliping message header to my form if needed */
if( !Same_endian( head_ptr->type ) )
{
Flip_mess( head_ptr );
}
/* Setting endian to my endian on the header */
head_ptr->type = Set_endian( head_ptr->type );
/* Validate all fields */
Alarm( SESSION, "Sess_read: Message has type field 0x%x\n", head_ptr->type);
ret = Sess_validate_read_header( mbox, ses, head_size, head_ptr);
if (ret < 0 )
{
/* invalid header */
Sess_kill(mbox);
return;
}
} /* finished reading and validating header */
/*
* to do recvmsg, but need to trick with the starting AFTER the header
* on the first packet, and then to return the big_scatter to original
* form. read at *most* head_ptr->data_len (set scatter lengths accordingly
* everytime here from scratch!
*
* ret = recvmsg( mbox, &msg, 0 );
* if( ret <=0 )
* {
* Alarm( SESSION, "Sess_read: failed receiving message on session %d\n", mbox );
* Sess_kill( mbox );
* return;
* }
*/
/* read the rest of the message if needed, reserving room at the beginning
* of the first fragment(scat buf) for the message header and the lts and seq fields. */
/* enable non-blocking io */
ioctl_cmd = 1;
ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
data_frag_len = Message_get_data_fragment_len();
scat = Message_get_data_scatter(msg);
remain = ( head_ptr->data_len + MAX_GROUP_NAME*head_ptr->num_groups ) - Sessions[ses].read.total_bytes;
for( ; remain > 0; remain -= ret )
{
packet_index = Sessions[ses].read.cur_element;
byte_index = Sessions[ses].read.cur_byte;
if (packet_index >= scat->num_elements)
{
/* We are beginning a new fragment -- so allocate it */
assert(byte_index == 0);
Message_add_scat_element(msg);
}
to_read = ( data_frag_len - byte_index );
if( to_read > remain ) to_read = remain;
ret = recv( mbox, &scat->elements[packet_index].buf[byte_index],
to_read, 0 );
if( ret == to_read )
{
Sessions[ses].read.cur_byte = 0;
Sessions[ses].read.cur_element++;
Sessions[ses].read.total_bytes += ret;
} else if (ret > 0 ) {
Sessions[ses].read.cur_byte += ret;
Sessions[ses].read.total_bytes += ret;
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
return;
} else {
if ( (ret == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) ) {
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
return;
}
Alarm( SESSION, "Sess_read: failed receiving message on session %d, ret is %d: error: %s\n", mbox, ret, sock_strerror(sock_errno) );
Alarm( SESSION, "Sess_read: failed recv msg more info: len read: %d, remain: %d, to_read: %d, pkt_index: %d, b_index: %d, scat_nums: %d\n",Sessions[ses].read.total_bytes, remain, to_read, packet_index, byte_index, scat->num_elements );
Sess_kill( mbox );
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
return;
}
}
/* We now have a complete message */
ioctl_cmd = 0;
ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
/* reset active read_mess to empty */
Message_reset_current_location(&(Sessions[ses].read));
Sessions[ses].read.in_mess_head = 1;
Sessions[ses].read_mess = NULL;
Message_element_len_fixup(msg);
#ifdef PROBE_LATENCY
if (Is_latency_mess( head_ptr->type ) )
{
int32u initial_offset, htime_offset;
int32u *p_time_offset;
sp_time cur_time, ncur_time;
initial_offset = MAX_GROUP_NAME*head_ptr->num_groups + head_size;
p_time_offset = (int32u *) &msg->body.elements[0].buf[initial_offset];
htime_offset = ntohl(*p_time_offset);
Alarm(SESSION, "Sess_read: Msg Data at %d with time_offset %u \n", initial_offset, htime_offset);
cur_time = E_get_time();
ncur_time.sec = htonl(cur_time.sec);
ncur_time.usec = htonl(cur_time.usec);
memcpy(&(msg->body.elements[0].buf[htime_offset + initial_offset]), &ncur_time, sizeof(sp_time));
Alarm(SESSION, "Sess_read: timestamped time (%d, %d) in byte %d of message\n", cur_time.sec, cur_time.usec, htime_offset);
*p_time_offset = htonl(htime_offset + sizeof(sp_time) );
}
#endif /* PROBE_LATENCY */
/* Do ACM access control checks */
/* Note, disconnects (Is_kill_mess) are not limited. Someone can always cut themselves off */
if ( Is_leave_mess( head_ptr->type ) )
{
char *groups_ptr;
int decision;
groups_ptr = Message_get_first_group( msg );
decision = Sessions[ses].acp_ops.leave_group( head_ptr->private_group_name, groups_ptr, NULL);
if (decision != ACM_ACCESS_ALLOWED)
{
head_ptr->type = (head_ptr->type & ~LEAVE_MESS);
head_ptr->type |= CAUSED_BY_LEAVE;
Sess_create_reject_message( msg );
Sess_deliver_reject( msg );
return;
}
}
if ( Is_join_mess( head_ptr->type ) )
{
char *groups_ptr;
int decision;
groups_ptr = Message_get_first_group( msg );
/* Make sure we don't let a join happen if the limit has been reached. */
if( G_get_num_local( groups_ptr ) == MAX_LOCAL_GROUP_MEMBERS ) {
Alarm( PRINT, "Sess_read: Attempt by session %s to join group %s "
"failed: too many local members.\n", head_ptr->private_group_name, groups_ptr );
head_ptr->type = (head_ptr->type & ~JOIN_MESS);
head_ptr->type |= CAUSED_BY_JOIN;
Sess_create_reject_message( msg );
Sess_deliver_reject( msg );
return;
}
decision = Sessions[ses].acp_ops.join_group( head_ptr->private_group_name, groups_ptr, NULL);
if (decision != ACM_ACCESS_ALLOWED)
{
head_ptr->type = (head_ptr->type & ~JOIN_MESS);
head_ptr->type |= CAUSED_BY_JOIN;
Sess_create_reject_message( msg );
Sess_deliver_reject( msg );
return;
}
}
if ( Is_only_regular_mess( head_ptr->type ) )
{
char *groups_ptr;
char target_groups[MAX_GROUPS_PER_MESSAGE][MAX_GROUP_NAME];
int decision, num_p2p_dest;
groups_ptr = Message_get_groups_array( msg );
num_p2p_dest = Sess_get_p2p_dests(head_ptr->num_groups, (char (*)[MAX_GROUP_NAME])groups_ptr, target_groups);
if (num_p2p_dest)
{
decision = Sessions[ses].acp_ops.p2p_send( head_ptr->private_group_name, num_p2p_dest, target_groups, head_ptr->type, ( (head_ptr->hint >> 8) & 0x0000ffff) );
if (decision != ACM_ACCESS_ALLOWED)
{
Sess_create_reject_message( msg );
Sess_deliver_reject( msg );
return;
}
}
if (head_ptr->num_groups > num_p2p_dest)
{
decision = Sessions[ses].acp_ops.mcast_send( head_ptr->private_group_name, head_ptr->num_groups, (char (*)[MAX_GROUP_NAME])groups_ptr, head_ptr->type, ( (head_ptr->hint >> 8) & 0x0000ffff) );
if (decision != ACM_ACCESS_ALLOWED)
{
Sess_create_reject_message( msg );
Sess_deliver_reject( msg );
return;
}
}
}
/* create new down_link and big_scatter */
down_ptr = Prot_Create_Down_Link(msg, Message_get_packet_type(head_ptr->type), mbox, 0);
if (down_ptr == NULL)
{
Alarm( SESSION, "Sess_read: Session has illegal message type 0x%x\n", head_ptr->type);
Sess_kill( mbox );
return;
}
down_ptr->mess = msg;
msg_head = Message_get_message_header(down_ptr->mess);
if (Is_kill_mess(msg_head->type) )
{
/* We are going to overwrite the group that is sent from the library.
* it is not needed for kill messages, so we shrink the data field to ignore it
*/
len = Message_kill_mess_fixup(down_ptr->mess, Sessions[ses].read.total_bytes - MAX_GROUP_NAME, mbox);
/* A bug in both 3.13 and 4 I think is that if we get a DISCONNECT message
* from the client and we process and deliver that before discovering the
* closed socket ourselves and calling Sess_kill(), then the session
* is in the wrong state and we will crash when we try to finish delivery.
*/
Log_sess_disconnect( Sessions[ses].mbox, Sessions[ses].address, Sessions[ses].name,
Sessions[ses].num_mess );
/* clear his structure */
while( Sessions[ses].num_mess > 0 )
{
mess_link = Sessions[ses].first;
Sessions[ses].first = Sessions[ses].first->next;
Sess_dispose_message( mess_link );
Sessions[ses].num_mess--;
}
/* close the mailbox and mark it unoperational */
E_dequeue( Sess_badger, mbox, NULL );
E_detach_fd( mbox, READ_FD );
E_detach_fd( mbox, EXCEPT_FD );
close( mbox );
/* the mailbox is closed but the entry still points to it */
Sessions[ses].status = Clear_op_session( Sessions[ses].status );
Alarm( SESSION, "Sess_read: disconnecting session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
}
Alarm( SESSION, "Sess_read: queueing message of type %d with len %d to the protocol\n",
down_ptr->type, Sessions[ses].read.total_bytes );
Prot_new_message( down_ptr, Sessions[ses].down_queue );
}
static int Sess_get_p2p_dests( int num_groups, char groups[][MAX_GROUP_NAME], char dests[][MAX_GROUP_NAME] )
{
int i, num_p2p_targets = 0;
for( i=0; i < num_groups; i++ )
{
if( groups[i][0] == '#' )
{
/* private group */
memcpy( dests[num_p2p_targets], groups[i], MAX_GROUP_NAME);
num_p2p_targets++;
}
}
return( num_p2p_targets );
}
/* Take a message received from a client and change it into the form of
* a reject message. Destination groups, user data, mess_type and type field
* are all preserved to give the sender information about what message was
* rejected.
*/
static void Sess_create_reject_message ( message_obj *msg )
{
message_header *head_ptr;
int32u old_type;
head_ptr = Message_get_message_header(msg);
old_type = head_ptr->type;
head_ptr->type = REJECT_MESS;
head_ptr->type = Set_endian( head_ptr->type );
/* If original message was SELF_DISCARD, then maintain that state */
if (Is_self_discard( old_type) ) head_ptr->type |= SELF_DISCARD;
Message_add_oldtype_to_reject( msg, old_type );
Alarm( PRINT, "Sess_create_reject_mess: Created Reject for sender %s type 0x%x oldtype 0x%x for first group %s\n",
head_ptr->private_group_name, head_ptr->type, old_type, Message_get_first_group( msg ) );
return;
}
static void Sess_deliver_reject( message_obj *msg )
{
message_link *mess_link;
mess_link = new(MESSAGE_LINK);
if (mess_link == NULL ) {
Alarm(EXIT, "Sess_deliver_reject: Failed to allocate a new MESSAGE_LINK.\n");
return;
}
mess_link->mess = msg;
mess_link->next = NULL;
Sess_deliver_message( mess_link );
return;
}
void Sess_write( int ses, message_link *mess_link, int *needed )
{
message_obj *msg;
message_link *tmp_link;
scatter *scat;
int ioctl_cmd;
int ret;
int total_to_send;
int len_sent, first_data_len;
char *first_data_ptr;
int i;
message_header *head_ptr;
if( !Is_op_session( Sessions[ses].status ) ) return;
if( Sessions[ses].num_mess >= MAX_SESSION_MESSAGES )
{
Alarm( SESSION,
"Sess_write: killing mbox %d for not reading\n",
Sessions[ses].mbox );
Sess_kill( Sessions[ses].mbox );
return;
}
if( Sessions[ses].num_mess > 0 )
{
Sess_badger( Sessions[ses].mbox , NULL);
}
msg = mess_link->mess;
Obj_Inc_Refcount(msg);
scat = Message_get_data_scatter(msg);
for( total_to_send=0, i=0; i < scat->num_elements; i++ )
total_to_send += scat->elements[i].len;
/* since also sending message_header */
total_to_send += Message_get_non_body_header_size();
head_ptr = Message_get_message_header(msg);
#ifdef PROBE_LATENCY
if (Is_latency_mess( head_ptr->type ) )
{
int32u initial_offset, htime_offset;
int32u *p_time_offset;
sp_time cur_time, ncur_time;
initial_offset = MAX_GROUP_NAME*head_ptr->num_groups;
p_time_offset = (int32u *) &msg->body.elements[0].buf[initial_offset];
htime_offset = ntohl(*p_time_offset);
Alarm(SESSION, "Sess_write: Msg Data at %d with timeoffset %u\n", initial_offset, htime_offset);
cur_time = E_get_time();
ncur_time.sec = htonl(cur_time.sec);
ncur_time.usec = htonl(cur_time.usec);
memcpy(&(msg->body.elements[0].buf[htime_offset + initial_offset]), &ncur_time, sizeof(sp_time));
Alarm(SESSION, "Sess_write: timestamped time (%d, %d) in byte %d of message\n", cur_time.sec, cur_time.usec, htime_offset);
*p_time_offset = htonl(htime_offset + sizeof(sp_time) );
}
#endif
len_sent = 0;
if( Sessions[ses].num_mess == 0 )
{
/* set file descriptor to non blocking */
ioctl_cmd = 1;
ret = ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
/* Send the first part of message which must be handled specially */
ret = send(Sessions[ses].mbox, (char *) head_ptr,
sizeof(message_header), 0);
if ( ret > 0 ) len_sent += ret;
if ( ret != sizeof(message_header) )
{
goto end_write;
}
/* Send the first data of message which must be handled specially */
first_data_ptr = Message_get_first_data_ptr(msg);
first_data_len = Message_get_first_data_len(msg);
ret = send(Sessions[ses].mbox, first_data_ptr,
first_data_len, 0);
if ( ret > 0 ) len_sent += ret;
if ( ret != first_data_len )
{
goto end_write;
}
/* send the message after first buffer*/
for( i=1; i < scat->num_elements; i++ )
{
ret = send( Sessions[ses].mbox, scat->elements[i].buf,
scat->elements[i].len, 0);
if( ret > 0 ) len_sent += ret;
if( ret != scat->elements[i].len )
{
break;
}
}
/* set file descriptor back to blocking */
end_write:
ioctl_cmd = 0;
ret = ioctl( Sessions[ses].mbox, FIONBIO, &ioctl_cmd);
}
if( len_sent < total_to_send )
{
/* this message has to be linked */
if( *needed )
{
/* create a copy of mess_link and link it */
tmp_link = new(MESSAGE_LINK);
if (tmp_link == NULL ) {
Alarm(EXIT, "Sess_write: Failed to allocate a new MESSAGE_LINK.\n");
return;
}
tmp_link->mess = Message_copy_message(msg);
++*needed;
}else{
/* should link mess_link itself */
tmp_link = mess_link;
*needed=1;
}
/* link the message */
tmp_link->next = 0;
if( Sessions[ses].num_mess == 0 )
{
Sessions[ses].first = tmp_link;
Sessions[ses].last = tmp_link;
/* setting cur_element and cur_byte */
Message_calculate_current_location(tmp_link->mess, len_sent, &(Sessions[ses].write) );
/* We will need to badger this guy */
E_queue( Sess_badger, Sessions[ses].mbox, NULL, Badger_timeout );
}else{
/* This guy was already badgered */
Sessions[ses].last->next = tmp_link;
Sessions[ses].last = tmp_link;
}
Sessions[ses].num_mess++;
}
Message_Dec_Refcount(msg);
}
static void Sess_badger( mailbox mbox, void *dummy )
{
int ses;
message_link *mess_link;
int able_to_write;
message_obj *msg;
scatter *scat;
int ioctl_cmd;
int bytes_to_send, from;
int i;
int ret;
Alarm( SESSION, "Sess_badger: for mbox %d\n", mbox );
ses = Sess_get_session_index( mbox );
if( ses < 0 || ses >= MAX_SESSIONS ) return;
if( !Is_op_session( Sessions[ses].status ) ) return;
if( Sessions[ses].num_mess <= 0 ) return;
/* set file descriptor to non blocking */
ioctl_cmd = 1;
ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
for( able_to_write = 1 ; Sessions[ses].num_mess > 0 && able_to_write; )
{
msg = Sessions[ses].first->mess;
Obj_Inc_Refcount(msg);
/* First if we havn't sent the mess_head yet, send that special
* and then reset the cur_byte and in_mess_head fields
*/
if ( Sessions[ses].write.in_mess_head == 1 )
{
char *tmp_buf;
tmp_buf = (char *) Message_get_message_header(msg);
from = Sessions[ses].write.cur_byte;
bytes_to_send = sizeof(message_header) - from;
ret = send(mbox, &tmp_buf[from], bytes_to_send, 0);
if (ret == bytes_to_send)
{
Message_set_location_begin_body(&(Sessions[ses].write));
} else
{
if (ret > 0) Sessions[ses].write.cur_byte = from + ret;
able_to_write = 0;
}
}
/* Next if we still can, send some body of this messsage */
if ( able_to_write )
{
scat = Message_get_data_scatter(msg);
for( i=Sessions[ses].write.cur_element, from=Sessions[ses].write.cur_byte;
i < scat->num_elements;
i++, from=0 )
{
bytes_to_send = scat->elements[i].len - from;
ret = send( mbox, &(scat->elements[i].buf[from]), bytes_to_send, 0);
if( ret == bytes_to_send )
{
Sessions[ses].write.cur_byte = 0;
Sessions[ses].write.cur_element++;
}else{
if( ret > 0 ) Sessions[ses].write.cur_byte += ret;
able_to_write = 0;
break;
}
}
}
if( able_to_write )
{
/* free that message */
mess_link = Sessions[ses].first;
Sessions[ses].first = Sessions[ses].first->next;
Sessions[ses].num_mess--;
Message_reset_current_location(&(Sessions[ses].write) );
Sess_dispose_message( mess_link );
}
Message_Dec_Refcount(msg);
} /* for loop per message */
/* set file descriptor back to blocking */
ioctl_cmd = 0;
ret = ioctl( mbox, FIONBIO, &ioctl_cmd);
if( Sessions[ses].num_mess > 0 ) E_queue( Sess_badger, mbox, NULL, Badger_timeout );
}
static void Sess_kill( mailbox mbox )
{
int ses;
message_link *mess_link;
message_obj *kill_mess;
message_header *kill_head;
down_link *down_ptr;
int head_size;
char private_send_group[MAX_GROUP_NAME];
head_size = Message_get_header_size();
ses = Sess_get_session_index(mbox);
if( ses < 0 || ses >= MAX_SESSIONS ) {
Alarm( PRINT, "Sess_kill: Illegal mbox %d for killing. Cannot cleanup\n", mbox);
return;
}
if( !Is_op_session( Sessions[ses].status ) )
{
if ( !Is_preauth_session( Sessions[ses].status ) )
Alarm( PRINT,
"Sess_kill: session %s with mailbox %d is already killed (status %d)\n",
Sessions[ses].name, mbox, Sessions[ses].status );
else
Alarm( EXIT,
"Sess_kill: BUG! session %s with mailbox %d killed before authentication and authorization completed (status 0x%x)\n",
Sessions[ses].name, mbox, Sessions[ses].status);
}
/* send a message to kill this session */
sprintf( private_send_group, "#%s#%s", Sessions[ses].name, My.name );
kill_mess = Message_create_message(KILL_MESS | SAFE_MESS, private_send_group);
kill_head = Message_get_message_header(kill_mess);
Message_kill_mess_fixup(kill_mess, head_size, mbox );
down_ptr = Prot_Create_Down_Link(kill_mess, Message_get_packet_type(kill_head->type), mbox, 0);
down_ptr->mess = kill_mess;
Obj_Inc_Refcount(down_ptr->mess);
Prot_new_message( down_ptr, Sessions[ses].down_queue );
Message_Dec_Refcount(kill_mess);
Log_sess_disconnect( Sessions[ses].mbox, Sessions[ses].address, Sessions[ses].name,
Sessions[ses].num_mess );
/* clear his structure */
while( Sessions[ses].num_mess > 0 )
{
mess_link = Sessions[ses].first;
Sessions[ses].first = Sessions[ses].first->next;
Sess_dispose_message( mess_link );
Sessions[ses].num_mess--;
}
/* reset active read_mess to empty */
Message_reset_current_location(&(Sessions[ses].read));
Sessions[ses].read.in_mess_head = 1;
if (Sessions[ses].read_mess != NULL)
Message_dispose_message( Sessions[ses].read_mess );
Sessions[ses].read_mess = NULL;
/* close the mailbox and mark it unoperational */
E_dequeue( Sess_badger, mbox, NULL );
E_detach_fd( mbox, READ_FD );
E_detach_fd( mbox, EXCEPT_FD );
close(mbox);
/* the mailbox is closed but the entry still points to it */
Sessions[ses].status = Clear_op_session( Sessions[ses].status );
Alarm( SESSION, "Sess_kill: killing session %s ( mailbox %d )\n",Sessions[ses].name, mbox );
}
void Sess_deliver_message( message_link *mess_link )
{
static int target_sessions[MAX_SESSIONS];
int num_target_sessions;
int source_ses;
message_header *head_ptr;
message_obj *msg;
scatter *scat;
char *target_groups;
char private_name[MAX_PRIVATE_NAME+1];
char proc_name[MAX_PROC_NAME];
int needed;
int i;
msg = mess_link->mess;
Obj_Inc_Refcount(msg);
head_ptr = Message_get_message_header(msg);
Message_endian_correct_message_header(msg);
if( Is_join_mess( head_ptr->type ) )
{
Sess_handle_join( mess_link );
Message_Dec_Refcount(msg);
return;
}
if( Is_leave_mess( head_ptr->type ) )
{
Sess_handle_leave( mess_link );
Message_Dec_Refcount(msg);
return;
}
if( Is_kill_mess( head_ptr->type ) )
{
Sess_handle_kill( mess_link );
Message_Dec_Refcount(msg);
return;
}
if( Is_groups_mess( head_ptr->type ) )
{
G_handle_groups( mess_link );
Message_Dec_Refcount(msg);
return;
}
/* regular message */
GlobalStatus.message_delivered++;
/* Setting endian to my endian on the header */
head_ptr->type = Set_endian( head_ptr->type );
scat = Message_get_data_scatter(msg);
/* analyze message groups to sessions */
if ( Is_reject_mess(head_ptr->type) ) {
num_target_sessions = 1;
G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
target_sessions[0] = Sess_get_session( private_name );
} else {
target_groups = Message_get_groups_array(msg);
num_target_sessions = G_analize_groups( head_ptr->num_groups,
(char (*)[MAX_GROUP_NAME])target_groups,
target_sessions ) ;
}
/* if self_discard, sender is local and a target then eliminate sender from targets */
source_ses = -1;
if( num_target_sessions > 0 && Is_self_discard( head_ptr->type ) )
{
G_private_to_names( head_ptr->private_group_name, private_name, proc_name );
if( strcmp( My.name, proc_name ) == 0 )
{
source_ses = Sess_get_session( private_name );
}
}
needed = 0;
for( i = 0; i < num_target_sessions ; i++ )
{
if( source_ses == target_sessions[i] ) continue; /* self_discard */
Sess_write( target_sessions[i], mess_link, &needed );
}
Message_Dec_Refcount(msg);
if( !needed ) Sess_dispose_message( mess_link );
}
void Sess_deliver_reg_memb( configuration reg_memb, membership_id reg_memb_id )
{
G_handle_reg_memb( reg_memb, reg_memb_id );
}
void Sess_deliver_trans_memb( configuration trans_memb, membership_id trans_memb_id )
{
G_handle_trans_memb( trans_memb, trans_memb_id );
}
static void Sess_handle_join( message_link *mess_link )
{
message_header *join_head;
char *group_name;
join_head = Message_get_message_header(mess_link->mess);
group_name = Message_get_first_group(mess_link->mess);
G_handle_join( join_head->private_group_name, group_name );
Sess_dispose_message( mess_link );
}
static void Sess_handle_leave( message_link *mess_link )
{
message_header *leave_head;
char *group_name;
leave_head = Message_get_message_header(mess_link->mess);
group_name = Message_get_first_group(mess_link->mess);
G_handle_leave( leave_head->private_group_name, group_name );
Sess_dispose_message( mess_link );
}
static void Sess_handle_kill( message_link *mess_link )
{
char private_name[MAX_PRIVATE_NAME+1];
char proc_name[MAX_PROC_NAME];
message_header *kill_head;
int ses;
int ret;
kill_head = Message_get_message_header(mess_link->mess);
ret = G_private_to_names( kill_head->private_group_name, private_name, proc_name );
if( ret < 0 ) Alarm( EXIT, "Sess_handle_kill: Illegal private name to kill %s\n",
kill_head->private_group_name );
G_handle_kill( kill_head->private_group_name );
if( strcmp( My.name, proc_name ) == 0 )
{
/* my machine, we need to find the session */
ses = Sess_get_session( private_name );
if( ses >= 0 )
{
/* delete session ses */
if( Is_op_session( Sessions[ses].status ) )
Alarm( EXIT, "Sess_handle_kill: killing unkilled session bug!\n");
Sess_unhash_session (&Sessions[ses]);
Prot_Destroy_Local_Session(&(Sessions[ses]) );
Sess_remove_session (&Sessions[ses]);
Num_sessions--;
GlobalStatus.num_sessions = Num_sessions;
}
}
Prot_kill_session(mess_link->mess);
Sess_dispose_message( mess_link );
}
void Sess_dispose_message( message_link *mess_link )
{
Message_dispose_message(mess_link->mess);
dispose( mess_link );
}
int Sess_get_session( char *name )
{
int ret;
session *ses;
for( ses = Sessions_head; ses; ses = ses->sort_next )
{
ret = strcmp( ses->name, name );
if( ret < 0 ) continue;
if( ret == 0 ) return( ses - Sessions );
if( ret > 0 ) return( -1 );
}
return( -1 );
}
void Flip_mess( message_header *head_ptr )
{
head_ptr->type = Flip_int32( head_ptr->type );
head_ptr->num_groups = Flip_int32( head_ptr->num_groups );
head_ptr->data_len = Flip_int32( head_ptr->data_len );
}
syntax highlighted by Code2HTML, v. 0.9.1