# -*- mode: perl -*-
# ============================================================================

package Net::SNMP::Dispatcher;

# $Id: Dispatcher.pm,v 3.1 2005/10/20 14:17:01 dtown Rel $

# Object that dispatches SNMP messages and handles the scheduling of events.

# Copyright (c) 2001-2005 David M. Town
# All rights reserved.

# This program is free software; you may redistribute it and/or modify it
# under the same terms as Perl itself.

# ============================================================================

use strict;
use Errno;

use Net::SNMP::MessageProcessing();
use Net::SNMP::Message qw( TRUE FALSE );

## Version of the Net::SNMP::Dispatcher module

our $VERSION = v3.0.1;

## Package variables

our $INSTANCE;            # Reference to our Singleton object

our $DEBUG = FALSE;       # Debug flag

our $MESSAGE_PROCESSING;  # Reference to the Message Processing object

## Event array indexes
#
# An event is an anonymous array; the constants below name its slots.  The
# _PREVIOUS and _NEXT slots link the events into a doubly linked list that
# is kept sorted by _TIME.

sub _ACTIVE()   { 0 }     # State of the event
sub _TIME()     { 1 }     # Execution time
sub _CALLBACK() { 2 }     # Callback reference
sub _PREVIOUS() { 3 }     # Previous event
sub _NEXT()     { 4 }     # Next event

BEGIN
{
   # Use a higher resolution of time() if the Time::HiRes module is
   # available.  This must happen at compile time so that the time()
   # calls below are compiled against the imported function.

   if (eval { require Time::HiRes; }) {
      Time::HiRes->import('time');
   }

   # Validate the creation of the Message Processing object.

   $MESSAGE_PROCESSING = Net::SNMP::MessageProcessing->instance;

   if (!defined($MESSAGE_PROCESSING)) {
      die('FATAL: Failed to create Message Processing instance');
   }
}

# [public methods] -----------------------------------------------------------

# instance()
#
# Returns the Singleton Dispatcher object, creating it on first use.

sub instance
{
   $INSTANCE ||= Net::SNMP::Dispatcher->_new;
}

# activate()
#
# Runs the event loop, blocking on select(), until the event queue is
# empty.  Returns TRUE immediately if the Dispatcher is already active,
# otherwise returns FALSE once the queue has been drained.

sub activate
{
   my ($this) = @_;

   # Return immediately if the Dispatcher is already active.
   return TRUE if ($this->{_active});

   # Indicate that the Dispatcher is active and block
   # on select() calls.

   $this->{_active}   = TRUE;
   $this->{_blocking} = TRUE;

   while (defined($this->{_event_queue_h})) {
      $this->_event_handle;
   }

   # Flag the Dispatcher as not active
   $this->{_active} = FALSE;
}

# one_event()
#
# Makes a single, non-blocking pass through the event handler.  Returns
# TRUE if the Dispatcher is already active or if events remain in the
# queue after the pass, otherwise FALSE.

sub one_event
{
   my ($this) = @_;

   # Return immediately if the Dispatcher is already active.
   return TRUE if ($this->{_active});

   # Indicate that the Dispatcher is active and DO NOT
   # block on select() calls.

   $this->{_active}   = TRUE;
   $this->{_blocking} = FALSE;

   $this->_event_handle;

   # Flag the Dispatcher as not active
   $this->{_active} = FALSE;

   defined($this->{_event_queue_h}) ? TRUE : FALSE;
}

# listen()
#
# Runs the event loop for as long as there are queued events or registered
# file descriptors.  Once the queue is drained it blocks in select() with
# no timeout until a descriptor becomes readable.  Returns TRUE immediately
# if the Dispatcher is already active, otherwise FALSE when the loop ends.

