### Local Variables: *** ### mode:perl *** ### comment-column:0 *** ### comment-start: "### " *** ### comment-end: "***" *** ### End: *** # # ****************DO NOT MOVE OR CHANGE LINES ABOVE THIS********************* # # The first set of lines runs perl from any shell. The second set of lines # identifies the rest of the file as PERL for EMACS autoformatting. # See end of copyright for more information. # # # ------------------------------------------------------------------- # X-BONE # # http://www.isi.edu/xbone # USC Information Sciences Institute (USC/ISI) # Marina del Rey, California 90292, USA # Copyright (c) 1998-2005 # # ------------------------------------------------------------------- # # Copyright (c) 1998-2005 by the University of Southern California. # All rights reserved. # # Permission to use, copy, modify, and distribute this software and # its documentation in source and binary forms for non-commercial # purposes and without fee is hereby granted, provided that the above # copyright notice appear in all copies and that both the copyright # notice and this permission notice appear in supporting # documentation, and that any documentation, advertising materials, # and other materials related to such distribution and use acknowledge # that the software was developed by the University of Southern # California, Information Sciences Institute. The name of the # University may not be used to endorse or promote products derived # from this software without specific prior written permission. # # THE UNIVERSITY OF SOUTHERN CALIFORNIA MAKES NO REPRESENTATIONS ABOUT # THE SUITABILITY OF THIS SOFTWARE FOR ANY PURPOSE. THIS SOFTWARE IS # PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES, # INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. # # Other copyrights might apply to parts of this software and are so # noted when applicable. # # ------------------------------------------------------------------- # # Effort partly sponsored by the Defense Advanced Research Projects # Agency (DARPA) and Air Force Research Laboratory, Air Force Materiel # Command, USAF, under agreement numbers F30602-98-1-0200 (X-Bone) and # F30602-01-2-0529 (DynaBone). The views and conclusions contained # herein are those of the authors and should not be interpreted as # necessarily representing the official policies or endorsements, # either expressed or implied, of the Defense Advanced Research # Projects Agency (DARPA), the Air Force Research Laboratory, or the # U.S. Government. # # This work was partly supported by the NSF STI-XTEND (ANI-0230789) # and NETFS (ANI-0129689) projects. Any opinions, findings, and # conclusions or recommendations expressed in this material are those # of the authors and do not necessarily reflect the views of the # National Science Foundation. # # ------------------------------------------------------------------- # $RCSfile: XB_API.pm,v $ # # $Revision: 1.64 $ # $Author: pingali $ # $Date: 2005/04/21 04:00:05 $ # $State: Exp $ # ---------------------------------------------------------------------------- # # Primary Author: Yu-Shun Wang # DescriptioN: Functions for processing XBone API commands package XB_API; require Exporter; @ISA = qw(Exporter); @EXPORT = qw(); @EXPORT_OK = qw(api_start api_stop api_refresh api_status api_init api_invite api_select api_release api_bind api_config ); use strict; use sigtrap; use Data::Dumper; #use IO::Handle; use Socket; use Parallel::ForkManager; use Socket6; use XB_AppDeploy; use XB_VN_DNS; my $modname = "XB_API::"; # for recursive calls sub init_network($$$); ############################################################################### # UTILITY FUNCTIONS ############################################################################### # Description: # Initialize node/network data structure recursively. # Arguments: # $clist (ref) hash of classes # $class class name # $level overlay level (root=0) # Returns: # $body (ref) hash of the overlay body (properites, interfaces, # nodes, & links) # Exceptions: # - # Notes: # Refer to doc/xbone-data-structure.txt for the complete hash. # sub init_network($$$){ my ($c_list, $class, $level) = @_; my $procname = "init_network"; XB_Log::log "info", "-> $procname $c_list, $class, $level"; my %body; eval { my ($host, $router, $meta, $iface) = (0, 0, 0, 0); my ($all_host, $all_router, $all_meta, $all_iface) = (0, 0, 0, 0); my (@host_os, @router_os, @host_tags, @router_tags, @meta_tags); my ($class_ref, $nref); #-> check if the class is defined unless (defined $c_list->{$class}){ XB_Log::log "err", " [$procname] unknown class: $class" and die "class"; } $class_ref = $c_list->{$class}; #-> overall properties $body{properties}{class_name} = $class; $body{properties}{level} = $level; #-> copy the network properties over (instead of point to the hash) while(my ($k, $v) = each %{$class_ref->{property}}){ $body{properties}{$k}=$v; } #-> interfaces $body{interfaces} = (); if(scalar(@{$class_ref->{iface}})){ for my $e (@{$class_ref->{iface}}){ $body{interfaces}{$e->{ident}} = (); if(defined $e->{renames}){ my $r = $e->{renames}{endpoint}; $body{interfaces}{$e->{ident}}{node} = $r->{node}; $body{interfaces}{$e->{ident}}{interface} = $r->{iface}; } } # host or router if(scalar(@{$class_ref->{iface}}) > 1){ $body{properties}{type} = "router"; }else{ $body{properties}{type} = "host"; } } #-> nodes: simple vs. meta if(defined $class_ref->{vnode}){ $body{properties}{class} = "meta"; for my $n (@{$class_ref->{vnode}}){ my $vname = $n->{ident}; my $vtype = $n->{type}; $body{nodes}{$vname} = init_network($c_list, $vtype, $level+1); } }else{ $body{properties}{class} = "simple"; } #-> nodes: host vs. router vs. meta for my $n (keys %{$body{nodes}}){ $nref = $body{nodes}; my $i = scalar(keys %{$nref->{$n}{interfaces}}); $iface += $i; $nref->{$n}{properties}{router_links} = []; $nref->{$n}{properties}{host_links} = []; $nref->{$n}{properties}{local_dest} = []; my @node_os; if(defined $nref->{$n}{properties}{os} && $nref->{$n}{properties}{os} ne 'any'){ @node_os = split /\|/, $nref->{$n}{properties}{os}; } if($body{nodes}{$n}{properties}{class} eq 'meta'){ # meta is not counted as either host or router for resource # discovery purpose; also don't care about the platforms $meta++; $all_host += $body{nodes}{$n}{properties}{all_host}; $all_router += $body{nodes}{$n}{properties}{all_router}; $all_meta += $body{nodes}{$n}{properties}{all_meta}; $all_iface += $body{nodes}{$n}{properties}{all_iface}; }elsif($i >1){ $router++; for my $o (@node_os){ unless(grep /^$o$/, @router_os){ push @router_os, $o; } } }else{ $host++; for my $o (@node_os){ unless(grep /^$o$/, @host_os){ push @host_os, $o; } } } } if($body{properties}{class} eq "meta"){ $body{properties}{iface} = $iface; $body{properties}{host} = $host; $body{properties}{router} = $router; $body{properties}{meta} = $meta; $body{properties}{all_iface} = $all_iface + $iface; $body{properties}{all_host} = $all_host + $host; $body{properties}{all_router} = $all_router + $router; $body{properties}{all_meta} = $all_meta + $meta; $body{properties}{host_os} = join "|", @host_os; unless ($body{properties}{host_os} =~ /\S+/){ $body{properties}{host_os} = "any"; } $body{properties}{router_os} = join "|", @router_os; unless ($body{properties}{router_os} =~ /\S+/){ $body{properties}{router_os} = "any"; } $body{properties}{IPsec}= ($body{properties}{IPsec_encryption} ne 'none' or $body{properties}{IPsec_authentication} ne 'none')? "yes":"no"; }else{ delete $body{nodes}; } #-> links for my $l (@{$class_ref->{link}}){ my $lt = $l->{endpoint}[0]{node}; my $rt = $l->{endpoint}[1]{node}; my $ln = $l->{ident}; if($body{nodes}{$lt}{properties}{type} eq "router" and $body{nodes}{$rt}{properties}{type} eq "router"){ $body{links}{$ln}{type} = "router"; push @{$body{nodes}{$lt}{properties}{router_links}}, $ln; push @{$body{nodes}{$rt}{properties}{router_links}}, $ln; }else{ $body{links}{$ln}{type} = "host"; push @{$body{nodes}{$lt}{properties}{host_links}}, $ln; push @{$body{nodes}{$rt}{properties}{host_links}}, $ln; if($body{nodes}{$lt}{properties}{type} eq "host"){ $body{nodes}{$lt}{properties}{default_router} = $rt; }else{ $body{nodes}{$rt}{properties}{default_router} = $lt; } } $body{links}{$ln}{right_node} = $rt; $body{links}{$ln}{right_if} = $l->{endpoint}[1]{iface}; $body{links}{$ln}{left_node} = $lt; $body{links}{$ln}{left_if} = $l->{endpoint}[0]{iface}; } #-> application deployment if(defined $class_ref->{application}){ for my $a (@{$class_ref->{application}}){ #-- extrace & store app params my $appname = (defined $a->{program})? $a->{program}: ''; my $script = (defined $a->{script})? $a->{script} : ''; if($appname eq ''){ XB_Log::log "err", " [$procname] missing application name in ". "application deployment section" and die "noname"; }elsif($script eq ''){ XB_Log::log "err", " [$procname] missing script URL for ". "$appname in application deployment\n section" and die "nourl"; } $body{app_deploy}{$appname}{script} = $script; $body{app_deploy}{$appname}{checksum} = (defined $a->{checksum})? $a->{checksum}: ''; $body{app_deploy}{$appname}{suid} = (defined $a->{suid})? $a->{suid} : ''; $body{app_deploy}{$appname}{ifaces} = (defined $a->{ifaces})? $a->{ifaces} : ''; $body{app_deploy}{$appname}{nodes} = (defined $a->{nodes})? $a->{nodes} : ''; } } }; XB_Log::log "info", "<- $procname"; return (\%body) unless $@; if ($@ !~ /(class|noname|nourl)/){ XB_Log::log "warning", " ! $procname caught unknown exception $@"; } die "$modname$procname"; } # Description: # Check & get the application object for given type and name # Arguments: # $type application type # $name application name # Returns: # app_obj (ref) app/overlay object hash if exists, undef if not # Exceptions: # - sub get_app($$){ my ($type, $name) = @_; my $app_obj = undef; my $procname = "get_app"; my $str = "-> $procname $type, $name: "; if(defined $XB_Params::node_state{active_apps}{$type}{$name}){ $app_obj = $XB_Params::node_state{active_apps}{$type}{$name}; $str .= "$type $name exists."; }else{ $str .= "$type $name dose not exist."; } XB_Log::log "debug1", $str; return $app_obj; } # Description: # Construct the list of hash of nodes used to create node level XML part. # Arguments: # $net (ref) hash of network part of the overlay object # Returns: # $nlist (ref) list of hash of nodes # Exceptions: # - sub api_build_nodelist ($$){ my ($net, $cmd) = @_; my $procname = "api_build_nodelist"; XB_Log::log "debug1", "-> $modname$procname $net, $cmd"; my @nlist; eval{ my $nodes = $net->{nodes}; my $links = $net->{links}; for my $n (keys %{$nodes}){ my $me = $nodes->{$n}; my $pro = $nodes->{$n}{properties}; my (%nh, @nh, @tuns); $nh{hostname} = $pro->{hostname}; $nh{ip} = $pro->{ctl_addr}; $nh{type} = $pro->{type}; $nh{class} = $pro->{class}; $nh{os} = $pro->{os}; $nh{vname} = $n; # TODO api_config should set this upon receiving ack-config! $nh{status} = (exists $pro->{status})? $pro->{status} : 'up'; if($nh{status} eq 'up'){ # only construct the tunnel part if the node is up my @al; push @al, @{$pro->{router_links}}; push @al, @{$pro->{host_links}}; for my $l (@al){ my $ln = $links->{$l}; my (%lh, @lh, $local_if, $remote, $remote_if, @h2, %h2, @h3, %h3); ($remote, $remote_if, $local_if) = ($ln->{right_node} eq $n)? ($ln->{left_node}, $ln->{left_if}, $ln->{right_if}) : ($ln->{right_node}, $ln->{right_if}, $ln->{left_if}); $lh{local_ip_address} = $me->{interfaces}{$local_if}{netaddr}; $lh{remote_ip_address} = $nodes->{$remote}{interfaces}{$remote_if}{netaddr}; $lh{status} = (exists $ln->{status_net})? $ln->{status_net}: "up"; @lh = %lh; push @tuns, (\@lh); if($lh{status} eq 'down'){ $h2{local_ip_address} = $me->{interfaces}{$local_if}{linkaddr}; $h2{remote_ip_address} = $nodes->{$remote}{interfaces}{$remote_if}{linkaddr}; $h2{status} = (exists $ln->{status_link})? $ln->{status_link}: "up"; @h2 = %h2; push @tuns, (\@h2); } if(defined $h2{status} and $h2{status} eq 'down'){ $h3{local_ip_address} = $me->{properties}{app_addr}; $h3{remote_ip_address} = $nodes->{$remote}{properties}{app_addr}; $h3{status} = (exists $ln->{status_phy})? $ln->{status_phy}: "up"; @h3 = %h3; push @tuns, (\@h3); } } $nh{tunnels} = \@tuns; }else{ # node is down or error, attach the error message $nh{error_msg} = $pro->{error_msg}; } @nh = %nh; push @nlist, (\@nh); } #XB_Log::log "debug1", " [$procname] Nodelist: ". Dumper(\@nlist); }; XB_Log::log "debug1", "<- $modname$procname"; return \@nlist unless $@; XB_Log::log "warning", " ! $procname caught unknown exception $@"; die "$modname$procname"; } # Description: # Construct the hashes & lists and build the XBone API XML message # Arguments: # $app_obj (ref) application object # $msg_type message type # Returns: # $msg_ref (ref) message # Exceptions: # - sub api_build_msg ($$){ my ($msg_type, $app_obj) = @_; my $procname = "api_build_msg"; XB_Log::log "info", "-> $modname$procname $msg_type, $app_obj"; my $msg_ref; eval{ my (%ahref, $nlref); #-> common fields $ahref{protocol} = $XB_Params::api_ver; $ahref{release} = $XB_Params::rel_ver; $ahref{auth_type} = $app_obj->{credential}{auth_type}; $ahref{user_email} = $app_obj->{credential}{user_email}; $ahref{user_name} = $app_obj->{credential}{user_name}; #-> message-specific fields $_ = $msg_type; SWITCH: { /\b(create|status)_reply\b/ && do { my $cmd = $1; #-> overlay level my $net = $app_obj->{application}{network}; $ahref{overlay_name} = $app_obj->{application}{name}; $ahref{dns} = $net->{properties}{dns}; $ahref{routing} = $net->{properties}{routing}; $ahref{IPsec_encr} = $net->{properties}{IPsec_encryption}; $ahref{IPsec_auth} = $net->{properties}{IPsec_authentication}; $ahref{qos} = (defined $net->{properties}{qos} and $net->{properties}{qos} ne "")? $net->{properties}{qos} : "no"; #-> node level $nlref = api_build_nodelist($net, $cmd); if($cmd eq 'create'){ $msg_ref = XB_XML_GUI::XB_build_create_overlay_reply_msg (\%ahref, $nlref); }else{ $msg_ref = XB_XML_GUI::XB_build_overlay_status_reply_msg (\%ahref, $nlref); } last SWITCH; }; /\b(discover_reply)\b/ && do { my (@nlist, $n, $h, $v, $k); while(($n, $h) = each %{$app_obj->{nodes}}){ my (%nh, @nh); while(($k, $v) = each %{$h->{command}}){ $nh{$k} = $v; } delete $nh{command}; @nh = %nh; push @nlist, (\@nh); } $msg_ref = XB_XML_GUI::XB_build_discover_daemons_reply_msg (\%ahref, \@nlist); last SWITCH; }; XB_Log::log "err", " [$procname] unknow message type $_"; die "message"; } XB_Log::log "debug7", " [$procname] $msg_type message:". $$msg_ref; }; XB_Log::log "info", "<- $modname$procname"; return $msg_ref unless $@; unless ($@ =~ /(message)/){ XB_Log::log "warning", " ! $procname caught unknown exception $@"; } die "$modname$procname"; } ############################################################################### # EXPORTED API ############################################################################### # Description: # API Start: Phase 1 - Initialization # - convert API command into internal application object hash # Arguments: # $cmd (ref) API start (or create overlay) command # Returns: # \%app_obj (ref) app/overlay object hash # Exceptions: # "api_init" on failure, nothing to cleanup by caller # sub api_init($){ my $cmd = shift; my $procname = $modname. "api_init"; XB_Log::log "info", "-> $procname $cmd"; my (%app_obj, $ovl_name, $level, $retry, $class, $central); my ($host, $router, $meta, $slack, %available); eval{ # Convert the API start command into internal app/ovl hash #=> general info about the command $app_obj{protocol} = $cmd->{version}; $app_obj{release} = $cmd->{release}; $app_obj{credential} = $cmd->{credential}{property}; $app_obj{credential}{section} = $cmd->{credential}{section}; $app_obj{message} = $cmd->{message}; #=> switch according to application type if($cmd->{command}{command} eq "create_overlay"){ #-> overlays my $xol = $cmd->{command}{create_overlay}{xol_program}; #-- common fields $app_obj{application}{type} = "overlay"; $app_obj{application}{name} = $xol->{vnode}{ident}; $app_obj{application}{version} = $xol->{version}; $app_obj{application}{level} = ($cmd->{command}{level})? $cmd->{command}{level}:0; unless($xol->{version} eq $XB_Params::xol_ver){ XB_Log::log "err", " [$procname] XOL versions mismatch!\n ". "The command has $xol->{version} and this node supports ". "$XB_Params::xol_ver"; die "xol"; } #-- root node name $app_obj{application}{root_node} = $xol->{vnode}{type}; #-- node class list: convert the list from array into hash my $node_class = $xol->{node_def}; for my $c (@{$node_class}){ $app_obj{application}{node_class}{$c->{ident}}=$c; delete $c->{ident}; } #-- expand main (root) overlay $app_obj{application}{network} = init_network $app_obj{application}{node_class}, $app_obj{application}{root_node}, $app_obj{application}{level}; #-- fill in some extra properties # TODO The following should be included in API: discover, retry, # TODO deployment, selection, mcast address, LDAP server. my $props = $app_obj{application}{network}{properties}; my $cprops = $cmd->{command}{create_overlay}{property}; #-- choose the discovery method depending on what is defined. # XXX Hack. Perl throws warnings if we look for hash # entries that dont exist. So generate a list of existing # entries and search through them. my @optlist = keys %{$props}; if(XB_Common::check_list("custom_hostlist", \@optlist) == 1){ if ($props->{custom_hostlist} ne ''){ $props->{discover} = "unicast"; my @hlist = split /[ \t\n\r\f,]+/, $cprops->{custom_hostlist}; $props->{hostlist} = \@hlist; XB_Log::log "debug2", " [$procname] User supplied a list of hosts". " to probe:\n". Dumper($props->{hostlist}); } } elsif(XB_Common::check_list("ldap", \@optlist) == 1){ if($props->{ldap} eq "yes"){ if ($XB_Params::node_opts{ldap}->{enable} !~ /yes/i){ XB_Log::log "err", " [$procname] LDAP support disabled because the server ". "is unreachable or LDAP support is not configured\n"; die("ldap"); }; $props->{discover} = "unicast"; XB_Log::log "debug2", " [$procname] User asked to use LDAP to get the". " hosts to probe: \n"; my ($scope, $line, %attrvals) = ($cprops->{scope}, $cprops->{attrvals}); my @av_array = split(/[\s,]+/, $line); foreach my $av (@av_array){ my @arr = split(/=/, $av); $attrvals{$arr[0]} = $arr[1]; } my $result = XB_LDAP::LDAP_search("registry", $scope, \%attrvals); XB_Log::log "debug2", "LDAP search result =". Dumper ($result); my @hlist = (); for my $h (keys %{$result}) { XB_Log::log "debug2", "registered hostname= $h"; push @hlist, $h; } $props->{hostlist} = \@hlist; }; # ldap enabled }else{ $props->{discover} = "multicast"; $props->{search_radius} = $cprops->{search_radius}; } $props->{timeout} = $cmd->{command}{create_overlay}{property}{timeout}; $props->{selection} = "auto"; $props->{deployment} = "distributed"; # vs. centralized $props->{retry} = ($cmd->{command}{retry})?$cmd->{command}{retry}:1; $props->{routing} = ($props->{dynamic_routing} =~ /(no|0)/i)? "static" : "dynamic"; delete $props->{dynamic_routing}; # copy the dummynet variables. my $cmd_props = $cmd->{command}{create_overlay}{property}; $props->{qos} = $cmd_props->{dummynet}; if ($props->{qos}){ if ($props->{qos} =~ /yes/){ foreach my $key ("dummynet_queue", "dummynet_queue_unit", "dummynet_delay", "dummynet_loss_rate", "dummynet_bandwidth", "dummynet_bandwidth_unit"){ $props->{$key} = $cmd_props->{$key}; } } } else { $props->{qos} = "no"; } #-- application deployment my $net = $app_obj{application}{network}; if(defined $net->{app_deploy}){ for my $appname (keys %{$net->{app_deploy}}){ my $app = $net->{app_deploy}{$appname}; my $acl_suid = $cmd->{user_acl}{suid}; # OM will act as gate keeper regarding application deployment # suid. So if a user with a higher app deploy privilege at RDs' # ACLs but a lower privilege in OM's ACL, OM's ACL will take # precedence and reject any requests that do not saticfy OM's # ACL! # # If the command did not specify an suid, treat it as an error # and reject the request; # otherwise, the order is 'root' > 'vhost' > 'nobody' in the # following comparison. $props->{app_deploy}{$appname}{suid} = $app->{suid}; if($app->{suid} eq ''){ XB_Log::log "err", " [$procname] Missing SUID field from the ". "application deployment section\n of the XBone overlay ". "create request!" and die "suid"; }else{ # check: root > vhost > nobody; $cmd_suid <= $acl_suid my $spoiled = 0; my $cmd_suid = $app->{suid}; if($acl_suid eq 'vhost' and $cmd_suid eq 'root'){ $spoiled = 1; }elsif($acl_suid eq 'nobody' and $cmd_suid ne 'nobody'){ $spoiled = 1; }else{ $app->{suid} = $cmd_suid; } if($spoiled){ XB_Log::log "err"," [$procname] Requsted privilege ". "($cmd_suid) for deploying \"$appname\" is\n higher ". "than specified in user ACL ($acl_suid), contact the\n ". "local XBone administrator to upgrade your privilege in ACL." and die "suid"; } } } } }else{ # add processing sections for other applications here, currently none XB_Log::log "err", " [$procname] unknown applications: ". $cmd->{command}{command}; die "application"; } }; XB_Log::log "info", "<- $procname"; return \%app_obj unless $@; unless ($@ =~ /(init_network|application|xol|suid)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$procname"; } # Description: # API Start: Phase 2 - Invite # - accept request (in the form of application obj) from API or CTL # - construct the invitation message # - multicast the invitation and collect responses # - request other application-specific resources # Arguments: # $app_obj (ref) application object hash # $msock multicast send socket (IO::Socket::Multicast) # Returns: # \%avail (ref) hash of (node => ack-invite-cmd) that reply # Exceptions: # "XB_API::api_invite" on error, caller should release reserved # resources: nodes, IP address blocks, etc. # sub api_invite($$){ my ($app_obj, $msock) = @_; my $procname = "api_invite"; XB_Log::log "info", "-> $modname$procname $app_obj, $msock"; my (%available, $level); my $sent_req = 0; # flag for sending release eval{ my $seq = time; my $cre = $app_obj->{credential}; my $app = $app_obj->{application}; my $pro = $app_obj->{application}{network}{properties}; my ($host, $router, $meta) = ($pro->{host}, $pro->{router}, $pro->{meta}); my ($enough, $host_ack, $router_ack, $both_ack, $meta_ack, $slack, $type); my (%host_hash, %router_hash, %meta_hash, %both_hash); $level = $app->{level}; #=> Check for the validity of the request if (($pro->{qos} =~ /yes/i) and ($pro->{address_type} =~ /ipv6/i)){ XB_Log::log "err", " [$procname] QoS is not supported in IPv6 overlays"; die "qos"; } #=> construct invite message my $cred = $app_obj->{credential}{section}; my $app_deploy_sec = ''; if(defined $app->{network}{app_deploy}){ for my $appname (keys %{$app->{network}{app_deploy}}){ my $dapp = $app->{network}{app_deploy}{$appname}; my $script = $dapp->{script}; my $chksum = $dapp->{checksum}; my $suid = $dapp->{suid}; $dapp->{action} = 'start'; $app_deploy_sec .= " (app-deploy\n". " (name $appname)\n". " (url $script)\n"; $app_deploy_sec .= ($suid ne '')? " (suid $suid)\n": ""; $app_deploy_sec .= ($chksum ne '')? " (chksum $chksum))\n": " )\n"; } } my $invite = "(xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver $seq\n". $$cred. " (invite\n". " (application $app->{type})\n". " (name $app->{name})\n". " (version $app->{version})\n". " (level $level)\n". " (routing $pro->{routing})\n". " (IPsec $pro->{IPsec})\n". " (qos $pro->{qos})\n". " (addr_type $pro->{address_type})\n". " (host $host $pro->{host_os})\n". " (router $router $pro->{router_os})\n". " (meta $meta any)\n". $app_deploy_sec. " )\n". ")\n$XB_Params::msg_delimiter\n"; XB_Log::log "debug1", " * Invite command: $invite"; #=> sign the message with S/MIME my $smime_invite = XB_SMIME::sign($invite, $XB_Params::node_opts{"node_cert"}, $XB_Params::node_opts{"node_key"}); #=> switch on discovery mechanisms if($pro->{discover} eq "multicast" or $pro->{discover} eq "unicast"){ # multicast & unicast my $discover = $pro->{discover}; my $retry = $pro->{retry}; my (%msg_hash, %msg_log); my $counter=1; while($retry){ XB_Log::log "info", " [$procname] $discover invite no. $counter"; $counter++; if($discover eq "multicast"){ #-- multicast the message unless (ref($msock) =~ /IO::Socket::Multicast/){ XB_Log::log "warning", " [$procname] wrong socket type! ".ref($msock); } #-- set the radius if (defined $pro->{search_radius}){ $msock->mcast_ttl($pro->{search_radius}); } #-- supress mcast loopback before sending $msock->mcast_loopback(0); $sent_req = 1; $msock->mcast_send($smime_invite) or XB_Log::log "err", " [$procname] multicast send failed: $!" and die "mcast"; $msock->mcast_loopback(1); }else{ #-- unicast the message for my $p (@{$pro->{hostlist}}){ #-> create udp socket my $udp_sock = XB_Common::udp_sock($p, $XB_Params::node_opts{xbone_ctl_port}, $pro->{address_type}); #-> send invite message unless (send($udp_sock, $smime_invite, 0)){ XB_Log::log "err", " [$procname] Send UDP unicast invite to". " $p failed: $!" and die "send"; }else{ XB_Log::log "info", " [$procname] UDP unicast invite to $p"; } #=> close udp socket $udp_sock->close or XB_Log::log "err", " [$procname] socket close failed: $!" and die "close"; } } XB_Log::log "info", " [$procname] sent $discover invite ". "$app->{name}, level $level"; #-- start timer my $rtt; if ($pro->{timeout} eq '') { $rtt = $XB_Params::xbone_rtt; } else { $rtt = $pro->{timeout}; } XB_Log::log "debug", "##### Timeout = $rtt #####"; my $timeout = time + $rtt; my $now; #-- refer to XB_RPC.pm -> XB_request_overlay function my $new_sel; unless ($new_sel = IO::Select->new($msock)){ XB_Log::log "err", " [$procname] select failed: $!" and die "sel"; } #-- collect ACK-INVITEs while(1){ $now = time; my $timeleft = $timeout - $now; while(my @r = $new_sel->can_read ($timeleft)){ foreach my $fh (@r){ if($fh == $msock){ XB_Log::log "info", " [$procname] incoming UDP message"; my ($umsg, $src_sock, $peerhost, $src_cmd, $src_port); unless ($src_sock = $fh->recv($umsg, 65536, 0)){ XB_Log::log "err", " [$procname] recv: $!" and die "recv"; } eval{ ($umsg, $peerhost) = XB_SMIME::verify($umsg); }; if($@){ XB_Log::log "warning", " [$procname] SMIME message verification failed"; $umsg = ""; next; } my $ip; if(ref ($fh) eq "IO::Socket::Multicast6"){ $ip = 'ipv6'; }else{ $ip = 'ipv4'; } my ($peeraddr, $peerport) = XB_Common::chk_sockaddr($fh, $peerhost, $ip); $umsg =~ s/$XB_Params::msg_delimiter//g; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($umsg); unless (defined $ctl_cmd){ XB_Log::log "err", " [$procname] parser error: ==\n$umsg\n==" and next; } XB_Log::log "debug6", " [$procname] CTL command:". Dumper($ctl_cmd); $src_cmd = $ctl_cmd->{command}{hostname}; unless ($ctl_cmd->{command}{command} eq "ack-invite"){ XB_Log::log "err", " [$procname] wrong command: ". "$ctl_cmd->{command}{command}" and die "cmd"; } unless($src_cmd eq $peerhost){ XB_Log::log "err", " [$procname] hostname mismatch: ". "sock: $peerhost, cmd: $src_cmd" and die "name"; } if(exists $msg_log{$src_cmd}){ XB_Log::log "warning", " [$procname] duplicate message". " from $src_cmd" and next; }else{ XB_Log::log "debug1", " [$procname] new ack: $src_cmd"; $msg_log{$src_cmd}=time; # check if any app needs install if(defined $ctl_cmd->{command}{app_deploy}){ for my $apname (keys %{$ctl_cmd->{command}{app_deploy}}){ my $thisapp = $ctl_cmd->{command}{app_deploy}{$apname}; if($thisapp->{app_verify} eq 'install'){ $app->{network}{app_deploy}{$apname}{action} = 'install'; } } } } $type = $ctl_cmd->{command}{node_type}; if($type =~ /host/i) { $host_hash{$src_cmd} = $ctl_cmd; } elsif($type =~ /router/i){ $router_hash{$src_cmd}= $ctl_cmd; } elsif($type =~ /meta/i) { $meta_hash{$src_cmd} = $ctl_cmd; } else { $both_hash{$src_cmd} = $ctl_cmd; } } } } $now = time; if($now > $timeout){ last; } } #-- timeout, count acks $host_ack = scalar keys(%host_hash); $router_ack= scalar keys(%router_hash); $meta_ack = scalar keys(%meta_hash); $both_ack = scalar keys(%both_hash); $slack = 0; $enough = 1; if($meta_ack < $meta){ $enough = 0; } if($router_ack > $router){ $slack = $both_ack; }elsif($router_ack + $both_ack < $router){ $enough = 0; }else{ $slack = $router_ack + $both_ack - $router; } if($host_ack < $host and $host_ack + $slack < $host){ $enough=0; } #-- enough, return if($enough) { last; } #-- else retry $retry--; # need to re-sequence the message & sign it for next retry my $new_time = time; } }else{ # TODO support for other discovery/reservation schemes such as LDAP # TODO LDAP_discover($smine_invite, \%host_hash, \%router_hash, # TODO \%meta_hash, \%both_hash); # TODO then calculate $enough as shown above XB_Log::log "err", " [$procname] $pro->{discover} mechanism is not". " supported" and die "disc"; } #=> have enough? if(not $enough){ XB_Log::log "err", " [$procname] Did not find enough nodes ". "(Host/Router/Both/Meta):\n Needed $host/$router/0/$meta.\n ". "Found $host_ack/$router_ack/$both_ack/$meta_ack.\n Either ". "increase the search radius from the GUI or the timeout value \n". "and try again."; if(defined $pro->{revisit}){ # TODO revisitation: check if vnodes/pnode over the limit XB_Log::log "err", " [$procname] revisitation is not supported"; die "revi"; }else{ XB_Log::log "info", " [$procname] not enough node, release ". "$app->{name}" and die "more"; } }else{ $available{host} = (scalar keys(%host_hash))? \%host_hash: undef; $available{router}= (scalar keys(%router_hash))? \%router_hash:undef; $available{meta} = (scalar keys(%meta_hash))? \%meta_hash: undef; $available{both} = (scalar keys(%both_hash))? \%both_hash: undef; } # other application-specific resources if($app->{type} eq "overlay"){ #-> request IP address blocks for overlay my $size = $pro->{iface} * 2; # because we used /30 per link! (ipv4) my $addrtype = "ipv4"; if ($pro->{address_type} =~ /^(ipv6|v6overv4)$/){ $addrtype = "ipv6"; } my ($net_blk, $link_blk, $srv); unless($XB_Params::new_alloc){ ($net_blk, $link_blk, $srv)=XB_VN_IPalloc::request($size, $addrtype); }else{ my %appinfo; $appinfo{type} = $app->{type}; $appinfo{level} = $level; $appinfo{name} = $app->{name}; $appinfo{credential} = $$cred; ($net_blk, $link_blk, $srv) = XB_VN_IPalloc::new_request($size, $addrtype, \%appinfo); } my @addr_blk_all; push @addr_blk_all, $net_blk; $pro->{addr_blk_all} = \@addr_blk_all; $pro->{addr_blk_net} = $net_blk; $pro->{addr_blk_link} = $link_blk; $pro->{addr_server} = $srv; } #=> succeed; change the application state $app->{state} = "invited"; }; XB_Log::log "info", "<- $modname$procname"; return (\%available) unless $@; unless ($@ =~ /\b(XB_SMIME|qos|mcast|sel|recv|cmd|name|disc|revi|more|request)\b/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } # Description: # API Start: Phase II - Select # - select a set of nodes from the list of all available ones: # [automatic] automatic, could be random, optimized or clusterized # [specified] fully specified in the application specification # [interactive] send back the list and wait for a fully specified spec # - send SELECT to selected nodes to confirm the selection (ACK-SELECT) # Arguments: # $app_obj (ref) overlay data structure object # $avail (ref) hash of available nodes # Returns: # N/A # Exceptions: # "XB_API::api_select" on failure, caller should delete overlay & # release reserved resources # sub api_select($$;$){ my ($app_obj, $avail, $api_sock) = @_; my $procname = "api_select"; XB_Log::log "info", "-> $modname$procname $app_obj, $avail"; my (@unused_host, @unused_router, @unused_meta); eval{ my $app = $app_obj->{application}; my $pro = $app_obj->{application}{network}{properties}; my $type = $app_obj->{application}{type}; my $name = $app_obj->{application}{name}; my $level= $app_obj->{application}{level}; my ($host, $router, $meta) = ($pro->{host}, $pro->{router}, $pro->{meta}); my (@h, @r, @m, @b, %addr_hash); #=> 1. select nodes =================================== #-- get the list of available nodes @h = keys %{$avail->{host}}; @r = keys %{$avail->{router}}; @m = keys %{$avail->{meta}}; @b = keys %{$avail->{both}}; #=> switch on the style of selection #-> automatic if($pro->{selection} =~ /\bauto\b/i){ my $nh = @h; my $nr = @r; my $nm = @m; my $nb = @b; XB_Log::log "debug1", " - need H/R/M: $host/$router/$meta"; XB_Log::log "debug1", " - have H/R/B/M: $nh/$nr/$nb/$nm\n". " - H: ". (join ', ', @h). "\n"." - R: ". (join ', ', @r)."\n". " - B: ". (join ', ', @b). "\n"." - M: ". (join ', ', @m)."\n"; if(defined $pro->{revisit}) { # TODO auto selection for revisitation case XB_Log::log "err", " [$procname] node revisit is not supported"; die "revi"; }else{ # auto selection for non-revisitation case # trim the non-selected nodes from $avail if($nm > $meta){ @unused_meta = splice @m, $meta; for my $u (@unused_meta){ delete $avail->{meta}{$u}; } } if($nr > $router){ @unused_router = splice @r, $router; for my $u (@unused_router){ delete $avail->{router}{$u}; } }else{ my @temp = splice @b, 0, ($router - $nr); push @r, @temp; for my $u (@temp){ $avail->{router}{$u} = $avail->{both}{$u}; } } if($nh > $host){ @unused_host = splice @h, $host; for my $u (@unused_host){ delete $avail->{host}{$u}; } }else{ my @temp = splice @b, 0, ($host - $nh); push @h, @temp; for my $u (@temp){ $avail->{host}{$u} = $avail->{both}{$u}; } } delete $avail->{both}; for my $t (keys %{$avail}){ # trim the unnecessary fields for my $n (keys %{$avail->{$t}}){ delete $avail->{$t}{$n}{sequence}; delete $avail->{$t}{$n}{version}; delete $avail->{$t}{$n}{release}; delete $avail->{$t}{$n}{credential}; delete $avail->{$t}{$n}{command}{node_type}; delete $avail->{$t}{$n}{command}{app_name}; delete $avail->{$t}{$n}{command}{app_vers}; delete $avail->{$t}{$n}{command}{level}; delete $avail->{$t}{$n}{command}{app_type}; delete $avail->{$t}{$n}{command}{command}; $addr_hash{$n} = $avail->{$t}{$n}{command}{ctl_addr}; } } $app->{resources} = $avail; } XB_Log::log "debug1", " [$procname] hash of available nodes\n". Dumper($avail)."\n"; } #-> specified elsif($pro->{selection} =~ /\bspecified\b/i){ # components are specified in the application specification # TODO extract the list of nodes from the app_obj hash # TODO (optional) check if the specified ones have replied to invite # TODO construct @h, @r, @m from app_obj XB_Log::log "err", " [$procname] \"specified\" not supported"; die "style"; } #-> interactive elsif($pro->{selection} =~ /\binteractive\b/i){ # TODO * require API socket # TODO o send the list of available nodes to the user through API socket # TODO o user maps the nodes to resources and sends back a new API cmd # TODO o receive a fully-specified application specification # TODO o construct @h, @r, @m from app_obj XB_Log::log "err", " [$procname] \"interactive\" not supported"; die "style"; } #-> unknown, die else{ XB_Log::log "err", " [$procname] unkown selection style: ". "\"$pro->{selection}\"" and die "style"; } #-> done, you have @h, @r, @m XB_Log::log "debug1", " + Picked the following nodes:\n". " + Hosts: ". (join '|', @h). "\n". " + Routers: ". (join '|', @r). "\n". " + Metas: ". (join '|', @m); XB_Log::log "debug1", " - Leftovers:\n". " - hosts : ". (join '|', @unused_host). "\n". " - routers: ". (join '|', @unused_router). "\n". " - metas: ". (join '|', @unused_meta). "\n". " - boths: ". (join '|', @b); #=> 2. send SELECT & collect ACK-SELECT =============== #-- check if using persistent socket my $connection = ($XB_Params::PERSISTENT_SOCK)? "yes":"no"; #-- compose select message my $credential = ${$app_obj->{credential}{section}}; my $select = "(xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver\n". $credential. " (select (application $type)\n". " (name $name)\n". " (level $level)\n". " (persistent_connection $connection)\n". " )\n". ")\n$XB_Params::msg_delimiter\n"; my @f; push @f, @h; push @f, @r; push @f, @m; #-- fork & send to all nodes in parallel with Parallel::ForkManager(3) my $max_procs = ($XB_Params::NO_FORK)? 0 : @f; my $node_count = @f; my $pm = new Parallel::ForkManager($max_procs); my %node_list; my $sel = IO::Select->new; $pm->run_on_start( sub { my ($pid, $ident) = @_; XB_Log::log "info", " [$procname] select $ident (fork pid: $pid)"; } ); $pm->run_on_finish( sub { my ($pid, $exit_code, $ident) = @_; if($exit_code){ XB_Log::log "info", " [$procname] $ident ack-select ok, ". "pid $pid exit $exit_code\n"; }else{ XB_Log::log "warning", " [$procname] $ident ack-select failed, ". "pid $pid exit $exit_code\n"; } $node_count--; } ); $pm->run_on_wait( sub { XB_Log::log "debug1", " [$procname] waiting $node_count processes". " ..."; } ); for my $node (@f){ #=> create or retrieve tcp/ssl socket my $ssl_sock; my $ipproto = $pro->{address_type}; my $ipaddr = (defined $addr_hash{$node})? $addr_hash{$node}: ""; if($XB_Params::PERSISTENT_SOCK){ $ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr ); } #=> create a pipe for the child to write back to parent my $rh = IO::Handle->new; my $wh = IO::Handle->new; pipe $rh, $wh or die "pipe"; $node_list{$rh} = $node; #=> fork my $pid = $pm->start($node) and XB_Common::fork_parent_close($sel, $rh, $wh, $node) and next; #=> begin child process ============================================ # exception handling inside the child process: need to catch all # dies by the child process and exit with different code my $received = 0; eval{ #=> close the read handle $rh->close or die "close"; #=> create or retrieve tcp/ssl socket if not yet created unless(defined $ssl_sock){ $ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr); } #=> send select XB_Log::log "info", " [$procname] send select to $node"; print $ssl_sock "$select"; #=> start timer my $timeout = time + $XB_Params::xbone_rtt; my $now; #=> wait & receive ack-select my $new_sel; unless ($new_sel = IO::Select->new($ssl_sock)){ XB_Log::log "err", " [$procname:$node] select failed: $!" and die "select"; } while(my @r = $new_sel->can_read()){ # TODO set timeout for my $fh (@r){ if($fh != $ssl_sock){ XB_Log::log "warning", " [$procname:$node] wrong socket ". "for $node" and next; }else{ #=> read command & write it back to the parent process my $ctl_msg = XB_Common::fh_read_until ($fh, $XB_Params::msg_delimiter); print $wh $ctl_msg; $received = 1; } } if($received == 1){ last; } } if(not $XB_Params::PERSISTENT_SOCK){ $ssl_sock->close or XB_Log::log "err", " [$procname:$node] socket close failed: $!" and die "close"; } unless($received){ XB_Log::log "err", " [$procname:$node] did not received the ". "whole message" and die "rcv"; } }; if($@ && $@ !~ /(tcp_ssl_sock|select|fh_read_until|close|rcv)/){ XB_Log::log "err", " [$procname:$node] caught unknown exception: $@"; } unless($XB_Params::NO_FORK){ XB_Common::child_close($node); } unless($received){ if($XB_Params::error_reply eq ""){ $XB_Params::error_reply = "$node caught exception $@ w/out further". " details"; } my $emsg = XB_Common::ctl_error_msg('select', $type, $name, $level, \$XB_Params::error_reply); print $wh $$emsg; } $pm->finish($received); #=> end child process ============================================== } #=> select msg sent, change state to selected $app->{state} = "selected"; while (my @handles = $sel->can_read){ # TODO set timeout for my $h (@handles){ my $n = $node_list{$h}; XB_Log::log "debug1", " [$procname] receive from $n"; my @lines = $h->getlines; my $ack_msg = join "", @lines; $ack_msg =~ s/$XB_Params::msg_delimiter//g; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg); unless (defined $ctl_cmd){ XB_Log::log "err", " [$procname] error parsing message ". "from $n =====\n$ack_msg\n=========="; }else{ unless ($ctl_cmd->{command}{command} eq "ack-select"){ XB_Log::log "err", " [$procname] wrong command: ". "$ctl_cmd->{command}{command}"; }else{ XB_Log::log "debug6", " [$procname] CTL command: ". Dumper($ctl_cmd); delete $node_list{$h}; } } $sel->remove($h); $h->close or die "close"; } } $pm->wait_all_children; #=> check if all ack'ed my @missing = values %node_list; if(@missing > 0){ XB_Log::log "err", " [$procname] select failed on ". (join ", ", @missing) and die "ack"; } }; XB_Log::log "info", "<- $modname$procname"; return 1 unless $@; unless ($@ =~ /(style|tcp_ssl_sock|select|parser|ack)/){ XB_Log::log "warning", " ! xb_select caught unknown exception: $@"; } die "$modname$procname"; } # Description: # API Start: Phase 4 - Release # - send multicast release to release reserved nodes # Arguments: # $app_obj (ref) application object hash # $level level to release # $msock multicast send socket (IO::Socket::Multicast) # Returns: # 1 on success # Exceptions: # XB_API::api_release on failure, nothing to cleanup by caller sub api_release($$$){ my ($app_obj, $level, $msock) = @_; my $procname = "api_release"; XB_Log::log "info", "-> $modname$procname $app_obj, $level, $msock"; eval{ #=> generate the release message my $pro = $app_obj->{application}{network}{properties}; my $type = $app_obj->{application}{type}; my $name = $app_obj->{application}{name}; my $cred = ${$app_obj->{credential}{section}}; #my $release = XB_Common::ctl_msg('release', $type, $name, $level); my $release = "(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n". $cred. " (release\n". " (application $type)\n". " (name $name)\n". " (level $level)\n". " )\n". ")\n$XB_Params::msg_delimiter\n"; #=> SMIME the message my $smime_release = XB_SMIME::sign($release, $XB_Params::node_opts{"node_cert"}, $XB_Params::node_opts{"node_key"}); #=> switch on discovery mechanisms if($pro->{discover} eq "multicast" or $pro->{discover} eq "unicast"){ my $discover = $pro->{discover}; unless (ref($msock) =~ /IO::Socket::Multicast/){ XB_Log::log "warning", " [$procname] wrong socket type! ".ref($msock); } if($discover eq "multicast"){ #=> Multicast the message # - set the search radius if (defined $pro->{search_radius}){ $msock->mcast_ttl($pro->{search_radius}); } # - supress mcast loopback before sending $msock->mcast_loopback(0); $msock->mcast_send($smime_release) or XB_Log::log "err", " [$procname] multicast send failed: $!" and die "multicast"; $msock->mcast_loopback(1); XB_Log::log "info", " [$procname] sent multicast release $name, ". "level $level"; }else{ #-- unicast the message for my $p (@{$pro->{hostlist}}){ #-> create udp socket my $udp_sock = XB_Common::udp_sock($p, $XB_Params::node_opts{xbone_ctl_port}, $pro->{address_type}); #-> send invite message unless (send($udp_sock, $smime_release, 0)){ XB_Log::log "err", " [$procname] Send UDP unicast release to". " $p failed: $!" and die "send"; }else{ XB_Log::log "info", " [$procname] UDP unicast release to $p"; } #=> close udp socket $udp_sock->close or XB_Log::log "err", " [$procname] socket close failed: $!" and die "close"; } } }else{ # TODO support for other discovery/reservation schemes such as LDAP # TODO LDAP_release($smine_release); XB_Log::log "err", " [$procname] $pro->{discover} mechanism is not". " supported"; die "disc"; } }; XB_Log::log "info", "<- $modname$procname"; return 1 unless $@; unless($@ =~ /\b(XB_SMIME::\S+|multicast|disc)\b/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } # Description: # API Start: Phase 5 - Bind & Dispatch # - bind/assign selected resources to corresponding "roles" in the # application specification # - recurse if any of the resources recurses # - send DISPATCH to all meta nodes which will carry out Invite, Select, # Release, and Dispatch recursively # - collect all ACK-DISPATCH and extract the exported info for binding # Arguments: # $app_obj (ref) application object hash # Returns: # 1 on success # Exceptions: # XB_API::api_bind on failure, caller should delete overlay & # release reserved resources # sub api_bind($){ my ($app_obj) = shift; my $procname = "api_bind"; XB_Log::log "info", "-> $modname$procname $app_obj"; eval{ # Resource binding/assignment: # Nodes # for each node (see Note 1) # simple - assign # meta - assign & recurse # Other resources # overlay: IP addresses # other apps #for each node # grab a node based on node type (host|router|meta) # node name -> node properties # control addr -> node properties # [simple] data addr -> interface phy-addr # [meta] push to meta-list for recursion my $app = $app_obj->{application}; my $res = $app->{resources}; my $pro = $app->{network}{properties}; my $nodes = $app->{network}{nodes}; my ($type, $name, $level, $vers) = ($app->{type}, $app->{name}, $app->{level}, $app->{version}); my @meta_nodes; #XB_Log::log "debug1", "Resources: ". Dumper($res); #XB_Log::log "debug1", "Network: ". Dumper($app->{network}); #=> 1. Assign resources to virtual nodes #-> gather available resources my %resources; for my $t (keys %{$res}){ my @a; for my $n (keys %{$res->{$t}}){ push @a, ($n); } $resources{$t} = \@a; } #-> assign resources to virtual nodes for my $n (keys %{$nodes}){ my ($phy, $k, $v, $type, $props); $props = $nodes->{$n}{properties}; # if meta, pop from meta queue, not host/router $type = (defined $props->{class} eq 'meta')? "meta" : $props->{type}; $phy = pop @{$resources{$props->{type}}}; XB_Log::log "debug2", " [$procname] $n <=> $phy"; while (($k, $v) = each %{$res->{$type}{$phy}{command}}){ $props->{$k} = $v; } delete $res->{$type}{$phy}{command}; $res->{$type}{$phy}{vnode} = $n; # gather meta nodes for recursion if($type eq "meta"){ push @meta_nodes, ($phy); } } XB_Log::log "debug8", " [$procname] meta: ". Dumper(\@meta_nodes); XB_Log::log "debug8", " [$procname] resources: ". Dumper($res); XB_Log::log "debug8", " [$procname] network: ". Dumper($nodes); #=> 2. Recurse on meta nodes if(@meta_nodes > 0){ my (@nodes, %node_tags); #-- generate the dispatch message my $node_count= @meta_nodes; #-- check if using persistent socket my $connection = ($XB_Params::PERSISTENT_SOCK)? "yes":"no"; my $dispatch_msg = " (xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver (dispatch (application $type) (name $name) (version $vers) (level $level) (retry $node_count) (persistent_connection $connection) APPSPEC ))\n$XB_Params::msg_delimiter\n"; #=> get the list of nodes for my $n (@{$app_obj->{resources}{meta_tags}}){ push @nodes, $n->{name}; $node_tags{$n->{name}} = $n->{tag}; } XB_Log::log "debug1", " [$procname] nodes: ". join ('|', @nodes); #=> send TCP/SSL delete message to all nodes in parallel my $max_procs = $node_count; my $pm = new Parallel::ForkManager($max_procs); my %node_list; my $select = IO::Select->new; $pm->run_on_start( sub { my ($pid, $ident) = @_; XB_Log::log "info", " [$procname] fork: $ident pid: $pid"; } ); $pm->run_on_finish( sub { my ($pid, $exit_code, $ident) = @_; if($exit_code){ XB_Log::log "info", " [$procname] ack-dispatch from $ident,". " pid $pid exit $exit_code"; }else{ XB_Log::log "warning", " [$procname] $ident ack-dispatch failed,". " pid $pid exit $exit_code"; } $node_count--; } ); $pm->run_on_wait( sub { XB_Log::log "debug1", " [$procname] waiting $node_count processes". " ..."; } ); for my $node (@nodes){ my $ssl_sock; if($XB_Params::PERSISTENT_SOCK){ my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ? $pro->{address_type}: $XB_Params::node_opts{control_protocol}; $ssl_sock = XB_Common::tcp_ssl_sock ($ctltype, $node, $XB_Params::node_opts{xbone_ctl_port}); } # create a pipe for the child to write back to parent my $rh = IO::Handle->new; my $wh = IO::Handle->new; pipe $rh, $wh or die "pipe"; $node_list{$rh} = $node; #=> fork my $pid = $pm->start($node) and XB_Common::fork_parent_close($select, $rh, $wh, $node) and next; #=> begin child process ========================================== # exception handling inside the child process: need to catch all # dies by the child process and exit with different code my $received = 0; eval{ #=> close the read handle $rh->close or die "close"; #=> construct dispatch message for each node => APPLICATION-SPECIFIC! my $api; if($app_obj->{application}{type} eq "overlay"){ $api = ${$app_obj->{message}}; my $rclass = $app_obj->{application}{root_node}; my $app_name= $app_obj->{application}{name}; my $nodelist= $app_obj->{application}{network}{nodes}; my $nclass = $nodelist->{$node_tags{$node}}{properties}{class}; $api=~s/\(root\s+$rclass\s+$app_name\)/\(root $nclass $app_name\)/g; } my $node_dispatch = $dispatch_msg; $node_dispatch =~ s/APPSPEC/\"$api\"/g; #=> create or retrieve tcp/ssl socket if not yet created unless(defined $ssl_sock){ my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ? $pro->{address_type}: $XB_Params::node_opts{control_protocol}; $ssl_sock = XB_Common::tcp_ssl_sock ($ctltype, $node, $XB_Params::node_opts{xbone_ctl_port}); } #=> send dispatch XB_Log::log "info", " [$procname] send dispatch to $node"; print $ssl_sock "$node_dispatch"; #=> start set timer my $timeout = ($XB_Params::xbone_rtt * $max_procs *2) +$XB_Params::xbone_rtt; #=> wait & receive ack-dispatch my $new_sel; unless ($new_sel = IO::Select->new($ssl_sock)){ XB_Log::log "err", " [$procname] select $node failed: $!" and die "select"; } while(my @r = $new_sel->can_read){ for my $fh (@r){ if($fh != $ssl_sock){ XB_Log::log "err", " [$procname] wrong socket for $node"; next; }else{ #=> read command my $ctl_msg = XB_Common::fh_read_until ($fh, $XB_Params::msg_delimiter); print $wh $ctl_msg; $received = 1; } } if($received){ last; } } }; # need to close(SSL_no_shutdown=>1) all open TCP/SSL sockets before # child exits, or parent could not use those sockets after fork unless($XB_Params::NO_FORK){ XB_Common::child_close($node); } if($@ && $@ !~ /(tcp_ssl_sock|select|fh_read_until|close)/){ XB_Log::log "err", " [$procname:$node] caught unknown error: $@\n"; } #=> end child process ============================================ $pm->finish($received); } while (my @handles = $select->can_read){ for my $h (@handles){ my $n = $node_list{$h}; XB_Log::log "debug1", " [$procname] receive from $n"; my @lines = $h->getlines; my $ack_msg = join "", @lines; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg); unless (defined $ctl_cmd){ XB_Log::log "err", " [$procname] error parsing message ". "from $n =====\n$ack_msg\n==========" and die "parser"; } unless ($ctl_cmd->{command}{command} eq "ack-dispatch"){ XB_Log::log "err", " [$procname] wrong command: ". "$ctl_cmd->{command}{command}"; die "command"; } XB_Log::log "debug1", " [$procname] CTL command: ".Dumper($ctl_cmd); $select->remove($h); $h->close or die "close"; # process the command! } } $pm->wait_all_children; #=> TODO delete the corresponding data obj # - deallocate IP address blocks # TODO failure processing: when not all acks XB_Log::log "info", " [$procname] dispatch $name, level $level"; } }; XB_Log::log "info", "<- $modname$procname"; return 1 unless $@; unless($@ =~ /(XB_SMIME::\S+|multicast)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } # Description: # api_update_dns # Update the dns will appropriate modifications # Arguments: # $app (ref) application object hash # operation - "add" or "delete" # Returns: # 1 on success # 0 on failure # Exceptions: # # TODO: Must store the virtual hostname back to the app object hash! # sub api_update_dns($$){ my ($app, $op) = @_; my $procname = "api_update_dns"; XB_Log::log "info", "-> $modname$procname $app"; eval{ if ($op !~ /(add|delete)/){ XB_Log::log "err", "[$procname] invalid operation $op"; die ("op"); } my $name = $app->{name}; my $domain = $XB_Params::node_opts{xbone_net}; # remove the suffix from the name in case it is appended. $name =~ s/\.$domain//g; #=> get the list of nodes - without the name extensions my @nodenames = (); for my $t (keys %{$app->{resources}}){ for my $n (keys %{$app->{resources}{$t}}){ push @nodenames, $app->{resources}{$t}{$n}{vnode}; } } # construct the arguments for calling update_dns function my %args = (); my %map = (); foreach my $node (@nodenames){ my $ipaddrlist = undef; my $itfmap = $app->{network}{nodes}{$node}{interfaces}; foreach my $itf (keys %{$itfmap}){ if (defined $itfmap->{$itf}){ if (not defined $ipaddrlist){ $ipaddrlist = $itfmap->{$itf}{netaddr}; } else { $ipaddrlist = "$ipaddrlist;" . $itfmap->{$itf}{netaddr}; } } } # by now you have computed all the possible addresses for a given node. $map{$node} = $ipaddrlist; } $args{op} = $op; $args{overlay} = $app->{name}; $args{overlay} =~ s/\.$XB_Params::node_opts{xbone_net}//g; # remove the suffix $args{address_type} = $app->{network}{properties}{address_type}; $args{map} = \%map; XB_VN_DNS::update_dns(\%args); }; XB_Log::log "info", "<- $modname$procname"; return 1 unless $@; unless($@ =~ /(op|update_dns)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } # Description: # API Start: Phase 6 - Configure # - construct node commands according to application specification # - send CONFIG (with node commands) to all selected nodes # - collect all ACK-CONFIG # Arguments: # $app_obj (ref) application object hash # Returns: # 1 on success # Exceptions: # XB_API::api_config on failure, caller should stop the app, # remove the app object, and stop the API start process # sub api_config($){ my ($app_obj) = shift; my $procname = "api_config"; XB_Log::log "info", "-> $modname$procname $app_obj"; eval{ my $app = $app_obj->{application}; my $pro = $app->{network}{properties}; my ($type, $name, $level) = ($app->{type}, $app->{name}, $app->{level}); my $node_cmds; # to store the message for each node my %node_reply; #=> construct node commands (%message) if($type eq 'overlay'){ $node_cmds = XB_VN_funcs::make_node_config($app_obj); XB_Log::log "debug1", " [$procname] Node Commands:". Dumper($node_cmds); }else{ # add support for other applications here XB_Log::log "err", " [$procname] application $type not supported" and die "app"; } #XB_Log::log "debug1", " [$procname] App Object: ". Dumper($app_obj); #=> fork and send CONFIG(node commands) to each node, collect replies #-> compose select message # check if using persistent socket # TODO persistent connection between CONFIG & STOP doesn't make sense # TODO so no "(persistent_connection $conection)" is needed here my $connection = ($XB_Params::PERSISTENT_SOCK)? "yes":"no"; my $credential = ${$app_obj->{credential}{section}}; my $config = "(xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver\n". $credential. " (config (application $type)\n". " (name $name)\n". " (version $XB_Params::xol_ver)\n". " (level $level)\n"; # need "))\n $XB_Params::msg_delimiter\n" at the end my $msg = ""; my (%nodelist, @failed); #=> get the list of nodes for my $t (keys %{$app->{resources}}){ for my $n (keys %{$app->{resources}{$t}}){ $nodelist{$n} = $app->{resources}{$t}{$n}{vnode}; } } #=> before contacting all the nodes, update the DNS if ($XB_Params::node_opts{"dns"} eq "yes" and $app_obj->{application}{network}{properties}{dns} eq "yes"){ api_update_dns($app, "add"); } #=> send to all nodes in parallel with Parallel::ForkManager(3) my $max_procs = ($XB_Params::NO_FORK)? 0 : (keys %nodelist); my $node_count = keys %nodelist; my $pm = new Parallel::ForkManager($max_procs); my $sel = IO::Select->new; my %node_list; $pm->run_on_start( sub { my ($pid, $ident) = @_; XB_Log::log "info", " [$procname] config $ident (fork pid: $pid)"; } ); $pm->run_on_finish( sub { my ($pid, $exit_code, $ident) = @_; if($exit_code){ XB_Log::log "info", " [$procname] ack-config from $ident, ". "pid $pid exit $exit_code\n"; }else{ push @failed, $ident; XB_Log::log "warning", " [$procname] $ident ack-config failed, ". "pid $pid exit $exit_code\n"; } $node_count--; } ); $pm->run_on_wait( sub { XB_Log::log "debug1", " [$procname] waiting $node_count processes". " ..."; } ); for my $node (keys %nodelist){ my $ipaddr = $app->{network}{nodes}{$nodelist{$node}}{properties}{ctl_addr}; #=> create or retrieve tcp/ssl socket my $ssl_sock; if($XB_Params::PERSISTENT_SOCK){ my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ? $pro->{address_type}: $XB_Params::node_opts{control_protocol}; $ssl_sock = XB_Common::tcp_ssl_sock ($ctltype, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr); } #=> create a pipe for the child to write back to parent my $rh = IO::Handle->new; my $wh = IO::Handle->new; pipe $rh, $wh or die "pipe"; $node_list{$rh} = $node; #-> fork my $pid = $pm->start($node) and XB_Common::fork_parent_close($sel, $rh, $wh, $node) and next; #=> begin child process ============================================ # exception handling inside the child process: need to catch all # dies by the child process and exit with different code my $received = 0; eval{ #=> create or retrieve tcp/ssl socket if not yet created unless(defined $ssl_sock){ my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ? $pro->{address_type}: $XB_Params::node_opts{control_protocol}; $ssl_sock = XB_Common::tcp_ssl_sock ($ctltype, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr); } #=> finish the config command $config .= $node_cmds->{$nodelist{$node}} . "))\n $XB_Params::msg_delimiter\n"; #=> send config XB_Log::log "info", " [$procname] send config to $node"; print $ssl_sock "$config"; XB_Log::log "debug7", " [$procname] $node node command: $config"; #=> start set timer my $timeout = time + $XB_Params::xbone_rtt; my $now; #=> wait & receive ack-select my $new_sel; unless ($new_sel = IO::Select->new($ssl_sock)){ XB_Log::log "err", " [$procname:$node] select failed: $!" and die "sel"; } while(my @r = $new_sel->can_read()){ # TODO set timeout? for my $fh (@r){ if($fh != $ssl_sock){ XB_Log::log "err", " [$procname:$node] wrong socket for $node"; next; }else{ #=> read command my $ctl_msg = XB_Common::fh_read_until ($fh, $XB_Params::msg_delimiter); print $wh $ctl_msg; $received = 1; } } if($received == 1){ last; } } if(not $XB_Params::PERSISTENT_SOCK){ $ssl_sock->close or XB_Log::log "err", " [$procname:$node] socket close failed: $!" and die "close"; } unless($received){ XB_Log::log "err", " [$procname:$node] did not acknowledge"; die "rcv"; } }; unless($XB_Params::NO_FORK){ XB_Common::child_close($node); } if($@ && $@ !~ /(tcp_ssl_sock|sel|fh_read_until|rcv|close)/){ XB_Log::log "err", " [$procname:$node] caught unknown exception: $@"; } unless($received){ if($XB_Params::error_reply eq ""){ $XB_Params::error_reply = "$node caught exception $@ w/out further". " details"; } my $emsg = XB_Common::ctl_error_msg('config', $type, $name, $level, \$XB_Params::error_reply); print $wh $$emsg; } #=> end child process ============================================== $pm->finish($received); } #=> collect responses from all the nodes while (my @handles = $sel->can_read){ # TODO set timeout for my $h (@handles){ my $n = $node_list{$h}; XB_Log::log "debug1", " [$procname] receive from $n"; my @lines = $h->getlines; my $ack_msg = join "", @lines; $ack_msg =~ s/$XB_Params::msg_delimiter//g; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg); unless (defined $ctl_cmd){ XB_Log::log "err", " [$procname] error parsing message ". "from $n =====\n$ack_msg\n=========="; }else{ unless ($ctl_cmd->{command}{command} =~ /(ack-config|error)/){ XB_Log::log "err", " [$procname] wrong command: ". "$ctl_cmd->{command}{command}"; }else{ XB_Log::log "debug6", " [$procname] CTL command: ". Dumper($ctl_cmd); delete $node_list{$h}; $node_reply{$n} = $ctl_cmd; } } $sel->remove($h); $h->close or die "close"; } } $pm->wait_all_children; #=> process replies: my $failed = 0; my $err_msg = ""; while(my ($k, $v) = each %node_reply){ if($v->{command}{command} eq 'error'){ $err_msg .= "Node $k replied error: \'$v->{command}{message}\'\n"; $failed++; } } my @missing = values %node_list; if(@missing > 0){ $err_msg .= "Missing replies from ". (join ", ", @missing)."\n"; $failed++; } if($failed){ XB_Log::log "err", " [$procname] Node configuration has failed. Check the xbone log. $err_msg"; die "fail"; } #if(@failed > 0){ # XB_Log::log "err", " [$procname] config failed on ". # (join ", ", @failed) and die "ack"; #} #TODO set the node & tunnel status to up! }; XB_Log::log "info", "<- $modname$procname"; return 1 unless $@; unless($@ =~ /(make_node_config|app|pipe|close|tcp_ssl_sock|fail)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } ############################################################################### # Top Level API Commands ############################################################################### # Description: # [API] Start/Create the specified application # Arguments: # $cmd (ref) API command # $msock multicast socket # Returns: # $reply (ref) reply message # Exceptions: # "api_start" on failure, nothing to cleanup by caller # Notes: # * API reply message: # [done] construct API reply message & return it to caller # [fail] die & throw an exception in caller routine, caller will need to # gather error messages from logs and send back API error reply # sub api_start ($$){ my ($cmd, $msock) = @_; my $procname = "api_start"; my ($type, $name, $app_obj, $avail, $reply, $apptype, $appname); XB_Log::log "info", "-> $modname$procname $cmd, $msock"; eval{ #=> [0] Check for conflicts & resources #-- Check if the app/name exist $type = "overlay"; $name = $cmd->{command}{create_overlay}{property}{overlay_name}; $app_obj = get_app $type, $name; if(defined $app_obj){ $reply = "[$procname] $type $name already exists!"; XB_Log::log "err", " $reply"; die "dupl"; } #-- Check if DNS is requested & supported unless(XB_Common::check_resource("DDNS", $cmd->{command}{create_overlay}{property}{dns}, $XB_Params::node_opts{"dns"})){ XB_Log::log "err", " [$procname] DDNS update not enabled."; die("dns"); } #=> [1] Initialization $app_obj = api_init $cmd; #=> [2] Invite $avail = api_invite $app_obj, $msock; #=> [3] Select api_select $app_obj, $avail; #=> [4] Release api_release $app_obj, $app_obj->{application}{level}, $msock; #sleep ($XB_Params::xbone_rtt); #=> [5] Bind and Dispatch api_bind($app_obj); #=> [6] Config api_config($app_obj); #=> [7] Commit $XB_Params::node_state{active_apps}{$type}{$name} = $app_obj; XB_Log::log "info", " [$procname] commit $type $name into state"; XB_Log::log "debug1", " [$procname] State: ". Dumper(\%XB_Params::node_state); $XB_Params::node_state{user_stats}{$app_obj->{credential}{user_email}}++; #=> [8] Update the state file XB_Common::record_state(); #=> [9] Succeed, construct API reply message to be returned $reply = api_build_msg "create_reply", $app_obj; }; XB_Log::log "info", "<- $modname$procname"; return $reply unless $@; #=> Caught exception; cleanup the mess before returning unless($@ =~ /\b(dupl|api_init|api_invite|api_select|api_release)\b/ or $@ =~ /\b(api_bind|api_config)\b/){ XB_Log::log "warning", " [$procname] caught unknown exception: $@"; } my $exception = $@; #-> cleanup depending on where it dies eval{ unless($exception =~ /\b(dupl|dns|api_init|api_release)\b/){ # remove the DNS entries # release if multicast was sent api_release $app_obj, $app_obj->{application}{level}, $msock; # TODO release address blocks } unless($exception =~ /\b(dupl|dns|api_init|api_invite)\b/){ # stop/delete if nodes were selected api_stop ($type, $name, $app_obj); if ($XB_Params::node_opts{"dns"} eq "yes" and $app_obj->{application}{network}{properties}{dns} eq "yes"){ api_update_dns($app_obj->{application}, "delete"); } } }; if($@){ XB_Log::log "err", " [$procname] failed again during exception ". "handling: $@"; } die "$modname$procname"; } # Description: # [API] Stop/Delete the specified application(s) # Arguments: # $type application type # $name application name # $app_obj (ref) application object hash # Returns: # \$reply (ref) reply message # Exceptions: # XB_API::api_stop on failure, nothing to cleanup by caller # sub api_stop($$;$){ my ($type, $name, $app_obj) = @_; my ($reply); my $procname = "api_stop"; XB_Log::log "info", "-> $modname$procname $type, $name"; eval{ # TODO 1. Convert the fork so that the children write back to the # TODO the parent in case of error reply. # TODO 2. Failure processing: die if the following # TODO - missing acks # TODO - error message or nacks my $cred; #=> check if the app/name exists if(exists $app_obj->{command}{command}){ $cred = ${$app_obj->{credential}{section}}; $app_obj = get_app ($type, $name); } unless(defined $app_obj){ $reply = "[$procname] $type $name does not exists!"; XB_Log::log "err", " $reply" and die "miss"; } #=> generate the stop message my $level= $app_obj->{application}{level}; #my $stop = XB_Common::ctl_msg('stop', $type, $name, $level); unless(defined $cred){ $cred = ${$app_obj->{credential}{section}}; } my $stop = "(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n". $cred. " (stop\n". " (application $type)\n". " (name $name)\n". " (level $level)\n". " )\n". ")\n$XB_Params::msg_delimiter\n"; #=> undo the DNS entries. XXX have to check for DNS variable being #set here. eval { if ($XB_Params::node_opts{"dns"} eq "yes" and $app_obj->{application}{network}{properties}{dns} eq "yes"){ api_update_dns($app_obj->{application}, "delete"); } }; if ($@){ # dont stop even if there is a problem. XB_Log::log "err", " Deletion of dynamic DNS entries failed!"; } #=> get the list of nodes my @nodes; my $app = $app_obj->{application}; my $res = $app_obj->{application}{resources}; my $pro = $app_obj->{application}{network}{properties}; my %nodelist; for my $t (keys %{$res}){ for my $n (keys %{$res->{$t}}){ push @nodes, $n; $nodelist{$n} = $res->{$t}{$n}{vnode}; } } #=> send TCP/SSL delete message to all nodes in parallel my $max_procs = @nodes; my $node_count = $max_procs; my (%node_list, %node_reply); my @failed; my $pm = new Parallel::ForkManager($max_procs); my $sel = IO::Select->new; $pm->run_on_start( sub { my ($pid, $ident) = @_; XB_Log::log "info", " [$procname] stop $ident ". "(fork pid: $pid)"; } ); $pm->run_on_finish( sub { my ($pid, $exit_code, $ident) = @_; if($exit_code){ XB_Log::log "info", " [$procname] ack-stop from $ident, pid ". "$pid exit $exit_code"; }else{ XB_Log::log "warning", " [$procname] $ident ack-stop failed, pid ". "$pid exit $exit_code"; push @failed, ($ident); } $node_count--; } ); $pm->run_on_wait( sub { XB_Log::log "debug1", " [$procname] waiting $node_count processes". " ..."; } ); for my $node (@nodes){ my $ipaddr = $app->{network}{nodes}{$nodelist{$node}}{properties}{ctl_addr}; #=> create or retrieve tcp/ssl socket my $ssl_sock; if($XB_Params::PERSISTENT_SOCK){ my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ? $pro->{address_type}: $XB_Params::node_opts{control_protocol}; $ssl_sock = XB_Common::tcp_ssl_sock($ctltype, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr); } #=> create a pipe for the child to write back to parent my $rh = IO::Handle->new; my $wh = IO::Handle->new; pipe $rh, $wh or die "pipe"; $node_list{$rh} = $node; #=> fork my $pid = $pm->start($node) and XB_Common::fork_parent_close($sel, $rh, $wh, $node) and next; #=> begin child process # exception handling inside the child process: need to catch all # dies by the child process and exit with different code my $received = 0; eval{ #=> close the read handle $rh->close or die "close"; #=> create or retrieve tcp/ssl socket if not yet created unless(defined $ssl_sock){ my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ? $pro->{address_type}: $XB_Params::node_opts{control_protocol}; $ssl_sock = XB_Common::tcp_ssl_sock($ctltype, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr); } #=> send stop XB_Log::log "info", " [$procname] send stop to $node"; print $ssl_sock $stop or die "send"; #=> start set timer my $timeout = $XB_Params::xbone_rtt; #=> wait & receive ack-stop my $new_sel; unless ($new_sel = IO::Select->new($ssl_sock)){ XB_Log::log "err", " [$procname] select $node failed: $!" and die "sel"; } while(my @r = $new_sel->can_read){ # TODO set timeout for my $fh (@r){ if($fh != $ssl_sock){ XB_Log::log "err", " [$procname] wrong socket for $node"; next; }else{ #=> read command my $ctl_msg = XB_Common::fh_read_until ($fh, $XB_Params::msg_delimiter); print $wh $ctl_msg; $received = 1; } } if($received == 1){ last; } } if(not $XB_Params::PERSISTENT_SOCK){ $ssl_sock->close or XB_Log::log "err", " [$procname:$node] socket close failed: $!" and die "close"; } unless($received){ XB_Log::log "err", " [$procname:$node] did not received the ". "whole message" and die "rcv"; } }; unless($XB_Params::NO_FORK){ XB_Common::child_close($node); } if($@ && $@ !~ /(tcp_ssl_sock|send|sel|fh_read_until|close|rcv)/){ XB_Log::log "err", " [$procname:$node] caught unknown exception: $@"; } unless($received){ if($XB_Params::error_reply eq ""){ $XB_Params::error_reply = "$node caught exception $@ w/out further". " details"; } my $emsg = XB_Common::ctl_error_msg('stop', $type, $name, $level, \$XB_Params::error_reply); print $wh $$emsg; } $pm->finish($received); #=> end child process } #=> collect responses from all the nodes while (my @handles = $sel->can_read){ # TODO set timeout for my $h (@handles){ my $n = $node_list{$h}; XB_Log::log "debug1", " [$procname] receive from $n"; my @lines = $h->getlines; my $ack_msg = join "", @lines; $ack_msg =~ s/$XB_Params::msg_delimiter//g; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg); unless (defined $ctl_cmd){ XB_Log::log "err", " [$procname] error parsing message ". "from $n =====\n$ack_msg\n=========="; }else{ unless ($ctl_cmd->{command}{command} =~ /(ack-stop|error)/){ XB_Log::log "err", " [$procname] wrong command: ". "$ctl_cmd->{command}{command}"; }else{ XB_Log::log "debug6", " [$procname] CTL command: ". Dumper($ctl_cmd); delete $node_list{$h}; $node_reply{$n} = $ctl_cmd; } } $sel->remove($h); $h->close or die "close"; } } $pm->wait_all_children; #=> release address blocks my @addr = ($pro->{addr_blk_net}, $pro->{addr_blk_link}); unless($XB_Params::new_alloc){ XB_VN_IPalloc::release (\@addr, $pro->{address_type}, $pro->{addr_server}); }else{ XB_VN_IPalloc::new_release (\@addr, $pro->{address_type}, $pro->{addr_server}); } #=> delete the corresponding data obj if exists if(exists $XB_Params::node_state{active_apps}{$type}{$name}){ delete $XB_Params::node_state{active_apps}{$type}{$name}; } my $user_email = $app_obj->{credential}{user_email}; if($XB_Params::node_state{user_stats}{$user_email}){ $XB_Params::node_state{user_stats}{$user_email}--; } #=> update the state file XB_Common::record_state(); XB_Log::log "info", " [$procname] stop $name, level $level"; my $failed = 0; my $err_msg = ""; while(my ($k, $v) = each %node_reply){ if($v->{command}{command} eq 'error'){ $err_msg .= "Node $k replied error: \'$v->{command}{message}\'\n"; $failed++; } } my @missing = values %node_list; if(@missing > 0){ $err_msg .= "Missing replies from ". (join ", ", @missing)."\n"; $failed++; } if($failed){ XB_Log::log "err", " [$procname] $err_msg"; die "fail"; } #if(@failed > 0){ # #=> failure processing: when not all acks # XB_Log::log "warning", " [$procname] missing ack-stops from ". # join (", ", @failed); #} #=> construct API reply message: my %ahref = ( "protocol" => $XB_Params::api_ver, "release" => $XB_Params::rel_ver ); $reply = XB_XML_GUI::XB_build_destroy_overlay_reply_msg (\%ahref, $name); }; XB_Log::log "info", "<- $modname$procname"; return $reply unless $@; unless($@ =~ /(miss|pipe|close|fail)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } # Description: # [API] Send multicast to find all active nodes. # Arguments: # $cmd (ref) API command # $msocks (ref) array of multicast sockets # Returns: # $reply (ref) reply message # Exceptions: # XB_API::api_discover on failure, nothing to cleanup by caller # sub api_discover($$){ my ($cmd, $msocks) = @_; my $reply; my $procname = "api_discover"; XB_Log::log "info", "-> $modname$procname $cmd, $msocks"; eval{ my $seq = time; my $cre = $cmd->{credential}{property}; $cmd->{credential}{user_name} = $cre->{user_name}; $cmd->{credential}{user_email} = $cre->{user_email}; $cmd->{credential}{auth_type} = $cre->{auth_type}; my (%host_hash, %router_hash, %meta_hash, %node_hash); #=> construct discover message my $cresec = $cmd->{credential}{section}; my $discover = "(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver $seq\n". $$cresec. " (discover)\n". ")\n$XB_Params::msg_delimiter\n"; XB_Log::log "debug1", " [$procname] discover command:\n$discover"; #=> sign the message with S/MIME my $smime_msg = XB_SMIME::sign($discover, $XB_Params::node_opts{"node_cert"}, $XB_Params::node_opts{"node_key"}); #=> switch on discovery mechanisms my $props = $cmd->{command}{discover_daemons}{property}; $props->{discover} = "multicast"; # XXX Hack. Perl throws warnings if we look for hash # entries that dont exist. So generate a list of existing # entries and search through them. my @optlist = keys %{$props}; if(XB_Common::check_list("custom_hostlist", \@optlist) == 1){ if ($props->{custom_hostlist} ne ''){ $props->{discover} = "unicast"; my @hlist = split /[ \t\n\r\f,]+/, $props->{custom_hostlist}; $props->{hostlist} = \@hlist; XB_Log::log "debug2", " [$procname] User supplied a list of hosts". " to probe:\n". Dumper($props->{hostlist}); } } elsif(XB_Common::check_list("ldap", \@optlist) == 1){ if($props->{ldap} eq "yes"){ if ($XB_Params::node_opts{ldap}->{enable} !~ /yes/i){ XB_Log::log "err", " [$procname] LDAP support disabled because the server ". "is unreachable or LDAP support is not configured\n"; die("ldap"); } $props->{discover} = "unicast"; XB_Log::log "debug2", " [$procname] User asked to use LDAP server". " to get the list of hosts\n"; my ($scope, $line, %attrvals) = ($props->{scope}, $props->{attrvals},() ); my @av_array = split(/[\s,]+/, $line); foreach my $av (@av_array){ my @arr = split(/=/, $av); $attrvals{$arr[0]} = $arr[1]; } my $result = XB_LDAP::LDAP_search("registry", $scope, \%attrvals); XB_Log::log "debug2", "LDAP search result =". Dumper ($result); my @hlist = (); for my $h (keys %{$result}) { XB_Log::log "debug2", "registered hostname= $h"; push @hlist, $h; } $props->{hostlist} = \@hlist; } } my $new_sel = IO::Select->new; # all multicast udp sockets to listen on my (%msg_hash, %msg_log); if($props->{discover} eq "multicast"){ # multicast for my $msock (@{$msocks}){ #-- multicast the message for each multicast socket unless (ref($msock) =~ /IO::Socket::Multicast/){ XB_Log::log "warning", " [$procname] wrong socket type! ".ref($msock); } # - set the search radius if (defined $props->{search_radius}){ $msock->mcast_ttl($props->{search_radius}); } # - supress mcast loopback before sending #-- supress mcast loopback before sending $msock->mcast_loopback(0); $msock->mcast_send($smime_msg) or XB_Log::log "err", " [$procname] multicast send failed: $!" and die "mcast"; $msock->mcast_loopback(1); $new_sel->add($msock) or XB_Log::log "err", " [$procname] select failed: $!" and die "sel"; XB_Log::log "info", " [$procname] sent multicast discover"; } } elsif ($props->{discover} eq "unicast"){ #-- unicast the message for my $p (@{$props->{hostlist}}){ #-> create udp socket # you have to discover on both v4 and v6 sockets. # send a message on v4 socket if ($XB_Params::node_opts{ipproto} =~ /both|ipv4/){ my $udp_sock; $udp_sock = XB_Common::udp_sock($p, $XB_Params::node_opts{xbone_ctl_port}, 'ipv4'); #-> send invite message unless (send($udp_sock, $smime_msg, 0)){ XB_Log::log "err", " [$procname] Send UDP unicast invite to". " $p failed: $!" and die "send"; }else{ XB_Log::log "info", " [$procname] UDP unicast invite to $p"; } #=> close udp socket $udp_sock->close or XB_Log::log "err", " [$procname] socket close failed: $!" and die "close"; } # handled the v4... # send a message on v6 socket if ($XB_Params::node_opts{ipproto} =~ /both|ipv6/){ my $udp_sock; $udp_sock = XB_Common::udp_sock($p, $XB_Params::node_opts{xbone_ctl_port}, 'ipv6'); #-> send invite message unless (send($udp_sock, $smime_msg, 0)){ XB_Log::log "err", " [$procname] Send UDP unicast invite to". " $p failed: $!" and die "send"; }else{ XB_Log::log "info", " [$procname] UDP unicast invite to $p"; } #=> close udp socket $udp_sock->close or XB_Log::log "err", " [$procname] socket close failed: $!" and die "close"; } # handled the v6... } # foreach of the hosts.. # the response will come back as a UDP message from # the RDs on the control port. multicast port are # those sockets. for my $msock (@{$msocks}){ $new_sel->add($msock) or XB_Log::log "err", " [$procname] select failed: $!" and die "sel"; } } #=> Receive replies from the sources. #-- start timer my $rtt; if ($props->{timeout} eq '') { $rtt = $XB_Params::xbone_rtt; } else { $rtt = $props->{timeout}; } XB_Log::log "debug", "##### Timeout = $rtt #####"; my $timeout = time + $rtt; #TODO TIMEOUT from CMD!!! my $now; #-- collect ack-discover while(1){ $now = time; my $timeleft = $timeout - $now; while(my @r = $new_sel->can_read ($timeleft)){ foreach my $fh (@r){ #if($fh == $msock){ XB_Log::log "info", " [$procname] incoming UDP message"; my ($umsg, $src_sock, $peerhost, $src_cmd, $src_port); unless ($src_sock = $fh->recv($umsg, 65536, 0)){ XB_Log::log "err", " [$procname] recv: $!" and die "recv"; } eval{ ($umsg, $peerhost) = XB_SMIME::verify($umsg); }; if($@){ XB_Log::log "warning", " [$procname] SMIME verification failed"; $umsg = ""; next; } my $ip; if(ref ($fh) eq "IO::Socket::Multicast6"){ $ip = 'ipv6'; #($src_port, $src_sock) = unpack_sockaddr_in6($src_sock); #my @hinfo = getipnodebyaddr(AF_INET6, $src_sock); #$src_sock = $hinfo[0]; }else{ $ip = 'ipv4'; #($src_port, $src_sock) = sockaddr_in $src_sock; #$src_sock = gethostbyaddr($src_sock, AF_INET); } my $peerport = XB_Common::chk_sockaddr($fh, $peerhost, $ip); $umsg =~ s/$XB_Params::msg_delimiter//g; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($umsg); unless (defined $ctl_cmd){ XB_Log::log "err", " [$procname] parser error: ==\n$umsg\n==" and next; } XB_Log::log "debug7", " [$procname] ack discover received". " from $peerhost: ==\n$umsg\n=="; XB_Log::log "debug6", " [$procname] CTL command:". Dumper($ctl_cmd); $src_cmd = $ctl_cmd->{command}{hostname}; unless ($ctl_cmd->{command}{command} =~ /(ack-discover|err)/){ XB_Log::log "err", " [$procname] wrong command: ". "$ctl_cmd->{command}{command}" and die "cmd"; } #unless($src_sock eq $peerhost){ # XB_Log::log "err", " [$procname] hostname mismatch: ". # "cert: $src_cert, sock: $src_sock" and die "name"; #} unless($src_cmd eq $peerhost){ XB_Log::log "err", " [$procname] hostname mismatch: ". "cmd: $src_cmd, cert: $peerhost" and die "name"; } if(exists $msg_log{$src_cmd}){ XB_Log::log "warning", " [$procname] duplicate message". " from $src_cmd" and next; }else{ XB_Log::log "debug1", " [$procname] new ack: $src_cmd"; $msg_log{$src_cmd}=time; } $cmd->{nodes}{$src_cmd} = $ctl_cmd; #} } } $now = time; if($now > $timeout){ last; } } $reply = api_build_msg "discover_reply", $cmd; #}else{ # TODO support for other discovery/reservation schemes such as LDAP # TODO LDAP_discover($smine_invite, \%host_hash, \%router_hash, # TODO \%meta_hash, \%both_hash); # TODO then calculate $enough as shown above #XB_Log::log "err", " [$procname] $cmd->{discover} mechanism is not". # " supported" and die "disc"; #} }; XB_Log::log "info", "<- $modname$procname"; return $reply unless $@; unless($@ =~ /(XB_SMIME|mcast|sel|recv|cmd|name|disc|ldap)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } # Description: # [API] Check the node/link status of the given application. # Arguments: # $type application type # $name application name # Returns: # \$reply (ref) reply message # Exceptions: # "XB_API::api_status" on failure, nothing to cleanup by caller # sub api_status($$$){ my ($type, $name, $cmd) = @_; my ($reply); my $procname = "api_status"; XB_Log::log "info", "-> $modname$procname $type, $name, $cmd"; eval{ #=> check if application type/name exists my $app_obj = get_app ($type, $name); unless(defined $app_obj){ $reply = "[$procname] $type $name does not exists!"; XB_Log::log "err", " $reply" and die "miss"; } my $level= $app_obj->{application}{level}; #=> generate the status message my $cred = ${$cmd->{credential}{section}}; #my $status = XB_Common::ctl_msg('status', $type, $name, $level); my $status = "(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n". $cred. " (status\n". " (application $type)\n". " (name $name)\n". " (level $level)\n". " )\n". ")\n$XB_Params::msg_delimiter\n"; #=> get the list of nodes #my @nodes; my %nodes; my $app = $app_obj->{application}; my $res = $app->{resources}; my $net = $app->{network}; my $pro = $net->{properties}; while(my ($type, $thash) = each %{$res}){ while(my ($host, $hhash) = each %{$thash}){ $nodes{$host} = $hhash->{vnode}; } } #for my $t (keys %{$res}){ # for my $n (keys %{$res->{$t}}){ # push @nodes, $n; # } #} XB_Log::log "debug1", " [$procname] nodes: ".join ('|', keys %nodes); #=> fork & send to all nodes in parallel my $max_procs = ($XB_Params::NO_FORK)? 0 : keys %nodes; my $node_count = keys %nodes; my $pm = new Parallel::ForkManager($max_procs); my %node_list; my %node_reply; my $sel = IO::Select->new; $pm->run_on_start( sub { my ($pid, $ident) = @_; XB_Log::log "info", " [$procname] status $ident (fork pid: $pid)"; } ); $pm->run_on_finish( sub { my ($pid, $exit_code, $ident) = @_; if($exit_code){ XB_Log::log "info", " [$procname] $ident status ok, ". "pid $pid exit $exit_code"; }else{ XB_Log::log "warning", " [$procname] $ident status failed, ". "pid $pid exit $exit_code"; } $node_count--; } ); $pm->run_on_wait( sub { XB_Log::log "debug1", " [$procname] waiting $node_count processes". " ..."; } ); for my $node (keys %nodes){ #=> create or retrieve tcp/ssl socket my $ssl_sock; my $ipproto = $pro->{address_type}; my $ipaddr = $net->{nodes}{$nodes{$node}}{properties}{ctl_addr}; if($XB_Params::PERSISTENT_SOCK){ $ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr); } #=> create a pipe for the child to write back to parent my $rh = IO::Handle->new; my $wh = IO::Handle->new; pipe $rh, $wh or die "pipe"; $node_list{$rh} = $node; #=> fork my $pid = $pm->start($node) and XB_Common::fork_parent_close($sel, $rh, $wh, $node) and next; #=> begin child process ============================================ # exception handling inside the child process: need to catch all # dies by the child process and exit with different code my $received = 0; eval{ #=> close the read handle $rh->close or die "close"; #=> create or retrieve tcp/ssl socket if not yet created unless(defined $ssl_sock){ $ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr); } #=> send select XB_Log::log "info", " [$procname] send status to $node"; print $ssl_sock $status; #=> wait & receive ack-status my $new_sel; unless ($new_sel = IO::Select->new($ssl_sock)){ XB_Log::log "err", " [$procname:$node] select failed: $!" and die "select"; } while(my @r = $new_sel->can_read()){ # TODO set timeout for my $fh (@r){ if($fh != $ssl_sock){ XB_Log::log "err", " [$procname:$node] wrong socket for $node"; next; }else{ #=> read command & write it back to the parent process my $ctl_msg = XB_Common::fh_read_until ($fh, $XB_Params::msg_delimiter); print $wh $ctl_msg; $received = 1; } } if($received == 1){ last; } } if(not $XB_Params::PERSISTENT_SOCK){ $ssl_sock->close or XB_Log::log "err", " [$procname:$node] socket close failed: $!" and die "close"; } unless($received){ XB_Log::log "err", " [$procname:$node] did not received the ". "whole message" and die "rcv"; } }; unless($XB_Params::NO_FORK){ XB_Common::child_close($node); } if($@ && $@ !~ /(tcp_ssl_sock|select|fh_read_until|close|rcv)/){ XB_Log::log "err", " [$procname:$node] caught unknown exception: $@"; } $pm->finish($received); #=> end child process ============================================== } #=> collect responses from all the nodes while (my @handles = $sel->can_read){ # TODO set timeout for my $h (@handles){ my $n = $node_list{$h}; XB_Log::log "debug1", " [$procname] receive from $n"; my @lines = $h->getlines; my $ack_msg = join "", @lines; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg); unless (defined $ctl_cmd){ XB_Log::log "err", " [$procname] error parsing message ". "from $n =====\n$ack_msg\n=========="; }else{ unless ($ctl_cmd->{command}{command} =~ /(ack-status|error)/){ XB_Log::log "err", " [$procname] wrong command: ". "$ctl_cmd->{command}{command}"; }else{ XB_Log::log "debug6", " [$procname] CTL command: ". Dumper($ctl_cmd); delete $node_list{$h}; $node_reply{$n} = $ctl_cmd; } } $sel->remove($h); $h->close or die "close"; } } $pm->wait_all_children; #=> process replies #-- gather the missing ones my @missing = (); @missing = values %node_list; if(@missing > 0){ XB_Log::log "err", " [$procname] missing status replies from ". (join ", ", @missing); } if($type eq 'overlay'){ XB_VN_funcs::process_status($app_obj, \%node_reply, \@missing); }else{ # add support for other applications here XB_Log::log "err", " [$procname] application $type not supported" and die "app"; } #=> generate the reply message $reply = api_build_msg "status_reply", $app_obj; }; XB_Log::log "info", "<- $modname$procname"; return $reply unless $@; unless($@ =~ /(miss)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } # Description: # [API] Fork and send refresh (heartbeat) messages to every node of all # active overlays to reset their expiration times. # Arguments: # $rlist (ref) hash of nodes and their corresponding overlays # Returns: # 1 on success # Exceptions: # "XB_API::api_refresh" on failure, nothing to cleanup by caller # sub api_refresh($){ my $rlist = shift; my $procname = "api_refresh"; XB_Log::log "info", "-> $modname$procname $rlist"; eval{ my @ipproto = keys %{$rlist}; #=> loop through IPv4 and IPv6 for my $ip (@ipproto){ my @nodes = keys %{$rlist->{$ip}}; my $max_procs = ($XB_Params::NO_FORK)? 0 : @nodes; my $node_count = @nodes; my $pm = new Parallel::ForkManager($max_procs); my $sel = IO::Select->new; my %node_list; $pm->run_on_start( sub { my ($pid, $ident) = @_; XB_Log::log "info", " [$procname] refresh $ident (fork pid: $pid)"; } ); $pm->run_on_finish( sub { my ($pid, $val, $ident) = @_; if($val){ XB_Log::log "info", " [$procname] $ident done, pid: $pid exit: $val"; }else{ XB_Log::log "warning", " [$procname] $ident failed, pid: $pid exit: $val"; } $node_count--; } ); $pm->run_on_wait( sub { XB_Log::log "debug1", " [$procname] waiting for $node_count processes"; } ); for my $node (@nodes){ #=> create a pipe for the child to write back to parent my $rh = IO::Handle->new; my $wh = IO::Handle->new; pipe $rh, $wh or die "pipe"; $node_list{$rh} = $node; my ($k, $v, $msg, $appstr, $udp_sock); #=> construct & sign the refresh message for my $h (@{$rlist->{$ip}{$node}}){ while (($k, $v) = each %{$h}){ $appstr .= " ($k $v)\n"; } } $msg = "(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n". " (refresh\n". $appstr. " )\n". ")\n". "$XB_Params::msg_delimiter\n"; $msg = XB_SMIME::sign($msg, $XB_Params::node_opts{"node_cert"}, $XB_Params::node_opts{"node_key"}); #=> fork my $pid = $pm->start($node) and XB_Common::fork_parent_close($sel, $rh, $wh, $node) and next; #=> begin child process ============================================ # exception handling inside the child process: need to catch all # dies by the child process and exit with different code my $sent = 0; eval{ #=> close the read handle $rh->close or die "close"; #=> create udp socket $udp_sock = XB_Common::udp_sock($node, $XB_Params::node_opts{xbone_ctl_port}, $ip); #=> send refresh message unless (send($udp_sock, $msg, 0)){ XB_Log::log "err", " [$procname] error send to $node: $!" and die "send"; }else{ $sent = 1; XB_Log::log "info", " [$procname] send refresh to $node"; XB_Log::log "debug7", $msg; } #=> close udp socket $udp_sock->close or XB_Log::log "err", " [$procname] socket close failed: $!" and die "close"; }; if($@){ print $wh "error: $@\n$XB_Params::error_reply"; }elsif($sent){ print $wh "sent\n"; }else{ print $wh "not sent, but also no error. weird.\n"; } $pm->finish($sent); #=> end child process ============================================== } #=> select msg sent, change state to selected #$app->{state} = "selected"; while (my @handles = $sel->can_read){ # TODO set timeout for my $h (@handles){ my $n = $node_list{$h}; XB_Log::log "debug1", " [$procname] receive from $n"; my @lines = $h->getlines; my $ack_msg = join "", @lines; my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg); XB_Log::log "debug1", " [$procname] result: $ack_msg"; if($ack_msg =~ /^sent/){ delete $node_list{$h}; } $sel->remove($h); $h->close or die "close"; } } $pm->wait_all_children; #=> check if all ack'ed my @missing = values %node_list; XB_Log::log "debug1", " [$procname] error from the following nodes:\n". (join ", ", @missing); } }; XB_Log::log "info", "<- $modname$procname"; return 1 unless $@; unless($@ =~ /(pipe|close)/){ XB_Log::log "warning", " ! $procname caught unknown exception: $@"; } die "$modname$procname"; } 1;