### Local Variables: ***
### mode:perl ***
### comment-column:0 ***
### comment-start: "### " ***
### comment-end: "***" ***
### End: ***
#
# ****************DO NOT MOVE OR CHANGE LINES ABOVE THIS*********************
#
# The first set of lines runs perl from any shell. The second set of lines
# identifies the rest of the file as PERL for EMACS autoformatting.
# See end of copyright for more information.
#
#
# -------------------------------------------------------------------
# X-BONE
#
# http://www.isi.edu/xbone
# USC Information Sciences Institute (USC/ISI)
# Marina del Rey, California 90292, USA
# Copyright (c) 1998-2005
#
# -------------------------------------------------------------------
#
# Copyright (c) 1998-2005 by the University of Southern California.
# All rights reserved.
#
# Permission to use, copy, modify, and distribute this software and
# its documentation in source and binary forms for non-commercial
# purposes and without fee is hereby granted, provided that the above
# copyright notice appear in all copies and that both the copyright
# notice and this permission notice appear in supporting
# documentation, and that any documentation, advertising materials,
# and other materials related to such distribution and use acknowledge
# that the software was developed by the University of Southern
# California, Information Sciences Institute. The name of the
# University may not be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THE UNIVERSITY OF SOUTHERN CALIFORNIA MAKES NO REPRESENTATIONS ABOUT
# THE SUITABILITY OF THIS SOFTWARE FOR ANY PURPOSE. THIS SOFTWARE IS
# PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# Other copyrights might apply to parts of this software and are so
# noted when applicable.
#
# -------------------------------------------------------------------
#
# Effort partly sponsored by the Defense Advanced Research Projects
# Agency (DARPA) and Air Force Research Laboratory, Air Force Materiel
# Command, USAF, under agreement numbers F30602-98-1-0200 (X-Bone) and
# F30602-01-2-0529 (DynaBone). The views and conclusions contained
# herein are those of the authors and should not be interpreted as
# necessarily representing the official policies or endorsements,
# either expressed or implied, of the Defense Advanced Research
# Projects Agency (DARPA), the Air Force Research Laboratory, or the
# U.S. Government.
#
# This work was partly supported by the NSF STI-XTEND (ANI-0230789)
# and NETFS (ANI-0129689) projects. Any opinions, findings, and
# conclusions or recommendations expressed in this material are those
# of the authors and do not necessarily reflect the views of the
# National Science Foundation.
#
# -------------------------------------------------------------------
# $RCSfile: XB_API.pm,v $
#
# $Revision: 1.64 $
# $Author: pingali $
# $Date: 2005/04/21 04:00:05 $
# $State: Exp $
# ----------------------------------------------------------------------------
#
# Primary Author: Yu-Shun Wang
# DescriptioN: Functions for processing XBone API commands
package XB_API;
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw();
@EXPORT_OK = qw(api_start api_stop api_refresh api_status
api_init api_invite api_select api_release api_bind api_config
);
use strict;
use sigtrap;
use Data::Dumper;
#use IO::Handle;
use Socket;
use Parallel::ForkManager;
use Socket6;
use XB_AppDeploy;
use XB_VN_DNS;
my $modname = "XB_API::";
# for recursive calls
sub init_network($$$);
###############################################################################
# UTILITY FUNCTIONS
###############################################################################
# Description:
# Initialize node/network data structure recursively.
# Arguments:
# $clist (ref) hash of classes
# $class class name
# $level overlay level (root=0)
# Returns:
# $body (ref) hash of the overlay body (properites, interfaces,
# nodes, & links)
# Exceptions:
# -
# Notes:
# Refer to doc/xbone-data-structure.txt for the complete hash.
#
sub init_network($$$){
my ($c_list, $class, $level) = @_;
my $procname = "init_network";
XB_Log::log "info", "-> $procname $c_list, $class, $level";
my %body;
eval {
my ($host, $router, $meta, $iface) = (0, 0, 0, 0);
my ($all_host, $all_router, $all_meta, $all_iface) = (0, 0, 0, 0);
my (@host_os, @router_os, @host_tags, @router_tags, @meta_tags);
my ($class_ref, $nref);
#-> check if the class is defined
unless (defined $c_list->{$class}){
XB_Log::log "err", " [$procname] unknown class: $class" and
die "class";
}
$class_ref = $c_list->{$class};
#-> overall properties
$body{properties}{class_name} = $class;
$body{properties}{level} = $level;
#-> copy the network properties over (instead of point to the hash)
while(my ($k, $v) = each %{$class_ref->{property}}){
$body{properties}{$k}=$v;
}
#-> interfaces
$body{interfaces} = ();
if(scalar(@{$class_ref->{iface}})){
for my $e (@{$class_ref->{iface}}){
$body{interfaces}{$e->{ident}} = ();
if(defined $e->{renames}){
my $r = $e->{renames}{endpoint};
$body{interfaces}{$e->{ident}}{node} = $r->{node};
$body{interfaces}{$e->{ident}}{interface} = $r->{iface};
}
}
# host or router
if(scalar(@{$class_ref->{iface}}) > 1){
$body{properties}{type} = "router";
}else{
$body{properties}{type} = "host";
}
}
#-> nodes: simple vs. meta
if(defined $class_ref->{vnode}){
$body{properties}{class} = "meta";
for my $n (@{$class_ref->{vnode}}){
my $vname = $n->{ident};
my $vtype = $n->{type};
$body{nodes}{$vname} = init_network($c_list, $vtype, $level+1);
}
}else{
$body{properties}{class} = "simple";
}
#-> nodes: host vs. router vs. meta
for my $n (keys %{$body{nodes}}){
$nref = $body{nodes};
my $i = scalar(keys %{$nref->{$n}{interfaces}});
$iface += $i;
$nref->{$n}{properties}{router_links} = [];
$nref->{$n}{properties}{host_links} = [];
$nref->{$n}{properties}{local_dest} = [];
my @node_os;
if(defined $nref->{$n}{properties}{os} &&
$nref->{$n}{properties}{os} ne 'any'){
@node_os = split /\|/, $nref->{$n}{properties}{os};
}
if($body{nodes}{$n}{properties}{class} eq 'meta'){
# meta is not counted as either host or router for resource
# discovery purpose; also don't care about the platforms
$meta++;
$all_host += $body{nodes}{$n}{properties}{all_host};
$all_router += $body{nodes}{$n}{properties}{all_router};
$all_meta += $body{nodes}{$n}{properties}{all_meta};
$all_iface += $body{nodes}{$n}{properties}{all_iface};
}elsif($i >1){
$router++;
for my $o (@node_os){
unless(grep /^$o$/, @router_os){ push @router_os, $o; }
}
}else{
$host++;
for my $o (@node_os){
unless(grep /^$o$/, @host_os){ push @host_os, $o; }
}
}
}
if($body{properties}{class} eq "meta"){
$body{properties}{iface} = $iface;
$body{properties}{host} = $host;
$body{properties}{router} = $router;
$body{properties}{meta} = $meta;
$body{properties}{all_iface} = $all_iface + $iface;
$body{properties}{all_host} = $all_host + $host;
$body{properties}{all_router} = $all_router + $router;
$body{properties}{all_meta} = $all_meta + $meta;
$body{properties}{host_os} = join "|", @host_os;
unless ($body{properties}{host_os} =~ /\S+/){
$body{properties}{host_os} = "any";
}
$body{properties}{router_os} = join "|", @router_os;
unless ($body{properties}{router_os} =~ /\S+/){
$body{properties}{router_os} = "any";
}
$body{properties}{IPsec}=
($body{properties}{IPsec_encryption} ne 'none' or
$body{properties}{IPsec_authentication} ne 'none')? "yes":"no";
}else{
delete $body{nodes};
}
#-> links
for my $l (@{$class_ref->{link}}){
my $lt = $l->{endpoint}[0]{node};
my $rt = $l->{endpoint}[1]{node};
my $ln = $l->{ident};
if($body{nodes}{$lt}{properties}{type} eq "router" and
$body{nodes}{$rt}{properties}{type} eq "router"){
$body{links}{$ln}{type} = "router";
push @{$body{nodes}{$lt}{properties}{router_links}}, $ln;
push @{$body{nodes}{$rt}{properties}{router_links}}, $ln;
}else{
$body{links}{$ln}{type} = "host";
push @{$body{nodes}{$lt}{properties}{host_links}}, $ln;
push @{$body{nodes}{$rt}{properties}{host_links}}, $ln;
if($body{nodes}{$lt}{properties}{type} eq "host"){
$body{nodes}{$lt}{properties}{default_router} = $rt;
}else{
$body{nodes}{$rt}{properties}{default_router} = $lt;
}
}
$body{links}{$ln}{right_node} = $rt;
$body{links}{$ln}{right_if} = $l->{endpoint}[1]{iface};
$body{links}{$ln}{left_node} = $lt;
$body{links}{$ln}{left_if} = $l->{endpoint}[0]{iface};
}
#-> application deployment
if(defined $class_ref->{application}){
for my $a (@{$class_ref->{application}}){
#-- extrace & store app params
my $appname = (defined $a->{program})? $a->{program}: '';
my $script = (defined $a->{script})? $a->{script} : '';
if($appname eq ''){
XB_Log::log "err", " [$procname] missing application name in ".
"application deployment section" and die "noname";
}elsif($script eq ''){
XB_Log::log "err", " [$procname] missing script URL for ".
"$appname in application deployment\n section" and die "nourl";
}
$body{app_deploy}{$appname}{script} = $script;
$body{app_deploy}{$appname}{checksum} =
(defined $a->{checksum})? $a->{checksum}: '';
$body{app_deploy}{$appname}{suid} =
(defined $a->{suid})? $a->{suid} : '';
$body{app_deploy}{$appname}{ifaces} =
(defined $a->{ifaces})? $a->{ifaces} : '';
$body{app_deploy}{$appname}{nodes} =
(defined $a->{nodes})? $a->{nodes} : '';
}
}
};
XB_Log::log "info", "<- $procname";
return (\%body) unless $@;
if ($@ !~ /(class|noname|nourl)/){
XB_Log::log "warning", " ! $procname caught unknown exception $@";
}
die "$modname$procname";
}
# Description:
# Check & get the application object for given type and name
# Arguments:
# $type application type
# $name application name
# Returns:
# app_obj (ref) app/overlay object hash if exists, undef if not
# Exceptions:
# -
sub get_app($$){
my ($type, $name) = @_;
my $app_obj = undef;
my $procname = "get_app";
my $str = "-> $procname $type, $name: ";
if(defined $XB_Params::node_state{active_apps}{$type}{$name}){
$app_obj = $XB_Params::node_state{active_apps}{$type}{$name};
$str .= "$type $name exists.";
}else{
$str .= "$type $name dose not exist.";
}
XB_Log::log "debug1", $str;
return $app_obj;
}
# Description:
# Construct the list of hash of nodes used to create node level XML part.
# Arguments:
# $net (ref) hash of network part of the overlay object
# Returns:
# $nlist (ref) list of hash of nodes
# Exceptions:
# -
sub api_build_nodelist ($$){
my ($net, $cmd) = @_;
my $procname = "api_build_nodelist";
XB_Log::log "debug1", "-> $modname$procname $net, $cmd";
my @nlist;
eval{
my $nodes = $net->{nodes};
my $links = $net->{links};
for my $n (keys %{$nodes}){
my $me = $nodes->{$n};
my $pro = $nodes->{$n}{properties};
my (%nh, @nh, @tuns);
$nh{hostname} = $pro->{hostname};
$nh{ip} = $pro->{ctl_addr};
$nh{type} = $pro->{type};
$nh{class} = $pro->{class};
$nh{os} = $pro->{os};
$nh{vname} = $n;
# TODO api_config should set this upon receiving ack-config!
$nh{status} = (exists $pro->{status})? $pro->{status} : 'up';
if($nh{status} eq 'up'){
# only construct the tunnel part if the node is up
my @al;
push @al, @{$pro->{router_links}};
push @al, @{$pro->{host_links}};
for my $l (@al){
my $ln = $links->{$l};
my (%lh, @lh, $local_if, $remote, $remote_if, @h2, %h2, @h3, %h3);
($remote, $remote_if, $local_if) = ($ln->{right_node} eq $n)?
($ln->{left_node}, $ln->{left_if}, $ln->{right_if}) :
($ln->{right_node}, $ln->{right_if}, $ln->{left_if});
$lh{local_ip_address} = $me->{interfaces}{$local_if}{netaddr};
$lh{remote_ip_address} =
$nodes->{$remote}{interfaces}{$remote_if}{netaddr};
$lh{status} = (exists $ln->{status_net})? $ln->{status_net}: "up";
@lh = %lh;
push @tuns, (\@lh);
if($lh{status} eq 'down'){
$h2{local_ip_address} = $me->{interfaces}{$local_if}{linkaddr};
$h2{remote_ip_address} =
$nodes->{$remote}{interfaces}{$remote_if}{linkaddr};
$h2{status} = (exists $ln->{status_link})? $ln->{status_link}: "up";
@h2 = %h2;
push @tuns, (\@h2);
}
if(defined $h2{status} and $h2{status} eq 'down'){
$h3{local_ip_address} = $me->{properties}{app_addr};
$h3{remote_ip_address} = $nodes->{$remote}{properties}{app_addr};
$h3{status} = (exists $ln->{status_phy})? $ln->{status_phy}: "up";
@h3 = %h3;
push @tuns, (\@h3);
}
}
$nh{tunnels} = \@tuns;
}else{
# node is down or error, attach the error message
$nh{error_msg} = $pro->{error_msg};
}
@nh = %nh;
push @nlist, (\@nh);
}
#XB_Log::log "debug1", " [$procname] Nodelist: ". Dumper(\@nlist);
};
XB_Log::log "debug1", "<- $modname$procname";
return \@nlist unless $@;
XB_Log::log "warning", " ! $procname caught unknown exception $@";
die "$modname$procname";
}
# Description:
# Construct the hashes & lists and build the XBone API XML message
# Arguments:
# $app_obj (ref) application object
# $msg_type message type
# Returns:
# $msg_ref (ref) message
# Exceptions:
# -
sub api_build_msg ($$){
my ($msg_type, $app_obj) = @_;
my $procname = "api_build_msg";
XB_Log::log "info", "-> $modname$procname $msg_type, $app_obj";
my $msg_ref;
eval{
my (%ahref, $nlref);
#-> common fields
$ahref{protocol} = $XB_Params::api_ver;
$ahref{release} = $XB_Params::rel_ver;
$ahref{auth_type} = $app_obj->{credential}{auth_type};
$ahref{user_email} = $app_obj->{credential}{user_email};
$ahref{user_name} = $app_obj->{credential}{user_name};
#-> message-specific fields
$_ = $msg_type;
SWITCH: {
/\b(create|status)_reply\b/ && do {
my $cmd = $1;
#-> overlay level
my $net = $app_obj->{application}{network};
$ahref{overlay_name} = $app_obj->{application}{name};
$ahref{dns} = $net->{properties}{dns};
$ahref{routing} = $net->{properties}{routing};
$ahref{IPsec_encr} = $net->{properties}{IPsec_encryption};
$ahref{IPsec_auth} = $net->{properties}{IPsec_authentication};
$ahref{qos} =
(defined $net->{properties}{qos} and $net->{properties}{qos} ne "")?
$net->{properties}{qos} : "no";
#-> node level
$nlref = api_build_nodelist($net, $cmd);
if($cmd eq 'create'){
$msg_ref = XB_XML_GUI::XB_build_create_overlay_reply_msg
(\%ahref, $nlref);
}else{
$msg_ref = XB_XML_GUI::XB_build_overlay_status_reply_msg
(\%ahref, $nlref);
}
last SWITCH;
};
/\b(discover_reply)\b/ && do {
my (@nlist, $n, $h, $v, $k);
while(($n, $h) = each %{$app_obj->{nodes}}){
my (%nh, @nh);
while(($k, $v) = each %{$h->{command}}){
$nh{$k} = $v;
}
delete $nh{command};
@nh = %nh;
push @nlist, (\@nh);
}
$msg_ref = XB_XML_GUI::XB_build_discover_daemons_reply_msg
(\%ahref, \@nlist);
last SWITCH;
};
XB_Log::log "err", " [$procname] unknow message type $_";
die "message";
}
XB_Log::log "debug7", " [$procname] $msg_type message:". $$msg_ref;
};
XB_Log::log "info", "<- $modname$procname";
return $msg_ref unless $@;
unless ($@ =~ /(message)/){
XB_Log::log "warning", " ! $procname caught unknown exception $@";
}
die "$modname$procname";
}
###############################################################################
# EXPORTED API
###############################################################################
# Description:
# API Start: Phase 1 - Initialization
# - convert API command into internal application object hash
# Arguments:
# $cmd (ref) API start (or create overlay) command
# Returns:
# \%app_obj (ref) app/overlay object hash
# Exceptions:
# "api_init" on failure, nothing to cleanup by caller
#
sub api_init($){
my $cmd = shift;
my $procname = $modname. "api_init";
XB_Log::log "info", "-> $procname $cmd";
my (%app_obj, $ovl_name, $level, $retry, $class, $central);
my ($host, $router, $meta, $slack, %available);
eval{
# Convert the API start command into internal app/ovl hash
#=> general info about the command
$app_obj{protocol} = $cmd->{version};
$app_obj{release} = $cmd->{release};
$app_obj{credential} = $cmd->{credential}{property};
$app_obj{credential}{section} = $cmd->{credential}{section};
$app_obj{message} = $cmd->{message};
#=> switch according to application type
if($cmd->{command}{command} eq "create_overlay"){
#-> overlays
my $xol = $cmd->{command}{create_overlay}{xol_program};
#-- common fields
$app_obj{application}{type} = "overlay";
$app_obj{application}{name} = $xol->{vnode}{ident};
$app_obj{application}{version} = $xol->{version};
$app_obj{application}{level} = ($cmd->{command}{level})?
$cmd->{command}{level}:0;
unless($xol->{version} eq $XB_Params::xol_ver){
XB_Log::log "err", " [$procname] XOL versions mismatch!\n ".
"The command has $xol->{version} and this node supports ".
"$XB_Params::xol_ver";
die "xol";
}
#-- root node name
$app_obj{application}{root_node} = $xol->{vnode}{type};
#-- node class list: convert the list from array into hash
my $node_class = $xol->{node_def};
for my $c (@{$node_class}){
$app_obj{application}{node_class}{$c->{ident}}=$c;
delete $c->{ident};
}
#-- expand main (root) overlay
$app_obj{application}{network} = init_network
$app_obj{application}{node_class}, $app_obj{application}{root_node},
$app_obj{application}{level};
#-- fill in some extra properties
# TODO The following should be included in API: discover, retry,
# TODO deployment, selection, mcast address, LDAP server.
my $props = $app_obj{application}{network}{properties};
my $cprops = $cmd->{command}{create_overlay}{property};
#-- choose the discovery method depending on what is defined.
# XXX Hack. Perl throws warnings if we look for hash
# entries that dont exist. So generate a list of existing
# entries and search through them.
my @optlist = keys %{$props};
if(XB_Common::check_list("custom_hostlist", \@optlist) == 1){
if ($props->{custom_hostlist} ne ''){
$props->{discover} = "unicast";
my @hlist = split /[ \t\n\r\f,]+/, $cprops->{custom_hostlist};
$props->{hostlist} = \@hlist;
XB_Log::log "debug2", " [$procname] User supplied a list of hosts".
" to probe:\n". Dumper($props->{hostlist});
}
}
elsif(XB_Common::check_list("ldap", \@optlist) == 1){
if($props->{ldap} eq "yes"){
if ($XB_Params::node_opts{ldap}->{enable} !~ /yes/i){
XB_Log::log "err",
" [$procname] LDAP support disabled because the server ".
"is unreachable or LDAP support is not configured\n";
die("ldap");
};
$props->{discover} = "unicast";
XB_Log::log "debug2", " [$procname] User asked to use LDAP to get the".
" hosts to probe: \n";
my ($scope, $line, %attrvals) =
($cprops->{scope}, $cprops->{attrvals});
my @av_array = split(/[\s,]+/, $line);
foreach my $av (@av_array){
my @arr = split(/=/, $av);
$attrvals{$arr[0]} = $arr[1];
}
my $result = XB_LDAP::LDAP_search("registry", $scope, \%attrvals);
XB_Log::log "debug2", "LDAP search result =". Dumper ($result);
my @hlist = ();
for my $h (keys %{$result}) {
XB_Log::log "debug2", "registered hostname= $h";
push @hlist, $h;
}
$props->{hostlist} = \@hlist;
}; # ldap enabled
}else{
$props->{discover} = "multicast";
$props->{search_radius} = $cprops->{search_radius};
}
$props->{timeout} =
$cmd->{command}{create_overlay}{property}{timeout};
$props->{selection} = "auto";
$props->{deployment} = "distributed"; # vs. centralized
$props->{retry} = ($cmd->{command}{retry})?$cmd->{command}{retry}:1;
$props->{routing} = ($props->{dynamic_routing} =~ /(no|0)/i)?
"static" : "dynamic";
delete $props->{dynamic_routing};
# copy the dummynet variables.
my $cmd_props = $cmd->{command}{create_overlay}{property};
$props->{qos} = $cmd_props->{dummynet};
if ($props->{qos}){
if ($props->{qos} =~ /yes/){
foreach my $key ("dummynet_queue", "dummynet_queue_unit",
"dummynet_delay", "dummynet_loss_rate",
"dummynet_bandwidth",
"dummynet_bandwidth_unit"){
$props->{$key} = $cmd_props->{$key};
}
}
} else {
$props->{qos} = "no";
}
#-- application deployment
my $net = $app_obj{application}{network};
if(defined $net->{app_deploy}){
for my $appname (keys %{$net->{app_deploy}}){
my $app = $net->{app_deploy}{$appname};
my $acl_suid = $cmd->{user_acl}{suid};
# OM will act as gate keeper regarding application deployment
# suid. So if a user with a higher app deploy privilege at RDs'
# ACLs but a lower privilege in OM's ACL, OM's ACL will take
# precedence and reject any requests that do not saticfy OM's
# ACL!
#
# If the command did not specify an suid, treat it as an error
# and reject the request;
# otherwise, the order is 'root' > 'vhost' > 'nobody' in the
# following comparison.
$props->{app_deploy}{$appname}{suid} = $app->{suid};
if($app->{suid} eq ''){
XB_Log::log "err", " [$procname] Missing SUID field from the ".
"application deployment section\n of the XBone overlay ".
"create request!" and die "suid";
}else{
# check: root > vhost > nobody; $cmd_suid <= $acl_suid
my $spoiled = 0;
my $cmd_suid = $app->{suid};
if($acl_suid eq 'vhost' and $cmd_suid eq 'root'){
$spoiled = 1;
}elsif($acl_suid eq 'nobody' and $cmd_suid ne 'nobody'){
$spoiled = 1;
}else{
$app->{suid} = $cmd_suid;
}
if($spoiled){
XB_Log::log "err"," [$procname] Requsted privilege ".
"($cmd_suid) for deploying \"$appname\" is\n higher ".
"than specified in user ACL ($acl_suid), contact the\n ".
"local XBone administrator to upgrade your privilege in ACL."
and die "suid";
}
}
}
}
}else{
# add processing sections for other applications here, currently none
XB_Log::log "err", " [$procname] unknown applications: ".
$cmd->{command}{command};
die "application";
}
};
XB_Log::log "info", "<- $procname";
return \%app_obj unless $@;
unless ($@ =~ /(init_network|application|xol|suid)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$procname";
}
# Description:
# API Start: Phase 2 - Invite
# - accept request (in the form of application obj) from API or CTL
# - construct the invitation message
# - multicast the invitation and collect responses
# - request other application-specific resources
# Arguments:
# $app_obj (ref) application object hash
# $msock multicast send socket (IO::Socket::Multicast)
# Returns:
# \%avail (ref) hash of (node => ack-invite-cmd) that reply
# Exceptions:
# "XB_API::api_invite" on error, caller should release reserved
# resources: nodes, IP address blocks, etc.
#
sub api_invite($$){
my ($app_obj, $msock) = @_;
my $procname = "api_invite";
XB_Log::log "info", "-> $modname$procname $app_obj, $msock";
my (%available, $level);
my $sent_req = 0; # flag for sending release
eval{
my $seq = time;
my $cre = $app_obj->{credential};
my $app = $app_obj->{application};
my $pro = $app_obj->{application}{network}{properties};
my ($host, $router, $meta) = ($pro->{host}, $pro->{router}, $pro->{meta});
my ($enough, $host_ack, $router_ack, $both_ack, $meta_ack, $slack, $type);
my (%host_hash, %router_hash, %meta_hash, %both_hash);
$level = $app->{level};
#=> Check for the validity of the request
if (($pro->{qos} =~ /yes/i) and ($pro->{address_type} =~ /ipv6/i)){
XB_Log::log "err",
" [$procname] QoS is not supported in IPv6 overlays";
die "qos";
}
#=> construct invite message
my $cred = $app_obj->{credential}{section};
my $app_deploy_sec = '';
if(defined $app->{network}{app_deploy}){
for my $appname (keys %{$app->{network}{app_deploy}}){
my $dapp = $app->{network}{app_deploy}{$appname};
my $script = $dapp->{script};
my $chksum = $dapp->{checksum};
my $suid = $dapp->{suid};
$dapp->{action} = 'start';
$app_deploy_sec .=
" (app-deploy\n".
" (name $appname)\n".
" (url $script)\n";
$app_deploy_sec .= ($suid ne '')?
" (suid $suid)\n": "";
$app_deploy_sec .= ($chksum ne '')?
" (chksum $chksum))\n":
" )\n";
}
}
my $invite =
"(xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver $seq\n".
$$cred.
" (invite\n".
" (application $app->{type})\n".
" (name $app->{name})\n".
" (version $app->{version})\n".
" (level $level)\n".
" (routing $pro->{routing})\n".
" (IPsec $pro->{IPsec})\n".
" (qos $pro->{qos})\n".
" (addr_type $pro->{address_type})\n".
" (host $host $pro->{host_os})\n".
" (router $router $pro->{router_os})\n".
" (meta $meta any)\n".
$app_deploy_sec.
" )\n".
")\n$XB_Params::msg_delimiter\n";
XB_Log::log "debug1", " * Invite command: $invite";
#=> sign the message with S/MIME
my $smime_invite = XB_SMIME::sign($invite,
$XB_Params::node_opts{"node_cert"}, $XB_Params::node_opts{"node_key"});
#=> switch on discovery mechanisms
if($pro->{discover} eq "multicast" or $pro->{discover} eq "unicast"){
# multicast & unicast
my $discover = $pro->{discover};
my $retry = $pro->{retry};
my (%msg_hash, %msg_log);
my $counter=1;
while($retry){
XB_Log::log "info", " [$procname] $discover invite no. $counter";
$counter++;
if($discover eq "multicast"){
#-- multicast the message
unless (ref($msock) =~ /IO::Socket::Multicast/){
XB_Log::log "warning",
" [$procname] wrong socket type! ".ref($msock);
}
#-- set the radius
if (defined $pro->{search_radius}){
$msock->mcast_ttl($pro->{search_radius});
}
#-- supress mcast loopback before sending
$msock->mcast_loopback(0);
$sent_req = 1;
$msock->mcast_send($smime_invite) or
XB_Log::log "err", " [$procname] multicast send failed: $!"
and die "mcast";
$msock->mcast_loopback(1);
}else{
#-- unicast the message
for my $p (@{$pro->{hostlist}}){
#-> create udp socket
my $udp_sock = XB_Common::udp_sock($p,
$XB_Params::node_opts{xbone_ctl_port}, $pro->{address_type});
#-> send invite message
unless (send($udp_sock, $smime_invite, 0)){
XB_Log::log "err", " [$procname] Send UDP unicast invite to".
" $p failed: $!" and die "send";
}else{
XB_Log::log "info", " [$procname] UDP unicast invite to $p";
}
#=> close udp socket
$udp_sock->close or
XB_Log::log "err", " [$procname] socket close failed: $!"
and die "close";
}
}
XB_Log::log "info", " [$procname] sent $discover invite ".
"$app->{name}, level $level";
#-- start timer
my $rtt;
if ($pro->{timeout} eq '') {
$rtt = $XB_Params::xbone_rtt;
} else {
$rtt = $pro->{timeout};
}
XB_Log::log "debug", "##### Timeout = $rtt #####";
my $timeout = time + $rtt;
my $now;
#-- refer to XB_RPC.pm -> XB_request_overlay function
my $new_sel;
unless ($new_sel = IO::Select->new($msock)){
XB_Log::log "err", " [$procname] select failed: $!"
and die "sel";
}
#-- collect ACK-INVITEs
while(1){
$now = time;
my $timeleft = $timeout - $now;
while(my @r = $new_sel->can_read ($timeleft)){
foreach my $fh (@r){
if($fh == $msock){
XB_Log::log "info", " [$procname] incoming UDP message";
my ($umsg, $src_sock, $peerhost, $src_cmd, $src_port);
unless ($src_sock = $fh->recv($umsg, 65536, 0)){
XB_Log::log "err", " [$procname] recv: $!" and die "recv";
}
eval{ ($umsg, $peerhost) = XB_SMIME::verify($umsg); };
if($@){
XB_Log::log "warning",
" [$procname] SMIME message verification failed";
$umsg = "";
next;
}
my $ip;
if(ref ($fh) eq "IO::Socket::Multicast6"){
$ip = 'ipv6';
}else{
$ip = 'ipv4';
}
my ($peeraddr, $peerport) =
XB_Common::chk_sockaddr($fh, $peerhost, $ip);
$umsg =~ s/$XB_Params::msg_delimiter//g;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($umsg);
unless (defined $ctl_cmd){
XB_Log::log "err",
" [$procname] parser error: ==\n$umsg\n==" and next;
}
XB_Log::log "debug6", " [$procname] CTL command:".
Dumper($ctl_cmd);
$src_cmd = $ctl_cmd->{command}{hostname};
unless ($ctl_cmd->{command}{command} eq "ack-invite"){
XB_Log::log "err", " [$procname] wrong command: ".
"$ctl_cmd->{command}{command}" and die "cmd";
}
unless($src_cmd eq $peerhost){
XB_Log::log "err", " [$procname] hostname mismatch: ".
"sock: $peerhost, cmd: $src_cmd" and die "name";
}
if(exists $msg_log{$src_cmd}){
XB_Log::log "warning", " [$procname] duplicate message".
" from $src_cmd" and next;
}else{
XB_Log::log "debug1", " [$procname] new ack: $src_cmd";
$msg_log{$src_cmd}=time;
# check if any app needs install
if(defined $ctl_cmd->{command}{app_deploy}){
for my $apname (keys %{$ctl_cmd->{command}{app_deploy}}){
my $thisapp = $ctl_cmd->{command}{app_deploy}{$apname};
if($thisapp->{app_verify} eq 'install'){
$app->{network}{app_deploy}{$apname}{action} =
'install';
}
}
}
}
$type = $ctl_cmd->{command}{node_type};
if($type =~ /host/i) { $host_hash{$src_cmd} = $ctl_cmd; }
elsif($type =~ /router/i){ $router_hash{$src_cmd}= $ctl_cmd; }
elsif($type =~ /meta/i) { $meta_hash{$src_cmd} = $ctl_cmd; }
else { $both_hash{$src_cmd} = $ctl_cmd; }
}
}
}
$now = time;
if($now > $timeout){ last; }
}
#-- timeout, count acks
$host_ack = scalar keys(%host_hash);
$router_ack= scalar keys(%router_hash);
$meta_ack = scalar keys(%meta_hash);
$both_ack = scalar keys(%both_hash);
$slack = 0;
$enough = 1;
if($meta_ack < $meta){
$enough = 0;
}
if($router_ack > $router){
$slack = $both_ack;
}elsif($router_ack + $both_ack < $router){
$enough = 0;
}else{
$slack = $router_ack + $both_ack - $router;
}
if($host_ack < $host and $host_ack + $slack < $host){
$enough=0;
}
#-- enough, return
if($enough) { last; }
#-- else retry
$retry--;
# need to re-sequence the message & sign it for next retry
my $new_time = time;
}
}else{
# TODO support for other discovery/reservation schemes such as LDAP
# TODO LDAP_discover($smine_invite, \%host_hash, \%router_hash,
# TODO \%meta_hash, \%both_hash);
# TODO then calculate $enough as shown above
XB_Log::log "err", " [$procname] $pro->{discover} mechanism is not".
" supported" and die "disc";
}
#=> have enough?
if(not $enough){
XB_Log::log "err", " [$procname] Did not find enough nodes ".
"(Host/Router/Both/Meta):\n Needed $host/$router/0/$meta.\n ".
"Found $host_ack/$router_ack/$both_ack/$meta_ack.\n Either ".
"increase the search radius from the GUI or the timeout value \n".
"and try again.";
if(defined $pro->{revisit}){
# TODO revisitation: check if vnodes/pnode over the limit
XB_Log::log "err", " [$procname] revisitation is not supported";
die "revi";
}else{
XB_Log::log "info", " [$procname] not enough node, release ".
"$app->{name}" and die "more";
}
}else{
$available{host} = (scalar keys(%host_hash))? \%host_hash: undef;
$available{router}= (scalar keys(%router_hash))? \%router_hash:undef;
$available{meta} = (scalar keys(%meta_hash))? \%meta_hash: undef;
$available{both} = (scalar keys(%both_hash))? \%both_hash: undef;
}
# other application-specific resources
if($app->{type} eq "overlay"){
#-> request IP address blocks for overlay
my $size = $pro->{iface} * 2; # because we used /30 per link! (ipv4)
my $addrtype = "ipv4";
if ($pro->{address_type} =~ /^(ipv6|v6overv4)$/){ $addrtype = "ipv6"; }
my ($net_blk, $link_blk, $srv);
unless($XB_Params::new_alloc){
($net_blk, $link_blk, $srv)=XB_VN_IPalloc::request($size, $addrtype);
}else{
my %appinfo;
$appinfo{type} = $app->{type}; $appinfo{level} = $level;
$appinfo{name} = $app->{name}; $appinfo{credential} = $$cred;
($net_blk, $link_blk, $srv) =
XB_VN_IPalloc::new_request($size, $addrtype, \%appinfo);
}
my @addr_blk_all;
push @addr_blk_all, $net_blk;
$pro->{addr_blk_all} = \@addr_blk_all;
$pro->{addr_blk_net} = $net_blk;
$pro->{addr_blk_link} = $link_blk;
$pro->{addr_server} = $srv;
}
#=> succeed; change the application state
$app->{state} = "invited";
};
XB_Log::log "info", "<- $modname$procname";
return (\%available) unless $@;
unless ($@ =~
/\b(XB_SMIME|qos|mcast|sel|recv|cmd|name|disc|revi|more|request)\b/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# API Start: Phase II - Select
# - select a set of nodes from the list of all available ones:
# [automatic] automatic, could be random, optimized or clusterized
# [specified] fully specified in the application specification
# [interactive] send back the list and wait for a fully specified spec
# - send SELECT to selected nodes to confirm the selection (ACK-SELECT)
# Arguments:
# $app_obj (ref) overlay data structure object
# $avail (ref) hash of available nodes
# Returns:
# N/A
# Exceptions:
# "XB_API::api_select" on failure, caller should delete overlay &
# release reserved resources
#
sub api_select($$;$){
my ($app_obj, $avail, $api_sock) = @_;
my $procname = "api_select";
XB_Log::log "info", "-> $modname$procname $app_obj, $avail";
my (@unused_host, @unused_router, @unused_meta);
eval{
my $app = $app_obj->{application};
my $pro = $app_obj->{application}{network}{properties};
my $type = $app_obj->{application}{type};
my $name = $app_obj->{application}{name};
my $level= $app_obj->{application}{level};
my ($host, $router, $meta) = ($pro->{host}, $pro->{router}, $pro->{meta});
my (@h, @r, @m, @b, %addr_hash);
#=> 1. select nodes ===================================
#-- get the list of available nodes
@h = keys %{$avail->{host}};
@r = keys %{$avail->{router}};
@m = keys %{$avail->{meta}};
@b = keys %{$avail->{both}};
#=> switch on the style of selection
#-> automatic
if($pro->{selection} =~ /\bauto\b/i){
my $nh = @h;
my $nr = @r;
my $nm = @m;
my $nb = @b;
XB_Log::log "debug1", " - need H/R/M: $host/$router/$meta";
XB_Log::log "debug1", " - have H/R/B/M: $nh/$nr/$nb/$nm\n".
" - H: ". (join ', ', @h). "\n"." - R: ". (join ', ', @r)."\n".
" - B: ". (join ', ', @b). "\n"." - M: ". (join ', ', @m)."\n";
if(defined $pro->{revisit}) {
# TODO auto selection for revisitation case
XB_Log::log "err", " [$procname] node revisit is not supported";
die "revi";
}else{
# auto selection for non-revisitation case
# trim the non-selected nodes from $avail
if($nm > $meta){
@unused_meta = splice @m, $meta;
for my $u (@unused_meta){ delete $avail->{meta}{$u}; }
}
if($nr > $router){
@unused_router = splice @r, $router;
for my $u (@unused_router){ delete $avail->{router}{$u}; }
}else{
my @temp = splice @b, 0, ($router - $nr);
push @r, @temp;
for my $u (@temp){ $avail->{router}{$u} = $avail->{both}{$u}; }
}
if($nh > $host){
@unused_host = splice @h, $host;
for my $u (@unused_host){ delete $avail->{host}{$u}; }
}else{
my @temp = splice @b, 0, ($host - $nh);
push @h, @temp;
for my $u (@temp){ $avail->{host}{$u} = $avail->{both}{$u}; }
}
delete $avail->{both};
for my $t (keys %{$avail}){ # trim the unnecessary fields
for my $n (keys %{$avail->{$t}}){
delete $avail->{$t}{$n}{sequence};
delete $avail->{$t}{$n}{version};
delete $avail->{$t}{$n}{release};
delete $avail->{$t}{$n}{credential};
delete $avail->{$t}{$n}{command}{node_type};
delete $avail->{$t}{$n}{command}{app_name};
delete $avail->{$t}{$n}{command}{app_vers};
delete $avail->{$t}{$n}{command}{level};
delete $avail->{$t}{$n}{command}{app_type};
delete $avail->{$t}{$n}{command}{command};
$addr_hash{$n} = $avail->{$t}{$n}{command}{ctl_addr};
}
}
$app->{resources} = $avail;
}
XB_Log::log "debug1", " [$procname] hash of available nodes\n".
Dumper($avail)."\n";
}
#-> specified
elsif($pro->{selection} =~ /\bspecified\b/i){
# components are specified in the application specification
# TODO extract the list of nodes from the app_obj hash
# TODO (optional) check if the specified ones have replied to invite
# TODO construct @h, @r, @m from app_obj
XB_Log::log "err", " [$procname] \"specified\" not supported";
die "style";
}
#-> interactive
elsif($pro->{selection} =~ /\binteractive\b/i){
# TODO * require API socket
# TODO o send the list of available nodes to the user through API socket
# TODO o user maps the nodes to resources and sends back a new API cmd
# TODO o receive a fully-specified application specification
# TODO o construct @h, @r, @m from app_obj
XB_Log::log "err", " [$procname] \"interactive\" not supported";
die "style";
}
#-> unknown, die
else{
XB_Log::log "err", " [$procname] unkown selection style: ".
"\"$pro->{selection}\"" and die "style";
}
#-> done, you have @h, @r, @m
XB_Log::log "debug1",
" + Picked the following nodes:\n".
" + Hosts: ". (join '|', @h). "\n".
" + Routers: ". (join '|', @r). "\n".
" + Metas: ". (join '|', @m);
XB_Log::log "debug1", " - Leftovers:\n".
" - hosts : ". (join '|', @unused_host). "\n".
" - routers: ". (join '|', @unused_router). "\n".
" - metas: ". (join '|', @unused_meta). "\n".
" - boths: ". (join '|', @b);
#=> 2. send SELECT & collect ACK-SELECT ===============
#-- check if using persistent socket
my $connection = ($XB_Params::PERSISTENT_SOCK)? "yes":"no";
#-- compose select message
my $credential = ${$app_obj->{credential}{section}};
my $select =
"(xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver\n".
$credential.
" (select (application $type)\n".
" (name $name)\n".
" (level $level)\n".
" (persistent_connection $connection)\n".
" )\n".
")\n$XB_Params::msg_delimiter\n";
my @f;
push @f, @h;
push @f, @r;
push @f, @m;
#-- fork & send to all nodes in parallel with Parallel::ForkManager(3)
my $max_procs = ($XB_Params::NO_FORK)? 0 : @f;
my $node_count = @f;
my $pm = new Parallel::ForkManager($max_procs);
my %node_list;
my $sel = IO::Select->new;
$pm->run_on_start(
sub {
my ($pid, $ident) = @_;
XB_Log::log "info", " [$procname] select $ident (fork pid: $pid)";
}
);
$pm->run_on_finish(
sub {
my ($pid, $exit_code, $ident) = @_;
if($exit_code){
XB_Log::log "info", " [$procname] $ident ack-select ok, ".
"pid $pid exit $exit_code\n";
}else{
XB_Log::log "warning", " [$procname] $ident ack-select failed, ".
"pid $pid exit $exit_code\n";
}
$node_count--;
}
);
$pm->run_on_wait(
sub {
XB_Log::log "debug1", " [$procname] waiting $node_count processes".
" ...";
}
);
for my $node (@f){
#=> create or retrieve tcp/ssl socket
my $ssl_sock;
my $ipproto = $pro->{address_type};
my $ipaddr = (defined $addr_hash{$node})? $addr_hash{$node}: "";
if($XB_Params::PERSISTENT_SOCK){
$ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node,
$XB_Params::node_opts{xbone_ctl_port}, $ipaddr
);
}
#=> create a pipe for the child to write back to parent
my $rh = IO::Handle->new;
my $wh = IO::Handle->new;
pipe $rh, $wh or die "pipe";
$node_list{$rh} = $node;
#=> fork
my $pid = $pm->start($node)
and XB_Common::fork_parent_close($sel, $rh, $wh, $node)
and next;
#=> begin child process ============================================
# exception handling inside the child process: need to catch all
# dies by the child process and exit with different code
my $received = 0;
eval{
#=> close the read handle
$rh->close or die "close";
#=> create or retrieve tcp/ssl socket if not yet created
unless(defined $ssl_sock){
$ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node,
$XB_Params::node_opts{xbone_ctl_port}, $ipaddr);
}
#=> send select
XB_Log::log "info", " [$procname] send select to $node";
print $ssl_sock "$select";
#=> start timer
my $timeout = time + $XB_Params::xbone_rtt;
my $now;
#=> wait & receive ack-select
my $new_sel;
unless ($new_sel = IO::Select->new($ssl_sock)){
XB_Log::log "err", " [$procname:$node] select failed: $!"
and die "select";
}
while(my @r = $new_sel->can_read()){ # TODO set timeout
for my $fh (@r){
if($fh != $ssl_sock){
XB_Log::log "warning", " [$procname:$node] wrong socket ".
"for $node" and next;
}else{
#=> read command & write it back to the parent process
my $ctl_msg = XB_Common::fh_read_until ($fh,
$XB_Params::msg_delimiter);
print $wh $ctl_msg;
$received = 1;
}
}
if($received == 1){ last; }
}
if(not $XB_Params::PERSISTENT_SOCK){
$ssl_sock->close or
XB_Log::log "err", " [$procname:$node] socket close failed: $!"
and die "close";
}
unless($received){
XB_Log::log "err", " [$procname:$node] did not received the ".
"whole message" and die "rcv";
}
};
if($@ && $@ !~ /(tcp_ssl_sock|select|fh_read_until|close|rcv)/){
XB_Log::log "err", " [$procname:$node] caught unknown exception: $@";
}
unless($XB_Params::NO_FORK){
XB_Common::child_close($node);
}
unless($received){
if($XB_Params::error_reply eq ""){
$XB_Params::error_reply = "$node caught exception $@ w/out further".
" details";
}
my $emsg = XB_Common::ctl_error_msg('select', $type, $name,
$level, \$XB_Params::error_reply);
print $wh $$emsg;
}
$pm->finish($received);
#=> end child process ==============================================
}
#=> select msg sent, change state to selected
$app->{state} = "selected";
while (my @handles = $sel->can_read){ # TODO set timeout
for my $h (@handles){
my $n = $node_list{$h};
XB_Log::log "debug1", " [$procname] receive from $n";
my @lines = $h->getlines;
my $ack_msg = join "", @lines;
$ack_msg =~ s/$XB_Params::msg_delimiter//g;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg);
unless (defined $ctl_cmd){
XB_Log::log "err", " [$procname] error parsing message ".
"from $n =====\n$ack_msg\n==========";
}else{
unless ($ctl_cmd->{command}{command} eq "ack-select"){
XB_Log::log "err", " [$procname] wrong command: ".
"$ctl_cmd->{command}{command}";
}else{
XB_Log::log "debug6", " [$procname] CTL command: ".
Dumper($ctl_cmd);
delete $node_list{$h};
}
}
$sel->remove($h);
$h->close or die "close";
}
}
$pm->wait_all_children;
#=> check if all ack'ed
my @missing = values %node_list;
if(@missing > 0){
XB_Log::log "err", " [$procname] select failed on ".
(join ", ", @missing) and die "ack";
}
};
XB_Log::log "info", "<- $modname$procname";
return 1 unless $@;
unless ($@ =~ /(style|tcp_ssl_sock|select|parser|ack)/){
XB_Log::log "warning", " ! xb_select caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# API Start: Phase 4 - Release
# - send multicast release to release reserved nodes
# Arguments:
# $app_obj (ref) application object hash
# $level level to release
# $msock multicast send socket (IO::Socket::Multicast)
# Returns:
# 1 on success
# Exceptions:
# XB_API::api_release on failure, nothing to cleanup by caller
sub api_release($$$){
my ($app_obj, $level, $msock) = @_;
my $procname = "api_release";
XB_Log::log "info", "-> $modname$procname $app_obj, $level, $msock";
eval{
#=> generate the release message
my $pro = $app_obj->{application}{network}{properties};
my $type = $app_obj->{application}{type};
my $name = $app_obj->{application}{name};
my $cred = ${$app_obj->{credential}{section}};
#my $release = XB_Common::ctl_msg('release', $type, $name, $level);
my $release =
"(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n".
$cred.
" (release\n".
" (application $type)\n".
" (name $name)\n".
" (level $level)\n".
" )\n".
")\n$XB_Params::msg_delimiter\n";
#=> SMIME the message
my $smime_release = XB_SMIME::sign($release,
$XB_Params::node_opts{"node_cert"}, $XB_Params::node_opts{"node_key"});
#=> switch on discovery mechanisms
if($pro->{discover} eq "multicast" or $pro->{discover} eq "unicast"){
my $discover = $pro->{discover};
unless (ref($msock) =~ /IO::Socket::Multicast/){
XB_Log::log "warning",
" [$procname] wrong socket type! ".ref($msock);
}
if($discover eq "multicast"){
#=> Multicast the message
# - set the search radius
if (defined $pro->{search_radius}){
$msock->mcast_ttl($pro->{search_radius});
}
# - supress mcast loopback before sending
$msock->mcast_loopback(0);
$msock->mcast_send($smime_release) or
XB_Log::log "err", " [$procname] multicast send failed: $!"
and die "multicast";
$msock->mcast_loopback(1);
XB_Log::log "info", " [$procname] sent multicast release $name, ".
"level $level";
}else{
#-- unicast the message
for my $p (@{$pro->{hostlist}}){
#-> create udp socket
my $udp_sock = XB_Common::udp_sock($p,
$XB_Params::node_opts{xbone_ctl_port}, $pro->{address_type});
#-> send invite message
unless (send($udp_sock, $smime_release, 0)){
XB_Log::log "err", " [$procname] Send UDP unicast release to".
" $p failed: $!" and die "send";
}else{
XB_Log::log "info", " [$procname] UDP unicast release to $p";
}
#=> close udp socket
$udp_sock->close or
XB_Log::log "err", " [$procname] socket close failed: $!"
and die "close";
}
}
}else{
# TODO support for other discovery/reservation schemes such as LDAP
# TODO LDAP_release($smine_release);
XB_Log::log "err", " [$procname] $pro->{discover} mechanism is not".
" supported";
die "disc";
}
};
XB_Log::log "info", "<- $modname$procname";
return 1 unless $@;
unless($@ =~ /\b(XB_SMIME::\S+|multicast|disc)\b/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# API Start: Phase 5 - Bind & Dispatch
# - bind/assign selected resources to corresponding "roles" in the
# application specification
# - recurse if any of the resources recurses
# - send DISPATCH to all meta nodes which will carry out Invite, Select,
# Release, and Dispatch recursively
# - collect all ACK-DISPATCH and extract the exported info for binding
# Arguments:
# $app_obj (ref) application object hash
# Returns:
# 1 on success
# Exceptions:
# XB_API::api_bind on failure, caller should delete overlay &
# release reserved resources
#
sub api_bind($){
my ($app_obj) = shift;
my $procname = "api_bind";
XB_Log::log "info", "-> $modname$procname $app_obj";
eval{
# Resource binding/assignment:
# Nodes
# for each node (see Note 1)
# simple - assign
# meta - assign & recurse
# Other resources
# overlay: IP addresses
# other apps
#for each node
# grab a node based on node type (host|router|meta)
# node name -> node properties
# control addr -> node properties
# [simple] data addr -> interface phy-addr
# [meta] push to meta-list for recursion
my $app = $app_obj->{application};
my $res = $app->{resources};
my $pro = $app->{network}{properties};
my $nodes = $app->{network}{nodes};
my ($type, $name, $level, $vers) =
($app->{type}, $app->{name}, $app->{level}, $app->{version});
my @meta_nodes;
#XB_Log::log "debug1", "Resources: ". Dumper($res);
#XB_Log::log "debug1", "Network: ". Dumper($app->{network});
#=> 1. Assign resources to virtual nodes
#-> gather available resources
my %resources;
for my $t (keys %{$res}){
my @a;
for my $n (keys %{$res->{$t}}){
push @a, ($n);
}
$resources{$t} = \@a;
}
#-> assign resources to virtual nodes
for my $n (keys %{$nodes}){
my ($phy, $k, $v, $type, $props);
$props = $nodes->{$n}{properties};
# if meta, pop from meta queue, not host/router
$type = (defined $props->{class} eq 'meta')? "meta" : $props->{type};
$phy = pop @{$resources{$props->{type}}};
XB_Log::log "debug2", " [$procname] $n <=> $phy";
while (($k, $v) = each %{$res->{$type}{$phy}{command}}){
$props->{$k} = $v;
}
delete $res->{$type}{$phy}{command};
$res->{$type}{$phy}{vnode} = $n;
# gather meta nodes for recursion
if($type eq "meta"){ push @meta_nodes, ($phy); }
}
XB_Log::log "debug8", " [$procname] meta: ". Dumper(\@meta_nodes);
XB_Log::log "debug8", " [$procname] resources: ". Dumper($res);
XB_Log::log "debug8", " [$procname] network: ". Dumper($nodes);
#=> 2. Recurse on meta nodes
if(@meta_nodes > 0){
my (@nodes, %node_tags);
#-- generate the dispatch message
my $node_count= @meta_nodes;
#-- check if using persistent socket
my $connection = ($XB_Params::PERSISTENT_SOCK)? "yes":"no";
my $dispatch_msg = "
(xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver
(dispatch (application $type)
(name $name)
(version $vers)
(level $level)
(retry $node_count)
(persistent_connection $connection)
APPSPEC
))\n$XB_Params::msg_delimiter\n";
#=> get the list of nodes
for my $n (@{$app_obj->{resources}{meta_tags}}){
push @nodes, $n->{name};
$node_tags{$n->{name}} = $n->{tag};
}
XB_Log::log "debug1", " [$procname] nodes: ". join ('|', @nodes);
#=> send TCP/SSL delete message to all nodes in parallel
my $max_procs = $node_count;
my $pm = new Parallel::ForkManager($max_procs);
my %node_list;
my $select = IO::Select->new;
$pm->run_on_start(
sub {
my ($pid, $ident) = @_;
XB_Log::log "info", " [$procname] fork: $ident pid: $pid";
}
);
$pm->run_on_finish(
sub {
my ($pid, $exit_code, $ident) = @_;
if($exit_code){
XB_Log::log "info", " [$procname] ack-dispatch from $ident,".
" pid $pid exit $exit_code";
}else{
XB_Log::log "warning", " [$procname] $ident ack-dispatch failed,".
" pid $pid exit $exit_code";
}
$node_count--;
}
);
$pm->run_on_wait(
sub {
XB_Log::log "debug1", " [$procname] waiting $node_count processes".
" ...";
}
);
for my $node (@nodes){
my $ssl_sock;
if($XB_Params::PERSISTENT_SOCK){
my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ?
$pro->{address_type}:
$XB_Params::node_opts{control_protocol};
$ssl_sock = XB_Common::tcp_ssl_sock ($ctltype,
$node, $XB_Params::node_opts{xbone_ctl_port});
}
# create a pipe for the child to write back to parent
my $rh = IO::Handle->new;
my $wh = IO::Handle->new;
pipe $rh, $wh or die "pipe";
$node_list{$rh} = $node;
#=> fork
my $pid = $pm->start($node)
and XB_Common::fork_parent_close($select, $rh, $wh, $node)
and next;
#=> begin child process ==========================================
# exception handling inside the child process: need to catch all
# dies by the child process and exit with different code
my $received = 0;
eval{
#=> close the read handle
$rh->close or die "close";
#=> construct dispatch message for each node => APPLICATION-SPECIFIC!
my $api;
if($app_obj->{application}{type} eq "overlay"){
$api = ${$app_obj->{message}};
my $rclass = $app_obj->{application}{root_node};
my $app_name= $app_obj->{application}{name};
my $nodelist= $app_obj->{application}{network}{nodes};
my $nclass = $nodelist->{$node_tags{$node}}{properties}{class};
$api=~s/\(root\s+$rclass\s+$app_name\)/\(root $nclass $app_name\)/g;
}
my $node_dispatch = $dispatch_msg;
$node_dispatch =~ s/APPSPEC/\"$api\"/g;
#=> create or retrieve tcp/ssl socket if not yet created
unless(defined $ssl_sock){
my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ?
$pro->{address_type}:
$XB_Params::node_opts{control_protocol};
$ssl_sock = XB_Common::tcp_ssl_sock ($ctltype,
$node, $XB_Params::node_opts{xbone_ctl_port});
}
#=> send dispatch
XB_Log::log "info", " [$procname] send dispatch to $node";
print $ssl_sock "$node_dispatch";
#=> start set timer
my $timeout = ($XB_Params::xbone_rtt * $max_procs *2)
+$XB_Params::xbone_rtt;
#=> wait & receive ack-dispatch
my $new_sel;
unless ($new_sel = IO::Select->new($ssl_sock)){
XB_Log::log "err", " [$procname] select $node failed: $!"
and die "select";
}
while(my @r = $new_sel->can_read){
for my $fh (@r){
if($fh != $ssl_sock){
XB_Log::log "err", " [$procname] wrong socket for $node";
next;
}else{
#=> read command
my $ctl_msg = XB_Common::fh_read_until ($fh,
$XB_Params::msg_delimiter);
print $wh $ctl_msg;
$received = 1;
}
}
if($received){ last; }
}
};
# need to close(SSL_no_shutdown=>1) all open TCP/SSL sockets before
# child exits, or parent could not use those sockets after fork
unless($XB_Params::NO_FORK){
XB_Common::child_close($node);
}
if($@ && $@ !~ /(tcp_ssl_sock|select|fh_read_until|close)/){
XB_Log::log "err", " [$procname:$node] caught unknown error: $@\n";
}
#=> end child process ============================================
$pm->finish($received);
}
while (my @handles = $select->can_read){
for my $h (@handles){
my $n = $node_list{$h};
XB_Log::log "debug1", " [$procname] receive from $n";
my @lines = $h->getlines;
my $ack_msg = join "", @lines;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg);
unless (defined $ctl_cmd){
XB_Log::log "err", " [$procname] error parsing message ".
"from $n =====\n$ack_msg\n==========" and die "parser";
}
unless ($ctl_cmd->{command}{command} eq "ack-dispatch"){
XB_Log::log "err", " [$procname] wrong command: ".
"$ctl_cmd->{command}{command}";
die "command";
}
XB_Log::log "debug1", " [$procname] CTL command: ".Dumper($ctl_cmd);
$select->remove($h);
$h->close or die "close";
# process the command!
}
}
$pm->wait_all_children;
#=> TODO delete the corresponding data obj
# - deallocate IP address blocks
# TODO failure processing: when not all acks
XB_Log::log "info", " [$procname] dispatch $name, level $level";
}
};
XB_Log::log "info", "<- $modname$procname";
return 1 unless $@;
unless($@ =~ /(XB_SMIME::\S+|multicast)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# api_update_dns
# Update the dns will appropriate modifications
# Arguments:
# $app (ref) application object hash
# operation - "add" or "delete"
# Returns:
# 1 on success
# 0 on failure
# Exceptions:
#
# TODO: Must store the virtual hostname back to the app object hash!
#
sub api_update_dns($$){
my ($app, $op) = @_;
my $procname = "api_update_dns";
XB_Log::log "info", "-> $modname$procname $app";
eval{
if ($op !~ /(add|delete)/){
XB_Log::log "err", "[$procname] invalid operation $op";
die ("op");
}
my $name = $app->{name};
my $domain = $XB_Params::node_opts{xbone_net};
# remove the suffix from the name in case it is appended.
$name =~ s/\.$domain//g;
#=> get the list of nodes - without the name extensions
my @nodenames = ();
for my $t (keys %{$app->{resources}}){
for my $n (keys %{$app->{resources}{$t}}){
push @nodenames, $app->{resources}{$t}{$n}{vnode};
}
}
# construct the arguments for calling update_dns function
my %args = ();
my %map = ();
foreach my $node (@nodenames){
my $ipaddrlist = undef;
my $itfmap = $app->{network}{nodes}{$node}{interfaces};
foreach my $itf (keys %{$itfmap}){
if (defined $itfmap->{$itf}){
if (not defined $ipaddrlist){
$ipaddrlist = $itfmap->{$itf}{netaddr};
} else {
$ipaddrlist = "$ipaddrlist;" . $itfmap->{$itf}{netaddr};
}
}
}
# by now you have computed all the possible addresses for a given node.
$map{$node} = $ipaddrlist;
}
$args{op} = $op;
$args{overlay} = $app->{name};
$args{overlay} =~ s/\.$XB_Params::node_opts{xbone_net}//g; # remove the suffix
$args{address_type} = $app->{network}{properties}{address_type};
$args{map} = \%map;
XB_VN_DNS::update_dns(\%args);
};
XB_Log::log "info", "<- $modname$procname";
return 1 unless $@;
unless($@ =~ /(op|update_dns)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# API Start: Phase 6 - Configure
# - construct node commands according to application specification
# - send CONFIG (with node commands) to all selected nodes
# - collect all ACK-CONFIG
# Arguments:
# $app_obj (ref) application object hash
# Returns:
# 1 on success
# Exceptions:
# XB_API::api_config on failure, caller should stop the app,
# remove the app object, and stop the API start process
#
sub api_config($){
my ($app_obj) = shift;
my $procname = "api_config";
XB_Log::log "info", "-> $modname$procname $app_obj";
eval{
my $app = $app_obj->{application};
my $pro = $app->{network}{properties};
my ($type, $name, $level) = ($app->{type}, $app->{name}, $app->{level});
my $node_cmds; # to store the message for each node
my %node_reply;
#=> construct node commands (%message)
if($type eq 'overlay'){
$node_cmds = XB_VN_funcs::make_node_config($app_obj);
XB_Log::log "debug1", " [$procname] Node Commands:".
Dumper($node_cmds);
}else{ # add support for other applications here
XB_Log::log "err", " [$procname] application $type not supported"
and die "app";
}
#XB_Log::log "debug1", " [$procname] App Object: ". Dumper($app_obj);
#=> fork and send CONFIG(node commands) to each node, collect replies
#-> compose select message
# check if using persistent socket
# TODO persistent connection between CONFIG & STOP doesn't make sense
# TODO so no "(persistent_connection $conection)" is needed here
my $connection = ($XB_Params::PERSISTENT_SOCK)? "yes":"no";
my $credential = ${$app_obj->{credential}{section}};
my $config =
"(xbonecontrol $XB_Params::ctl_ver $XB_Params::rel_ver\n".
$credential.
" (config (application $type)\n".
" (name $name)\n".
" (version $XB_Params::xol_ver)\n".
" (level $level)\n";
# need "))\n $XB_Params::msg_delimiter\n" at the end
my $msg = "";
my (%nodelist, @failed);
#=> get the list of nodes
for my $t (keys %{$app->{resources}}){
for my $n (keys %{$app->{resources}{$t}}){
$nodelist{$n} = $app->{resources}{$t}{$n}{vnode};
}
}
#=> before contacting all the nodes, update the DNS
if ($XB_Params::node_opts{"dns"} eq "yes" and
$app_obj->{application}{network}{properties}{dns} eq "yes"){
api_update_dns($app, "add");
}
#=> send to all nodes in parallel with Parallel::ForkManager(3)
my $max_procs = ($XB_Params::NO_FORK)? 0 : (keys %nodelist);
my $node_count = keys %nodelist;
my $pm = new Parallel::ForkManager($max_procs);
my $sel = IO::Select->new;
my %node_list;
$pm->run_on_start(
sub {
my ($pid, $ident) = @_;
XB_Log::log "info", " [$procname] config $ident (fork pid: $pid)";
}
);
$pm->run_on_finish(
sub {
my ($pid, $exit_code, $ident) = @_;
if($exit_code){
XB_Log::log "info", " [$procname] ack-config from $ident, ".
"pid $pid exit $exit_code\n";
}else{
push @failed, $ident;
XB_Log::log "warning", " [$procname] $ident ack-config failed, ".
"pid $pid exit $exit_code\n";
}
$node_count--;
}
);
$pm->run_on_wait(
sub {
XB_Log::log "debug1", " [$procname] waiting $node_count processes".
" ...";
}
);
for my $node (keys %nodelist){
my $ipaddr =
$app->{network}{nodes}{$nodelist{$node}}{properties}{ctl_addr};
#=> create or retrieve tcp/ssl socket
my $ssl_sock;
if($XB_Params::PERSISTENT_SOCK){
my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ?
$pro->{address_type}:
$XB_Params::node_opts{control_protocol};
$ssl_sock = XB_Common::tcp_ssl_sock ($ctltype,
$node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr);
}
#=> create a pipe for the child to write back to parent
my $rh = IO::Handle->new;
my $wh = IO::Handle->new;
pipe $rh, $wh or die "pipe";
$node_list{$rh} = $node;
#-> fork
my $pid = $pm->start($node)
and XB_Common::fork_parent_close($sel, $rh, $wh, $node)
and next;
#=> begin child process ============================================
# exception handling inside the child process: need to catch all
# dies by the child process and exit with different code
my $received = 0;
eval{
#=> create or retrieve tcp/ssl socket if not yet created
unless(defined $ssl_sock){
my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ?
$pro->{address_type}:
$XB_Params::node_opts{control_protocol};
$ssl_sock = XB_Common::tcp_ssl_sock ($ctltype,
$node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr);
}
#=> finish the config command
$config .= $node_cmds->{$nodelist{$node}} .
"))\n $XB_Params::msg_delimiter\n";
#=> send config
XB_Log::log "info", " [$procname] send config to $node";
print $ssl_sock "$config";
XB_Log::log "debug7", " [$procname] $node node command: $config";
#=> start set timer
my $timeout = time + $XB_Params::xbone_rtt;
my $now;
#=> wait & receive ack-select
my $new_sel;
unless ($new_sel = IO::Select->new($ssl_sock)){
XB_Log::log "err", " [$procname:$node] select failed: $!"
and die "sel";
}
while(my @r = $new_sel->can_read()){ # TODO set timeout?
for my $fh (@r){
if($fh != $ssl_sock){
XB_Log::log "err", " [$procname:$node] wrong socket for $node";
next;
}else{
#=> read command
my $ctl_msg = XB_Common::fh_read_until ($fh,
$XB_Params::msg_delimiter);
print $wh $ctl_msg;
$received = 1;
}
}
if($received == 1){ last; }
}
if(not $XB_Params::PERSISTENT_SOCK){
$ssl_sock->close or
XB_Log::log "err", " [$procname:$node] socket close failed: $!"
and die "close";
}
unless($received){
XB_Log::log "err", " [$procname:$node] did not acknowledge";
die "rcv";
}
};
unless($XB_Params::NO_FORK){
XB_Common::child_close($node);
}
if($@ && $@ !~ /(tcp_ssl_sock|sel|fh_read_until|rcv|close)/){
XB_Log::log "err", " [$procname:$node] caught unknown exception: $@";
}
unless($received){
if($XB_Params::error_reply eq ""){
$XB_Params::error_reply = "$node caught exception $@ w/out further".
" details";
}
my $emsg = XB_Common::ctl_error_msg('config', $type, $name,
$level, \$XB_Params::error_reply);
print $wh $$emsg;
}
#=> end child process ==============================================
$pm->finish($received);
}
#=> collect responses from all the nodes
while (my @handles = $sel->can_read){ # TODO set timeout
for my $h (@handles){
my $n = $node_list{$h};
XB_Log::log "debug1", " [$procname] receive from $n";
my @lines = $h->getlines;
my $ack_msg = join "", @lines;
$ack_msg =~ s/$XB_Params::msg_delimiter//g;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg);
unless (defined $ctl_cmd){
XB_Log::log "err", " [$procname] error parsing message ".
"from $n =====\n$ack_msg\n==========";
}else{
unless ($ctl_cmd->{command}{command} =~ /(ack-config|error)/){
XB_Log::log "err", " [$procname] wrong command: ".
"$ctl_cmd->{command}{command}";
}else{
XB_Log::log "debug6", " [$procname] CTL command: ".
Dumper($ctl_cmd);
delete $node_list{$h};
$node_reply{$n} = $ctl_cmd;
}
}
$sel->remove($h);
$h->close or die "close";
}
}
$pm->wait_all_children;
#=> process replies:
my $failed = 0;
my $err_msg = "";
while(my ($k, $v) = each %node_reply){
if($v->{command}{command} eq 'error'){
$err_msg .= "Node $k replied error: \'$v->{command}{message}\'\n";
$failed++;
}
}
my @missing = values %node_list;
if(@missing > 0){
$err_msg .= "Missing replies from ". (join ", ", @missing)."\n";
$failed++;
}
if($failed){
XB_Log::log "err", " [$procname] Node configuration has failed. Check the xbone log. $err_msg";
die "fail";
}
#if(@failed > 0){
# XB_Log::log "err", " [$procname] config failed on ".
# (join ", ", @failed) and die "ack";
#}
#TODO set the node & tunnel status to up!
};
XB_Log::log "info", "<- $modname$procname";
return 1 unless $@;
unless($@ =~ /(make_node_config|app|pipe|close|tcp_ssl_sock|fail)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
###############################################################################
# Top Level API Commands
###############################################################################
# Description:
# [API] Start/Create the specified application
# Arguments:
# $cmd (ref) API command
# $msock multicast socket
# Returns:
# $reply (ref) reply message
# Exceptions:
# "api_start" on failure, nothing to cleanup by caller
# Notes:
# * API reply message:
# [done] construct API reply message & return it to caller
# [fail] die & throw an exception in caller routine, caller will need to
# gather error messages from logs and send back API error reply
#
sub api_start ($$){
my ($cmd, $msock) = @_;
my $procname = "api_start";
my ($type, $name, $app_obj, $avail, $reply, $apptype, $appname);
XB_Log::log "info", "-> $modname$procname $cmd, $msock";
eval{
#=> [0] Check for conflicts & resources
#-- Check if the app/name exist
$type = "overlay";
$name = $cmd->{command}{create_overlay}{property}{overlay_name};
$app_obj = get_app $type, $name;
if(defined $app_obj){
$reply = "[$procname] $type $name already exists!";
XB_Log::log "err", " $reply";
die "dupl";
}
#-- Check if DNS is requested & supported
unless(XB_Common::check_resource("DDNS",
$cmd->{command}{create_overlay}{property}{dns},
$XB_Params::node_opts{"dns"})){
XB_Log::log "err", " [$procname] DDNS update not enabled.";
die("dns");
}
#=> [1] Initialization
$app_obj = api_init $cmd;
#=> [2] Invite
$avail = api_invite $app_obj, $msock;
#=> [3] Select
api_select $app_obj, $avail;
#=> [4] Release
api_release $app_obj, $app_obj->{application}{level}, $msock;
#sleep ($XB_Params::xbone_rtt);
#=> [5] Bind and Dispatch
api_bind($app_obj);
#=> [6] Config
api_config($app_obj);
#=> [7] Commit
$XB_Params::node_state{active_apps}{$type}{$name} = $app_obj;
XB_Log::log "info", " [$procname] commit $type $name into state";
XB_Log::log "debug1", " [$procname] State: ".
Dumper(\%XB_Params::node_state);
$XB_Params::node_state{user_stats}{$app_obj->{credential}{user_email}}++;
#=> [8] Update the state file
XB_Common::record_state();
#=> [9] Succeed, construct API reply message to be returned
$reply = api_build_msg "create_reply", $app_obj;
};
XB_Log::log "info", "<- $modname$procname";
return $reply unless $@;
#=> Caught exception; cleanup the mess before returning
unless($@ =~ /\b(dupl|api_init|api_invite|api_select|api_release)\b/ or
$@ =~ /\b(api_bind|api_config)\b/){
XB_Log::log "warning", " [$procname] caught unknown exception: $@";
}
my $exception = $@;
#-> cleanup depending on where it dies
eval{
unless($exception =~ /\b(dupl|dns|api_init|api_release)\b/){
# remove the DNS entries
# release if multicast was sent
api_release $app_obj, $app_obj->{application}{level}, $msock;
# TODO release address blocks
}
unless($exception =~ /\b(dupl|dns|api_init|api_invite)\b/){
# stop/delete if nodes were selected
api_stop ($type, $name, $app_obj);
if ($XB_Params::node_opts{"dns"} eq "yes" and
$app_obj->{application}{network}{properties}{dns} eq "yes"){
api_update_dns($app_obj->{application}, "delete");
}
}
};
if($@){
XB_Log::log "err", " [$procname] failed again during exception ".
"handling: $@";
}
die "$modname$procname";
}
# Description:
# [API] Stop/Delete the specified application(s)
# Arguments:
# $type application type
# $name application name
# $app_obj (ref) application object hash
# Returns:
# \$reply (ref) reply message
# Exceptions:
# XB_API::api_stop on failure, nothing to cleanup by caller
#
sub api_stop($$;$){
my ($type, $name, $app_obj) = @_;
my ($reply);
my $procname = "api_stop";
XB_Log::log "info", "-> $modname$procname $type, $name";
eval{
# TODO 1. Convert the fork so that the children write back to the
# TODO the parent in case of error reply.
# TODO 2. Failure processing: die if the following
# TODO - missing acks
# TODO - error message or nacks
my $cred;
#=> check if the app/name exists
if(exists $app_obj->{command}{command}){
$cred = ${$app_obj->{credential}{section}};
$app_obj = get_app ($type, $name);
}
unless(defined $app_obj){
$reply = "[$procname] $type $name does not exists!";
XB_Log::log "err", " $reply" and die "miss";
}
#=> generate the stop message
my $level= $app_obj->{application}{level};
#my $stop = XB_Common::ctl_msg('stop', $type, $name, $level);
unless(defined $cred){
$cred = ${$app_obj->{credential}{section}};
}
my $stop =
"(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n".
$cred.
" (stop\n".
" (application $type)\n".
" (name $name)\n".
" (level $level)\n".
" )\n".
")\n$XB_Params::msg_delimiter\n";
#=> undo the DNS entries. XXX have to check for DNS variable being
#set here.
eval {
if ($XB_Params::node_opts{"dns"} eq "yes" and
$app_obj->{application}{network}{properties}{dns} eq "yes"){
api_update_dns($app_obj->{application}, "delete");
}
};
if ($@){
# dont stop even if there is a problem.
XB_Log::log "err", " Deletion of dynamic DNS entries failed!";
}
#=> get the list of nodes
my @nodes;
my $app = $app_obj->{application};
my $res = $app_obj->{application}{resources};
my $pro = $app_obj->{application}{network}{properties};
my %nodelist;
for my $t (keys %{$res}){
for my $n (keys %{$res->{$t}}){
push @nodes, $n;
$nodelist{$n} = $res->{$t}{$n}{vnode};
}
}
#=> send TCP/SSL delete message to all nodes in parallel
my $max_procs = @nodes;
my $node_count = $max_procs;
my (%node_list, %node_reply);
my @failed;
my $pm = new Parallel::ForkManager($max_procs);
my $sel = IO::Select->new;
$pm->run_on_start(
sub {
my ($pid, $ident) = @_;
XB_Log::log "info", " [$procname] stop $ident ".
"(fork pid: $pid)";
}
);
$pm->run_on_finish(
sub {
my ($pid, $exit_code, $ident) = @_;
if($exit_code){
XB_Log::log "info", " [$procname] ack-stop from $ident, pid ".
"$pid exit $exit_code";
}else{
XB_Log::log "warning", " [$procname] $ident ack-stop failed, pid ".
"$pid exit $exit_code";
push @failed, ($ident);
}
$node_count--;
}
);
$pm->run_on_wait(
sub {
XB_Log::log "debug1", " [$procname] waiting $node_count processes".
" ...";
}
);
for my $node (@nodes){
my $ipaddr =
$app->{network}{nodes}{$nodelist{$node}}{properties}{ctl_addr};
#=> create or retrieve tcp/ssl socket
my $ssl_sock;
if($XB_Params::PERSISTENT_SOCK){
my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ?
$pro->{address_type}:
$XB_Params::node_opts{control_protocol};
$ssl_sock = XB_Common::tcp_ssl_sock($ctltype,
$node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr);
}
#=> create a pipe for the child to write back to parent
my $rh = IO::Handle->new;
my $wh = IO::Handle->new;
pipe $rh, $wh or die "pipe";
$node_list{$rh} = $node;
#=> fork
my $pid = $pm->start($node)
and XB_Common::fork_parent_close($sel, $rh, $wh, $node)
and next;
#=> begin child process
# exception handling inside the child process: need to catch all
# dies by the child process and exit with different code
my $received = 0;
eval{
#=> close the read handle
$rh->close or die "close";
#=> create or retrieve tcp/ssl socket if not yet created
unless(defined $ssl_sock){
my $ctltype = ($XB_Params::node_opts{control_protocol} eq "both") ?
$pro->{address_type}:
$XB_Params::node_opts{control_protocol};
$ssl_sock = XB_Common::tcp_ssl_sock($ctltype,
$node, $XB_Params::node_opts{xbone_ctl_port}, $ipaddr);
}
#=> send stop
XB_Log::log "info", " [$procname] send stop to $node";
print $ssl_sock $stop or die "send";
#=> start set timer
my $timeout = $XB_Params::xbone_rtt;
#=> wait & receive ack-stop
my $new_sel;
unless ($new_sel = IO::Select->new($ssl_sock)){
XB_Log::log "err", " [$procname] select $node failed: $!"
and die "sel";
}
while(my @r = $new_sel->can_read){ # TODO set timeout
for my $fh (@r){
if($fh != $ssl_sock){
XB_Log::log "err", " [$procname] wrong socket for $node";
next;
}else{
#=> read command
my $ctl_msg = XB_Common::fh_read_until ($fh,
$XB_Params::msg_delimiter);
print $wh $ctl_msg;
$received = 1;
}
}
if($received == 1){ last; }
}
if(not $XB_Params::PERSISTENT_SOCK){
$ssl_sock->close or
XB_Log::log "err", " [$procname:$node] socket close failed: $!"
and die "close";
}
unless($received){
XB_Log::log "err", " [$procname:$node] did not received the ".
"whole message" and die "rcv";
}
};
unless($XB_Params::NO_FORK){
XB_Common::child_close($node);
}
if($@ && $@ !~ /(tcp_ssl_sock|send|sel|fh_read_until|close|rcv)/){
XB_Log::log "err", " [$procname:$node] caught unknown exception: $@";
}
unless($received){
if($XB_Params::error_reply eq ""){
$XB_Params::error_reply = "$node caught exception $@ w/out further".
" details";
}
my $emsg = XB_Common::ctl_error_msg('stop', $type, $name,
$level, \$XB_Params::error_reply);
print $wh $$emsg;
}
$pm->finish($received);
#=> end child process
}
#=> collect responses from all the nodes
while (my @handles = $sel->can_read){ # TODO set timeout
for my $h (@handles){
my $n = $node_list{$h};
XB_Log::log "debug1", " [$procname] receive from $n";
my @lines = $h->getlines;
my $ack_msg = join "", @lines;
$ack_msg =~ s/$XB_Params::msg_delimiter//g;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg);
unless (defined $ctl_cmd){
XB_Log::log "err", " [$procname] error parsing message ".
"from $n =====\n$ack_msg\n==========";
}else{
unless ($ctl_cmd->{command}{command} =~ /(ack-stop|error)/){
XB_Log::log "err", " [$procname] wrong command: ".
"$ctl_cmd->{command}{command}";
}else{
XB_Log::log "debug6", " [$procname] CTL command: ".
Dumper($ctl_cmd);
delete $node_list{$h};
$node_reply{$n} = $ctl_cmd;
}
}
$sel->remove($h);
$h->close or die "close";
}
}
$pm->wait_all_children;
#=> release address blocks
my @addr = ($pro->{addr_blk_net}, $pro->{addr_blk_link});
unless($XB_Params::new_alloc){
XB_VN_IPalloc::release (\@addr, $pro->{address_type}, $pro->{addr_server});
}else{
XB_VN_IPalloc::new_release (\@addr, $pro->{address_type},
$pro->{addr_server});
}
#=> delete the corresponding data obj if exists
if(exists $XB_Params::node_state{active_apps}{$type}{$name}){
delete $XB_Params::node_state{active_apps}{$type}{$name};
}
my $user_email = $app_obj->{credential}{user_email};
if($XB_Params::node_state{user_stats}{$user_email}){
$XB_Params::node_state{user_stats}{$user_email}--;
}
#=> update the state file
XB_Common::record_state();
XB_Log::log "info", " [$procname] stop $name, level $level";
my $failed = 0;
my $err_msg = "";
while(my ($k, $v) = each %node_reply){
if($v->{command}{command} eq 'error'){
$err_msg .= "Node $k replied error: \'$v->{command}{message}\'\n";
$failed++;
}
}
my @missing = values %node_list;
if(@missing > 0){
$err_msg .= "Missing replies from ". (join ", ", @missing)."\n";
$failed++;
}
if($failed){
XB_Log::log "err", " [$procname] $err_msg";
die "fail";
}
#if(@failed > 0){
# #=> failure processing: when not all acks
# XB_Log::log "warning", " [$procname] missing ack-stops from ".
# join (", ", @failed);
#}
#=> construct API reply message:
my %ahref = ( "protocol" => $XB_Params::api_ver,
"release" => $XB_Params::rel_ver );
$reply = XB_XML_GUI::XB_build_destroy_overlay_reply_msg (\%ahref, $name);
};
XB_Log::log "info", "<- $modname$procname";
return $reply unless $@;
unless($@ =~ /(miss|pipe|close|fail)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# [API] Send multicast to find all active nodes.
# Arguments:
# $cmd (ref) API command
# $msocks (ref) array of multicast sockets
# Returns:
# $reply (ref) reply message
# Exceptions:
# XB_API::api_discover on failure, nothing to cleanup by caller
#
sub api_discover($$){
my ($cmd, $msocks) = @_;
my $reply;
my $procname = "api_discover";
XB_Log::log "info", "-> $modname$procname $cmd, $msocks";
eval{
my $seq = time;
my $cre = $cmd->{credential}{property};
$cmd->{credential}{user_name} = $cre->{user_name};
$cmd->{credential}{user_email} = $cre->{user_email};
$cmd->{credential}{auth_type} = $cre->{auth_type};
my (%host_hash, %router_hash, %meta_hash, %node_hash);
#=> construct discover message
my $cresec = $cmd->{credential}{section};
my $discover =
"(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver $seq\n".
$$cresec.
" (discover)\n".
")\n$XB_Params::msg_delimiter\n";
XB_Log::log "debug1", " [$procname] discover command:\n$discover";
#=> sign the message with S/MIME
my $smime_msg = XB_SMIME::sign($discover,
$XB_Params::node_opts{"node_cert"}, $XB_Params::node_opts{"node_key"});
#=> switch on discovery mechanisms
my $props = $cmd->{command}{discover_daemons}{property};
$props->{discover} = "multicast";
# XXX Hack. Perl throws warnings if we look for hash
# entries that dont exist. So generate a list of existing
# entries and search through them.
my @optlist = keys %{$props};
if(XB_Common::check_list("custom_hostlist", \@optlist) == 1){
if ($props->{custom_hostlist} ne ''){
$props->{discover} = "unicast";
my @hlist = split /[ \t\n\r\f,]+/, $props->{custom_hostlist};
$props->{hostlist} = \@hlist;
XB_Log::log "debug2", " [$procname] User supplied a list of hosts".
" to probe:\n". Dumper($props->{hostlist});
}
}
elsif(XB_Common::check_list("ldap", \@optlist) == 1){
if($props->{ldap} eq "yes"){
if ($XB_Params::node_opts{ldap}->{enable} !~ /yes/i){
XB_Log::log "err",
" [$procname] LDAP support disabled because the server ".
"is unreachable or LDAP support is not configured\n";
die("ldap");
}
$props->{discover} = "unicast";
XB_Log::log "debug2", " [$procname] User asked to use LDAP server".
" to get the list of hosts\n";
my ($scope, $line, %attrvals) =
($props->{scope}, $props->{attrvals},() );
my @av_array = split(/[\s,]+/, $line);
foreach my $av (@av_array){
my @arr = split(/=/, $av);
$attrvals{$arr[0]} = $arr[1];
}
my $result = XB_LDAP::LDAP_search("registry", $scope, \%attrvals);
XB_Log::log "debug2", "LDAP search result =". Dumper ($result);
my @hlist = ();
for my $h (keys %{$result}) {
XB_Log::log "debug2", "registered hostname= $h";
push @hlist, $h;
}
$props->{hostlist} = \@hlist;
}
}
my $new_sel = IO::Select->new; # all multicast udp sockets to listen on
my (%msg_hash, %msg_log);
if($props->{discover} eq "multicast"){
# multicast
for my $msock (@{$msocks}){
#-- multicast the message for each multicast socket
unless (ref($msock) =~ /IO::Socket::Multicast/){
XB_Log::log "warning",
" [$procname] wrong socket type! ".ref($msock);
}
# - set the search radius
if (defined $props->{search_radius}){
$msock->mcast_ttl($props->{search_radius});
}
# - supress mcast loopback before sending
#-- supress mcast loopback before sending
$msock->mcast_loopback(0);
$msock->mcast_send($smime_msg) or
XB_Log::log "err", " [$procname] multicast send failed: $!"
and die "mcast";
$msock->mcast_loopback(1);
$new_sel->add($msock) or
XB_Log::log "err", " [$procname] select failed: $!" and die "sel";
XB_Log::log "info", " [$procname] sent multicast discover";
}
} elsif ($props->{discover} eq "unicast"){
#-- unicast the message
for my $p (@{$props->{hostlist}}){
#-> create udp socket
# you have to discover on both v4 and v6 sockets.
# send a message on v4 socket
if ($XB_Params::node_opts{ipproto} =~ /both|ipv4/){
my $udp_sock;
$udp_sock =
XB_Common::udp_sock($p,
$XB_Params::node_opts{xbone_ctl_port},
'ipv4');
#-> send invite message
unless (send($udp_sock, $smime_msg, 0)){
XB_Log::log "err", " [$procname] Send UDP unicast invite to".
" $p failed: $!" and die "send";
}else{
XB_Log::log "info", " [$procname] UDP unicast invite to $p";
}
#=> close udp socket
$udp_sock->close or
XB_Log::log "err", " [$procname] socket close failed: $!"
and die "close";
} # handled the v4...
# send a message on v6 socket
if ($XB_Params::node_opts{ipproto} =~ /both|ipv6/){
my $udp_sock;
$udp_sock =
XB_Common::udp_sock($p,
$XB_Params::node_opts{xbone_ctl_port},
'ipv6');
#-> send invite message
unless (send($udp_sock, $smime_msg, 0)){
XB_Log::log "err", " [$procname] Send UDP unicast invite to".
" $p failed: $!" and die "send";
}else{
XB_Log::log "info", " [$procname] UDP unicast invite to $p";
}
#=> close udp socket
$udp_sock->close or
XB_Log::log "err", " [$procname] socket close failed: $!"
and die "close";
} # handled the v6...
} # foreach of the hosts..
# the response will come back as a UDP message from
# the RDs on the control port. multicast port are
# those sockets.
for my $msock (@{$msocks}){
$new_sel->add($msock) or
XB_Log::log "err", " [$procname] select failed: $!"
and die "sel";
}
}
#=> Receive replies from the sources.
#-- start timer
my $rtt;
if ($props->{timeout} eq '') {
$rtt = $XB_Params::xbone_rtt;
} else {
$rtt = $props->{timeout};
}
XB_Log::log "debug", "##### Timeout = $rtt #####";
my $timeout = time + $rtt; #TODO TIMEOUT from CMD!!!
my $now;
#-- collect ack-discover
while(1){
$now = time;
my $timeleft = $timeout - $now;
while(my @r = $new_sel->can_read ($timeleft)){
foreach my $fh (@r){
#if($fh == $msock){
XB_Log::log "info", " [$procname] incoming UDP message";
my ($umsg, $src_sock, $peerhost, $src_cmd, $src_port);
unless ($src_sock = $fh->recv($umsg, 65536, 0)){
XB_Log::log "err", " [$procname] recv: $!" and die "recv";
}
eval{ ($umsg, $peerhost) = XB_SMIME::verify($umsg); };
if($@){
XB_Log::log "warning",
" [$procname] SMIME verification failed";
$umsg = "";
next;
}
my $ip;
if(ref ($fh) eq "IO::Socket::Multicast6"){
$ip = 'ipv6';
#($src_port, $src_sock) = unpack_sockaddr_in6($src_sock);
#my @hinfo = getipnodebyaddr(AF_INET6, $src_sock);
#$src_sock = $hinfo[0];
}else{
$ip = 'ipv4';
#($src_port, $src_sock) = sockaddr_in $src_sock;
#$src_sock = gethostbyaddr($src_sock, AF_INET);
}
my $peerport = XB_Common::chk_sockaddr($fh, $peerhost, $ip);
$umsg =~ s/$XB_Params::msg_delimiter//g;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($umsg);
unless (defined $ctl_cmd){
XB_Log::log "err",
" [$procname] parser error: ==\n$umsg\n==" and next;
}
XB_Log::log "debug7", " [$procname] ack discover received".
" from $peerhost: ==\n$umsg\n==";
XB_Log::log "debug6", " [$procname] CTL command:".
Dumper($ctl_cmd);
$src_cmd = $ctl_cmd->{command}{hostname};
unless ($ctl_cmd->{command}{command} =~ /(ack-discover|err)/){
XB_Log::log "err", " [$procname] wrong command: ".
"$ctl_cmd->{command}{command}" and die "cmd";
}
#unless($src_sock eq $peerhost){
# XB_Log::log "err", " [$procname] hostname mismatch: ".
# "cert: $src_cert, sock: $src_sock" and die "name";
#}
unless($src_cmd eq $peerhost){
XB_Log::log "err", " [$procname] hostname mismatch: ".
"cmd: $src_cmd, cert: $peerhost" and die "name";
}
if(exists $msg_log{$src_cmd}){
XB_Log::log "warning", " [$procname] duplicate message".
" from $src_cmd" and next;
}else{
XB_Log::log "debug1", " [$procname] new ack: $src_cmd";
$msg_log{$src_cmd}=time;
}
$cmd->{nodes}{$src_cmd} = $ctl_cmd;
#}
}
}
$now = time;
if($now > $timeout){ last; }
}
$reply = api_build_msg "discover_reply", $cmd;
#}else{
# TODO support for other discovery/reservation schemes such as LDAP
# TODO LDAP_discover($smine_invite, \%host_hash, \%router_hash,
# TODO \%meta_hash, \%both_hash);
# TODO then calculate $enough as shown above
#XB_Log::log "err", " [$procname] $cmd->{discover} mechanism is not".
# " supported" and die "disc";
#}
};
XB_Log::log "info", "<- $modname$procname";
return $reply unless $@;
unless($@ =~ /(XB_SMIME|mcast|sel|recv|cmd|name|disc|ldap)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# [API] Check the node/link status of the given application.
# Arguments:
# $type application type
# $name application name
# Returns:
# \$reply (ref) reply message
# Exceptions:
# "XB_API::api_status" on failure, nothing to cleanup by caller
#
sub api_status($$$){
my ($type, $name, $cmd) = @_;
my ($reply);
my $procname = "api_status";
XB_Log::log "info", "-> $modname$procname $type, $name, $cmd";
eval{
#=> check if application type/name exists
my $app_obj = get_app ($type, $name);
unless(defined $app_obj){
$reply = "[$procname] $type $name does not exists!";
XB_Log::log "err", " $reply" and die "miss";
}
my $level= $app_obj->{application}{level};
#=> generate the status message
my $cred = ${$cmd->{credential}{section}};
#my $status = XB_Common::ctl_msg('status', $type, $name, $level);
my $status =
"(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n".
$cred.
" (status\n".
" (application $type)\n".
" (name $name)\n".
" (level $level)\n".
" )\n".
")\n$XB_Params::msg_delimiter\n";
#=> get the list of nodes
#my @nodes;
my %nodes;
my $app = $app_obj->{application};
my $res = $app->{resources};
my $net = $app->{network};
my $pro = $net->{properties};
while(my ($type, $thash) = each %{$res}){
while(my ($host, $hhash) = each %{$thash}){
$nodes{$host} = $hhash->{vnode};
}
}
#for my $t (keys %{$res}){
# for my $n (keys %{$res->{$t}}){
# push @nodes, $n;
# }
#}
XB_Log::log "debug1", " [$procname] nodes: ".join ('|', keys %nodes);
#=> fork & send to all nodes in parallel
my $max_procs = ($XB_Params::NO_FORK)? 0 : keys %nodes;
my $node_count = keys %nodes;
my $pm = new Parallel::ForkManager($max_procs);
my %node_list;
my %node_reply;
my $sel = IO::Select->new;
$pm->run_on_start(
sub {
my ($pid, $ident) = @_;
XB_Log::log "info", " [$procname] status $ident (fork pid: $pid)";
}
);
$pm->run_on_finish(
sub {
my ($pid, $exit_code, $ident) = @_;
if($exit_code){
XB_Log::log "info", " [$procname] $ident status ok, ".
"pid $pid exit $exit_code";
}else{
XB_Log::log "warning", " [$procname] $ident status failed, ".
"pid $pid exit $exit_code";
}
$node_count--;
}
);
$pm->run_on_wait(
sub {
XB_Log::log "debug1", " [$procname] waiting $node_count processes".
" ...";
}
);
for my $node (keys %nodes){
#=> create or retrieve tcp/ssl socket
my $ssl_sock;
my $ipproto = $pro->{address_type};
my $ipaddr = $net->{nodes}{$nodes{$node}}{properties}{ctl_addr};
if($XB_Params::PERSISTENT_SOCK){
$ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node,
$XB_Params::node_opts{xbone_ctl_port}, $ipaddr);
}
#=> create a pipe for the child to write back to parent
my $rh = IO::Handle->new;
my $wh = IO::Handle->new;
pipe $rh, $wh or die "pipe";
$node_list{$rh} = $node;
#=> fork
my $pid = $pm->start($node)
and XB_Common::fork_parent_close($sel, $rh, $wh, $node)
and next;
#=> begin child process ============================================
# exception handling inside the child process: need to catch all
# dies by the child process and exit with different code
my $received = 0;
eval{
#=> close the read handle
$rh->close or die "close";
#=> create or retrieve tcp/ssl socket if not yet created
unless(defined $ssl_sock){
$ssl_sock = XB_Common::tcp_ssl_sock ($ipproto, $node,
$XB_Params::node_opts{xbone_ctl_port}, $ipaddr);
}
#=> send select
XB_Log::log "info", " [$procname] send status to $node";
print $ssl_sock $status;
#=> wait & receive ack-status
my $new_sel;
unless ($new_sel = IO::Select->new($ssl_sock)){
XB_Log::log "err", " [$procname:$node] select failed: $!"
and die "select";
}
while(my @r = $new_sel->can_read()){ # TODO set timeout
for my $fh (@r){
if($fh != $ssl_sock){
XB_Log::log "err", " [$procname:$node] wrong socket for $node";
next;
}else{
#=> read command & write it back to the parent process
my $ctl_msg = XB_Common::fh_read_until ($fh,
$XB_Params::msg_delimiter);
print $wh $ctl_msg;
$received = 1;
}
}
if($received == 1){ last; }
}
if(not $XB_Params::PERSISTENT_SOCK){
$ssl_sock->close or
XB_Log::log "err", " [$procname:$node] socket close failed: $!"
and die "close";
}
unless($received){
XB_Log::log "err", " [$procname:$node] did not received the ".
"whole message" and die "rcv";
}
};
unless($XB_Params::NO_FORK){
XB_Common::child_close($node);
}
if($@ && $@ !~ /(tcp_ssl_sock|select|fh_read_until|close|rcv)/){
XB_Log::log "err", " [$procname:$node] caught unknown exception: $@";
}
$pm->finish($received);
#=> end child process ==============================================
}
#=> collect responses from all the nodes
while (my @handles = $sel->can_read){ # TODO set timeout
for my $h (@handles){
my $n = $node_list{$h};
XB_Log::log "debug1", " [$procname] receive from $n";
my @lines = $h->getlines;
my $ack_msg = join "", @lines;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg);
unless (defined $ctl_cmd){
XB_Log::log "err", " [$procname] error parsing message ".
"from $n =====\n$ack_msg\n==========";
}else{
unless ($ctl_cmd->{command}{command} =~ /(ack-status|error)/){
XB_Log::log "err", " [$procname] wrong command: ".
"$ctl_cmd->{command}{command}";
}else{
XB_Log::log "debug6", " [$procname] CTL command: ".
Dumper($ctl_cmd);
delete $node_list{$h};
$node_reply{$n} = $ctl_cmd;
}
}
$sel->remove($h);
$h->close or die "close";
}
}
$pm->wait_all_children;
#=> process replies
#-- gather the missing ones
my @missing = ();
@missing = values %node_list;
if(@missing > 0){
XB_Log::log "err", " [$procname] missing status replies from ".
(join ", ", @missing);
}
if($type eq 'overlay'){
XB_VN_funcs::process_status($app_obj, \%node_reply, \@missing);
}else{ # add support for other applications here
XB_Log::log "err", " [$procname] application $type not supported"
and die "app";
}
#=> generate the reply message
$reply = api_build_msg "status_reply", $app_obj;
};
XB_Log::log "info", "<- $modname$procname";
return $reply unless $@;
unless($@ =~ /(miss)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
# Description:
# [API] Fork and send refresh (heartbeat) messages to every node of all
# active overlays to reset their expiration times.
# Arguments:
# $rlist (ref) hash of nodes and their corresponding overlays
# Returns:
# 1 on success
# Exceptions:
# "XB_API::api_refresh" on failure, nothing to cleanup by caller
#
sub api_refresh($){
my $rlist = shift;
my $procname = "api_refresh";
XB_Log::log "info", "-> $modname$procname $rlist";
eval{
my @ipproto = keys %{$rlist};
#=> loop through IPv4 and IPv6
for my $ip (@ipproto){
my @nodes = keys %{$rlist->{$ip}};
my $max_procs = ($XB_Params::NO_FORK)? 0 : @nodes;
my $node_count = @nodes;
my $pm = new Parallel::ForkManager($max_procs);
my $sel = IO::Select->new;
my %node_list;
$pm->run_on_start(
sub {
my ($pid, $ident) = @_;
XB_Log::log "info", " [$procname] refresh $ident (fork pid: $pid)";
}
);
$pm->run_on_finish(
sub {
my ($pid, $val, $ident) = @_;
if($val){
XB_Log::log "info",
" [$procname] $ident done, pid: $pid exit: $val";
}else{
XB_Log::log "warning",
" [$procname] $ident failed, pid: $pid exit: $val";
}
$node_count--;
}
);
$pm->run_on_wait(
sub {
XB_Log::log "debug1",
" [$procname] waiting for $node_count processes";
}
);
for my $node (@nodes){
#=> create a pipe for the child to write back to parent
my $rh = IO::Handle->new;
my $wh = IO::Handle->new;
pipe $rh, $wh or die "pipe";
$node_list{$rh} = $node;
my ($k, $v, $msg, $appstr, $udp_sock);
#=> construct & sign the refresh message
for my $h (@{$rlist->{$ip}{$node}}){
while (($k, $v) = each %{$h}){ $appstr .= " ($k $v)\n"; }
}
$msg =
"(xbone-ctl $XB_Params::ctl_ver $XB_Params::rel_ver\n".
" (refresh\n". $appstr.
" )\n".
")\n".
"$XB_Params::msg_delimiter\n";
$msg = XB_SMIME::sign($msg, $XB_Params::node_opts{"node_cert"},
$XB_Params::node_opts{"node_key"});
#=> fork
my $pid = $pm->start($node)
and XB_Common::fork_parent_close($sel, $rh, $wh, $node)
and next;
#=> begin child process ============================================
# exception handling inside the child process: need to catch all
# dies by the child process and exit with different code
my $sent = 0;
eval{
#=> close the read handle
$rh->close or die "close";
#=> create udp socket
$udp_sock = XB_Common::udp_sock($node,
$XB_Params::node_opts{xbone_ctl_port}, $ip);
#=> send refresh message
unless (send($udp_sock, $msg, 0)){
XB_Log::log "err", " [$procname] error send to $node: $!"
and die "send";
}else{
$sent = 1;
XB_Log::log "info", " [$procname] send refresh to $node";
XB_Log::log "debug7", $msg;
}
#=> close udp socket
$udp_sock->close or
XB_Log::log "err", " [$procname] socket close failed: $!"
and die "close";
};
if($@){
print $wh "error: $@\n$XB_Params::error_reply";
}elsif($sent){
print $wh "sent\n";
}else{
print $wh "not sent, but also no error. weird.\n";
}
$pm->finish($sent);
#=> end child process ==============================================
}
#=> select msg sent, change state to selected
#$app->{state} = "selected";
while (my @handles = $sel->can_read){ # TODO set timeout
for my $h (@handles){
my $n = $node_list{$h};
XB_Log::log "debug1", " [$procname] receive from $n";
my @lines = $h->getlines;
my $ack_msg = join "", @lines;
my $ctl_cmd = $XB_CTL_parser::parser->xb_ctl($ack_msg);
XB_Log::log "debug1", " [$procname] result: $ack_msg";
if($ack_msg =~ /^sent/){ delete $node_list{$h}; }
$sel->remove($h);
$h->close or die "close";
}
}
$pm->wait_all_children;
#=> check if all ack'ed
my @missing = values %node_list;
XB_Log::log "debug1", " [$procname] error from the following nodes:\n".
(join ", ", @missing);
}
};
XB_Log::log "info", "<- $modname$procname";
return 1 unless $@;
unless($@ =~ /(pipe|close)/){
XB_Log::log "warning", " ! $procname caught unknown exception: $@";
}
die "$modname$procname";
}
1;
syntax highlighted by Code2HTML, v. 0.9.1