sub listen
{
   my ($this) = @_;

   # Return immediately if the Dispatcher is already active.
   return TRUE if ($this->{_active});

   # Indicate that the Dispatcher is active and block
   # on select() calls.

   $this->{_active}   = TRUE;
   $this->{_blocking} = TRUE;

   while (defined($this->{_event_queue_h}) ||
          keys(%{$this->{_descriptors}}))
   {
      # Handle queued events
      while (defined($this->{_event_queue_h})) {
         $this->_event_handle;
      }

      # Block on select() until there is file descriptor activity.
      DEBUG_INFO('waiting for activity');
      $this->_event_select(undef);
   }

   # Flag the Dispatcher as not active
   $this->{_active} = FALSE;
}

# send_pdu($pdu, $delay)
#
# Queues (or immediately sends) the given PDU.
#
#   $pdu   - reference to the PDU object to send (required).
#   $delay - delay before sending; a negative value requests an immediate
#            send when the Dispatcher is active.  Treated as 0 if omitted.
#
# Returns TRUE when the PDU was scheduled, the result of _send_pdu() for
# an immediate send, or nothing (with the error set) if the PDU is missing.

sub send_pdu
{
   my ($this, $pdu, $delay) = @_;

   # Clear any previous errors
   $this->_error_clear;

   if ((@_ < 2) || !ref($pdu)) {
      return $this->_error('Required PDU missing');
   }

   # An omitted delay means "no delay"; normalise it so that undef is
   # never used in a numeric comparison or stored as an event time.

   $delay = 0 unless defined($delay);

   # If the Dispatcher is active and the delay value is negative,
   # send the message immediately.

   if ($delay < 0) {
      if ($this->{_active}) {
         return $this->_send_pdu($pdu, $pdu->timeout, $pdu->retries);
      }
      $delay = 0;
   }

   $this->schedule($delay, [\&_send_pdu, $pdu, $pdu->timeout, $pdu->retries]);

   TRUE;
}

# return_response_pdu($pdu)
#
# Sends the PDU with a negative delay, i.e. immediately when the
# Dispatcher is active.  Returns whatever send_pdu() returns.

sub return_response_pdu
{
   my ($this, $pdu) = @_;

   $this->send_pdu($pdu, -1);
}

# schedule($time, $callback)
#
# Creates an event that executes $callback after $time.  $callback is
# either a CODE reference or an ARRAY reference whose first element is a
# CODE reference followed by its arguments.  Returns the event, which can
# later be passed to cancel().

sub schedule
{
   my ($this, $time, $callback) = @_;

   # Scalar context guarantees exactly one argument is passed along, even
   # when _callback_create() rejects the callback with a bare return.

   $this->_event_create($time, scalar($this->_callback_create($callback)));
}

# cancel($event)
#
# Removes an event returned by schedule() from the queue.  Returns TRUE if
# the queue still has entries afterwards, FALSE if it is now empty.

sub cancel
{
   my ($this, $event) = @_;

   $this->_event_delete($event);
}

# register($transport, $callback)
#
# Starts monitoring the file descriptor of the Transport Domain object for
# readability.  The first registration of a descriptor stores the callback;
# later registrations of the same descriptor only bump a reference count.
# Returns the Transport Domain object, or nothing (with the error set) if
# the transport or its file descriptor is undefined.

sub register
{
   my ($this, $transport, $callback) = @_;

   # Transport Domain and file descriptor must be valid.
   my $fileno;

   if ((!defined($transport)) || (!defined($fileno = $transport->fileno))) {
      return $this->_error('Invalid Transport Domain');
   }

   # NOTE: The callback must read the data associated with the
   #       file descriptor or the Dispatcher will continuously
   #       call the callback and get stuck in an infinite loop.

   if (!exists($this->{_descriptors}->{$fileno})) {

      DEBUG_INFO('adding descriptor [%d]', $fileno);

      $this->{_rin} = '' unless defined($this->{_rin});

      # _callback_create() returns an empty list for an invalid callback.
      # Force scalar context so the entry always has the three slots
      # [callback, transport, reference count] in their expected positions.

      my $handler = $this->_callback_create($callback);

      # Add the file descriptor to the list
      $this->{_descriptors}->{$fileno} = [
         $handler,     # Callback
         $transport,   # Transport Domain object
         1             # Reference count
      ];

      # Add the file descriptor to the "readable" vector
      vec($this->{_rin}, $fileno, 1) = 1;

   } else {
      # Bump up the reference count
      $this->{_descriptors}->{$fileno}->[2]++;
   }

   # Return the Transport Domain object
   $transport;
}

# deregister($transport)
#
# Drops one reference to the transport's file descriptor and stops
# monitoring it when the reference count falls below 1.  Returns the
# Transport Domain object, or nothing (with the error set) if the
# transport is invalid or was never registered.

sub deregister
{
   my ($this, $transport) = @_;

   # Transport Domain and file descriptor must be valid.
   my $fileno;

   if ((!defined($transport)) || (!defined($fileno = $transport->fileno))) {
      return $this->_error('Invalid Transport Domain');
   }

   if (!exists($this->{_descriptors}->{$fileno})) {
      return $this->_error('Not registered for this Transport Domain');
   }

   # Check reference count
   if (--$this->{_descriptors}->{$fileno}->[2] < 1) {

      DEBUG_INFO('removing descriptor [%d]', $fileno);

      # Remove the file descriptor from the list
      delete($this->{_descriptors}->{$fileno});

      # Remove the file descriptor from the "readable" vector
      vec($this->{_rin}, $fileno, 1) = 0;

      # Undefine the vector if there are no file descriptors,
      # some systems expect this to make select() work properly.

      $this->{_rin} = undef unless keys(%{$this->{_descriptors}});
   }

   # Return the Transport Domain object
   $transport;
}

# error()
#
# Returns the current error message, or an empty string if there is none.

sub error
{
   $_[0]->{_error} || '';
}

# debug([$flag])
#
# With an argument, sets the package-wide debug flag to TRUE or FALSE
# according to the truth of $flag.  Always returns the current flag.

sub debug
{
   if (@_ == 2) {
      $DEBUG = ($_[1]) ? TRUE : FALSE;
   }

   $DEBUG;
}

# [private methods] ----------------------------------------------------------

# _new()
#
# Private constructor; use instance() to obtain the Dispatcher.

sub _new
{
   my ($class) = @_;

   # The constructor is private since we only want one
   # Dispatcher object.

   bless {
      '_active'        => FALSE,  # State of this Dispatcher object
      '_blocking'      => TRUE,   # Block on select()
      '_error'         => undef,  # Error message
      '_event_queue_h' => undef,  # Head of the event queue
      '_event_queue_t' => undef,  # Tail of the event queue
      '_rin'           => undef,  # Readable vector for select()
      '_descriptors'   => {},     # List of file descriptors to monitor
   }, $class;
}

# _send_pdu($pdu, $timeout, $retries)
#
# Builds the outgoing message for the PDU and sends it.  On a temporary
# send failure (EAGAIN / EWOULDBLOCK) with retries remaining, the send is
# rescheduled after $timeout and FALSE is returned.  On any other failure
# the error is reported through $pdu->status_information and nothing is
# returned.  On success, a response handler and a timeout event are set up
# if the PDU expects a response, and TRUE is returned.

sub _send_pdu
{
   my ($this, $pdu, $timeout, $retries) = @_;

   # Pass the PDU to Message Processing so that it can
   # create the new outgoing message.

   my $msg = $MESSAGE_PROCESSING->prepare_outgoing_msg($pdu);

   if (!defined($msg)) {
      # Inform the command generator about the Message Processing error.
      $pdu->status_information($MESSAGE_PROCESSING->error);
      return;
   }

   # Actually send the message.

   if (!defined($msg->send)) {

      # Capture the errno state right away; the clean-up call below may
      # overwrite $! before it would otherwise be inspected.

      my $temporary = ($!{EAGAIN} || $!{EWOULDBLOCK});

      if ($pdu->expect_response) {
         $MESSAGE_PROCESSING->msg_handle_delete($pdu->msg_id);
      }

      # A crude attempt to recover from temporary failures.

      if (($retries-- > 0) && $temporary) {
         DEBUG_INFO('attempting recovery from temporary failure');
         $this->schedule($timeout, [\&_send_pdu, $pdu, $timeout, $retries]);
         return FALSE;
      }

      # Inform the command generator about the send() error.
      $pdu->status_information($msg->error);
      return;
   }

   # Schedule the timeout handler if the message expects a response.

   if ($pdu->expect_response) {
      $this->register($msg->transport, [\&_transport_response_received]);
      $msg->timeout_id(
         $this->schedule(
            $timeout, [\&_transport_timeout, $pdu, $timeout, $retries]
         )
      );
   }

   TRUE;
}

# _transport_timeout($pdu, $timeout, $retries)
#
# Timeout handler for a message that expected a response.  Resends the PDU
# while retries remain; otherwise deletes the msgHandle and reports the
# timeout through $pdu->status_information.

sub _transport_timeout
{
   my ($this, $pdu, $timeout, $retries) = @_;

   # Stop waiting for responses
   $this->deregister($pdu->transport);

   if ($retries-- > 0) {

      # Resend a new message.
      DEBUG_INFO('retries left %d', $retries);
      return $this->_send_pdu($pdu, $timeout, $retries);
   }

   # Delete the msgHandle.
   $MESSAGE_PROCESSING->msg_handle_delete($pdu->msg_id);

   # Inform the command generator about the timeout.
   $pdu->status_information(
      "No response from remote host '%s'", $pdu->hostname
   );

   return;
}

# _transport_response_received($transport)
#
# Descriptor callback: reads a message from the transport, hands it to
# Message Processing, cancels the pending timeout, stops listening on the
# transport and lets the message process its response PDU.  Dies if the
# transport is not a reference or a Message object cannot be created.

sub _transport_response_received
{
   my ($this, $transport) = @_;

   # Clear any previous errors
   $this->_error_clear;

   die('FATAL: Invalid Transport Domain') unless ref($transport);

   # Create a new Message object to receive the response
   my ($msg, $error) = Net::SNMP::Message->new(-transport => $transport);

   if (!defined($msg)) {
      die sprintf('Failed to create Message object [%s]', $error);
   }

   # Read the message from the Transport Layer
   if (!defined($msg->recv)) {
      $this->deregister($transport) unless ($transport->connectionless);
      return $this->_error($msg->error);
   }

   # For connection-oriented Transport Domains, it is possible to
   # "recv" an empty buffer if reassembly is required.

   if (!$msg->length) {
      DEBUG_INFO('ignoring zero length message');
      return FALSE;
   }

   # Hand the message over to Message Processing.
   if (!defined($MESSAGE_PROCESSING->prepare_data_elements($msg))) {
      return $this->_error($MESSAGE_PROCESSING->error);
   }

   # Set the error if applicable.
   $msg->error($MESSAGE_PROCESSING->error) if ($MESSAGE_PROCESSING->error);

   # Cancel the timeout.
   $this->cancel($msg->timeout_id);

   # Stop waiting for responses.
   $this->deregister($transport);

   # Notify the command generator to process the response.
   $msg->process_response_pdu;
}

# _event_create($time, $callback)
#
# Builds a new event and inserts it into the queue.  Returns the event.

sub _event_create
{
   my ($this, $time, $callback) = @_;

   # Create a new event anonymous array and add it to the queue.
   # The event is initialized based on the current state of the
   # Dispatcher object.  If the Dispatcher is not currently running
   # the event needs to be created such that it will get properly
   # initialized when the Dispatcher is started.

   $this->_event_insert(
      [
         $this->{_active},                          # State of the object
         $this->{_active} ? time() + $time : $time, # Execution time
         $callback,                                 # Callback reference
         undef,                                     # Previous event
         undef,                                     # Next event
      ]
   );
}

# _event_insert($event)
#
# Links the event into the time-ordered queue and returns the event.
# Events with equal times keep their insertion order.

sub _event_insert
{
   my ($this, $event) = @_;

   # If the head of the list is not defined, we _must_ be the only
   # entry in the list, so create a new head and tail reference.

   if (!defined($this->{_event_queue_h})) {
      DEBUG_INFO('created new head and tail [%s]', $event);
      return $this->{_event_queue_h} = $this->{_event_queue_t} = $event;
   }

   # Estimate the midpoint of the list by calculating the average of
   # the time associated with the head and tail of the list.  Based
   # on this value either start at the head or tail of the list to
   # search for an insertion point for the new Event.

   my $midpoint = (($this->{_event_queue_h}->[_TIME] +
                    $this->{_event_queue_t}->[_TIME]) / 2);

   if ($event->[_TIME] >= $midpoint) {

      # Search backwards from the tail of the list

      for (my $e = $this->{_event_queue_t}; defined($e); $e = $e->[_PREVIOUS])
      {
         next if ($e->[_TIME] > $event->[_TIME]);

         # $e is the last event that is not later than the new one, so
         # the new event goes directly after it.

         $event->[_PREVIOUS] = $e;
         $event->[_NEXT]     = $e->[_NEXT];

         if ($e eq $this->{_event_queue_t}) {
            DEBUG_INFO('modified tail [%s]', $event);
            $this->{_event_queue_t} = $event;
         } else {
            DEBUG_INFO('inserted [%s] into list', $event);
            $e->[_NEXT]->[_PREVIOUS] = $event;
         }

         return $e->[_NEXT] = $event;
      }

      # Every queued event is later than the new one.

      DEBUG_INFO('added [%s] to head of list', $event);
      $event->[_NEXT] = $this->{_event_queue_h};
      $this->{_event_queue_h}->[_PREVIOUS] = $event;
      return $this->{_event_queue_h} = $event;

   }

   # Search forward from the head of the list

   for (my $e = $this->{_event_queue_h}; defined($e); $e = $e->[_NEXT])
   {
      next if ($e->[_TIME] <= $event->[_TIME]);

      # $e is the first event that is later than the new one, so the
      # new event goes directly before it.

      $event->[_NEXT]     = $e;
      $event->[_PREVIOUS] = $e->[_PREVIOUS];

      if ($e eq $this->{_event_queue_h}) {
         DEBUG_INFO('modified head [%s]', $event);
         $this->{_event_queue_h} = $event;
      } else {
         DEBUG_INFO('inserted [%s] into list', $event);
         $e->[_PREVIOUS]->[_NEXT] = $event;
      }

      return $e->[_PREVIOUS] = $event;
   }

   # No queued event is later than the new one.

   DEBUG_INFO('added [%s] to tail of list', $event);
   $event->[_PREVIOUS] = $this->{_event_queue_t};
   $this->{_event_queue_t}->[_NEXT] = $event;
   return $this->{_event_queue_t} = $event;
}

# _event_delete($event)
#
# Unlinks the event from the queue and empties the event array.  Returns
# FALSE if the queue is empty afterwards, TRUE otherwise.  Dies if the
# event has no neighbour on a side where it is not the head/tail either,
# i.e. it is not actually part of the queue.

sub _event_delete
{
   my ($this, $event) = @_;

   my $info = '';

   # Update the previous event
   if (defined($event->[_PREVIOUS])) {
      $event->[_PREVIOUS]->[_NEXT] = $event->[_NEXT];
   } elsif ($event eq $this->{_event_queue_h}) {
      if (defined($this->{_event_queue_h} = $event->[_NEXT])) {
         $info = sprintf(', defined new head [%s]', $event->[_NEXT]);
      } else {
         DEBUG_INFO('deleted [%s], list is now empty', $event);

         # The event was the only entry: clear it and drop the tail.
         undef @{$event};
         $this->{_event_queue_t} = undef;

         return FALSE; # Indicate queue is empty
      }
   } else {
      die('FATAL: Attempt to delete invalid Event head');
   }

   # Update the next event
   if (defined($event->[_NEXT])) {
      $event->[_NEXT]->[_PREVIOUS] = $event->[_PREVIOUS];
   } elsif ($event eq $this->{_event_queue_t}) {
      $info .= sprintf(', defined new tail [%s]', $event->[_PREVIOUS]);
      $this->{_event_queue_t} = $event->[_PREVIOUS];
   } else {
      die('FATAL: Attempt to delete invalid Event tail');
   }

   DEBUG_INFO('deleted [%s]%s', $event, $info);
   undef @{$event};

   # Indicate queue still has entries
   TRUE;
}

# _event_init($event)
#
# Re-queues an event that was created while the Dispatcher was inactive,
# recomputing its execution time from the current Dispatcher state.
# Always returns TRUE.

sub _event_init
{
   my ($this, $event) = @_;

   DEBUG_INFO('initializing event [%s]', $event);

   # Save the time and callback because they will be cleared.
   my ($time, $callback) = @{$event}[_TIME, _CALLBACK];

   # Remove the event from the queue.
   $this->_event_delete($event);

   # Update the appropriate fields.
   $event->[_ACTIVE]   = $this->{_active};
   $event->[_TIME]     = $this->{_active} ? time() + $time : $time;
   $event->[_CALLBACK] = $callback;

   # Insert the event back into the queue.
   $this->_event_insert($event);

   TRUE;
}

# _event_handle()
#
# Performs one step of the event loop: polls the descriptors (waiting up
# to the time remaining before the head event when blocking) and, if there
# was no descriptor activity and the head event is due, initializes or
# executes it.  Returns FALSE if the queue is empty on entry or becomes
# empty after the event is removed, TRUE otherwise.

sub _event_handle
{
   my ($this) = @_;

   # Events are sorted by time, so the event at the head of the list
   # is the next event that needs to be executed.

   my $event = $this->{_event_queue_h};

   return FALSE unless defined($event);

   # Calculate a timeout based on the current time and the lowest
   # event time (if the event does not need to be initialized).

   my $timeout = ($event->[_ACTIVE]) ? ($event->[_TIME] - time()) : 0;

   # If the timeout is less than 0, we are running late.  Adjust the
   # timeout to poll the descriptors before acting on the event.

   if ($timeout < 0) {
      DEBUG_INFO('event [%s], skew = %f', $event, -$timeout);
      $timeout = 0;
   } else {
      DEBUG_INFO('event [%s], poll delay = %f', $event, $timeout);
   }

   # Check the file descriptors for activity.  If there has been any
   # activity, we must return because the activity could have cancelled
   # the event or returned control here before the event is ready to
   # be acted upon.

   return TRUE if ($this->_event_select($this->{_blocking} ? $timeout : 0));

   # If we are not blocking and the timeout is non-zero, then it is
   # not time to act on the event.

   return TRUE if ((!$this->{_blocking}) && ($timeout > 0));

   # If we made it here, we can finally act on the event.  If the event
   # was inserted with a non-zero delay while the Dispatcher was not
   # active, the execution time of the event needs to be updated.

   if ((!$event->[_ACTIVE]) && ($event->[_TIME] > 0)) {
      return $this->_event_init($event);
   }

   $this->_callback_execute($event->[_CALLBACK]);

   # Once we reach here, we are done with the event, so remove it
   # from the head of the queue.

   $this->_event_delete($event);
}

# _event_select($timeout)
#
# Calls select() on the "readable" vector with the given timeout (undef
# blocks indefinitely) and runs the callback of every registered
# descriptor that is ready.  Returns TRUE if there was activity or the
# call was interrupted (EINTR), FALSE if nothing was ready.  Dies on any
# other select() error.

sub _event_select
{
   my ($this, $timeout) = @_;

   # select() modifies its bit vector in place, so work on a copy.
   my $nfound = select(my $rout = $this->{_rin}, undef, undef, $timeout);

   if ((!defined($nfound)) || ($nfound < 0)) {

      if ($!{EINTR}) { # Recoverable error
         return TRUE;
      }

      die sprintf('FATAL: select() error [%s]', $!);
   }

   # No file descriptor activity.
   return FALSE unless ($nfound > 0);

   # Find out which file descriptors have data ready for reading.

   if (defined($rout)) {
      foreach my $fileno (keys(%{$this->{_descriptors}})) {

         # The key list is a snapshot; a callback run earlier in this
         # loop may have deregistered this descriptor.  Skip it so the
         # dereference below cannot re-create the entry by
         # autovivification.

         next unless exists($this->{_descriptors}->{$fileno});

         if (vec($rout, $fileno, 1)) {
            DEBUG_INFO('descriptor [%d] ready for read', $fileno);
            $this->_callback_execute(
               @{$this->{_descriptors}->{$fileno}}[0,1]
            );
         }
      }
   }

   TRUE;
}

# _callback_create($callback)
#
# Normalizes a callback into an ARRAY reference of the form
# [CODE, args...].  Returns nothing (an empty list in list context) if the
# callback is neither a CODE reference nor an ARRAY reference that starts
# with a CODE reference; callers that need a single value must call this
# in scalar context.

sub _callback_create
{
   my ($this, $callback) = @_;

   return unless (@_ == 2);

   # Callbacks can be passed in two different ways.  If the callback
   # has options, the callback must be passed as an ARRAY reference
   # with the first element being a CODE reference and the remaining
   # elements the arguments.  If the callback has no options it is
   # just passed as a CODE reference.

   if ((ref($callback) eq 'ARRAY') && (ref($callback->[0]) eq 'CODE')) {
      return $callback;
   }

   if (ref($callback) eq 'CODE') {
      return [$callback];
   }

   return;
}

# _callback_execute($callback, @extra)
#
# Invokes a callback created by _callback_create().  Returns nothing if
# the callback is undefined, TRUE if it ran without dying, or the result
# of _error() (nothing) if it died.

sub _callback_execute
{
   # my ($this, @argv) = @_;

   return unless (@_ > 1) && defined($_[1]);

   # The callback is invoked passing a reference to this object
   # with the parameters passed by the user next and then any
   # parameters that we provide.

   my $this = shift(@_);
   my @argv = @{shift(@_)};   # Copy, so the stored callback is not altered
   my $cb   = shift(@argv);

   # Protect ourselves from user error.
   eval { $cb->($this, @argv, @_); };

   ($@) ? $this->_error($@) : TRUE;
}

# _error($format, @args)
#
# Records an error message (sprintf-formatted when more than one argument
# is given) unless one is already recorded, printing it when debugging is
# enabled.  Always returns nothing so that callers can "return
# $this->_error(...)" to signal failure.

sub _error
{
   my $this = shift;

   # Only the first error since the last _error_clear() is kept.

   if (!defined($this->{_error})) {
      $this->{_error} = (@_ > 1) ? sprintf(shift(@_), @_) : $_[0];
      if ($this->debug) {
         printf("error: [%d] %s(): %s\n",
            (caller(0))[2], (caller(1))[3], $this->{_error}
         );
      }
   }

   return;
}

# _error_clear()
#
# Discards the recorded error message.

sub _error_clear
{
   $_[0]->{_error} = undef;
}

# DEBUG_INFO($format, @args)
#
# Prints a debug line prefixed with the line number of the call and the
# name of the calling subroutine.  A single argument is printed using the
# '%s' format.  Does nothing (and returns nothing) unless $DEBUG is set;
# otherwise returns $DEBUG.

sub DEBUG_INFO
{
   return unless $DEBUG;

   printf(
      sprintf('debug: [%d] %s(): ', (caller(0))[2], (caller(1))[3]) .
      ((@_ > 1) ? shift(@_) : '%s') .
      "\n",
      @_
   );

   $DEBUG;
}

# ============================================================================
1; # [end Net::SNMP::Dispatcher]