package Event::ExecFlow::Job::Group; use base qw( Event::ExecFlow::Job ); use strict; use Scalar::Util qw(weaken); sub get_type { "group" } sub get_jobs { shift->{jobs} } sub get_fail_with_members { shift->{fail_with_members} } sub get_stop_on_failure { shift->{stop_on_failure} } sub get_parallel { shift->{parallel} } sub get_scheduler { shift->{scheduler} } sub get_member_finished_callbacks { shift->{member_finished_callbacks} } sub set_jobs { shift->{jobs} = $_[1] } sub set_fail_with_members { shift->{fail_with_members} = $_[1] } sub set_stop_on_failure { shift->{stop_on_failure} = $_[1] } sub set_parallel { shift->{parallel} = $_[1] } sub set_member_finished_callbacks { shift->{member_finished_callbacks} = $_[1] } sub new { my $class = shift; my %par = @_; my ($jobs, $fail_with_members, $stop_on_failure) = @par{'jobs','fail_with_members','stop_on_failure'}; my ($parallel, $scheduler, $member_finished_callbacks) = @par{'parallel','scheduler','member_finished_callbacks'}; $jobs = [] unless defined $jobs; $fail_with_members = 1 unless defined $fail_with_members; $stop_on_failure = 1 unless defined $stop_on_failure; my $self = $class->SUPER::new(@_); for my $cb ( $member_finished_callbacks ) { $cb ||= Event::ExecFlow::Callbacks->new; $cb = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE'; } $self->set_jobs($jobs); $self->set_fail_with_members($fail_with_members); $self->set_stop_on_failure($stop_on_failure); $self->set_parallel($parallel); $self->set_scheduler($scheduler); $self->set_member_finished_callbacks($member_finished_callbacks); return $self; } sub set_frontend { my $self = shift; my ($frontend) = @_; $self->SUPER::set_frontend($frontend); $_->set_frontend($frontend) for @{$self->get_jobs}; return $frontend; } sub set_scheduler { my $self = shift; my ($scheduler) = @_; $self->{scheduler} = $scheduler; foreach my $job ( @{$self->get_jobs} ) { $job->set_scheduler($scheduler) if $job->get_type eq 'group'; } return $scheduler; } sub get_exec_type { my $self = shift; my $job = $self->get_next_job; return "sync" if not $job; return $job->get_exec_type; } sub get_diskspace_consumed { my $self = shift; my $sum = $self->SUPER::get_diskspace_consumed; $sum += $_->get_diskspace_consumed for @{$self->get_jobs}; return $sum; } sub get_diskspace_freed { my $self = shift; my $sum = $self->SUPER::get_diskspace_freed; $sum += $_->get_diskspace_freed for @{$self->get_jobs}; return $sum; } sub init { my $self = shift; $self->SUPER::init(); foreach my $job ( @{$self->get_jobs} ) { $job->set_group($self); weaken($job->{group}); $self->add_child_post_callback($job); } $self->set_progress_max($self->get_job_cnt); 1; } sub reset_non_finished_jobs { my $self = shift; if ( $self->get_state ne 'finished' ) { $self->set_state("waiting"); $self->set_cancelled(0); $self->set_error_message(); $self->get_frontend->report_job_progress($self); } foreach my $job ( @{$self->get_jobs} ) { if ( $job->get_state ne 'finished' ) { $job->set_state("waiting"); $job->set_cancelled(0); $job->set_error_message(); $self->get_frontend->report_job_progress($job); } $job->reset_non_finished_jobs if $job->get_type eq 'group'; } 1; } sub get_job_cnt { my $self = shift; my $cnt = 0; foreach my $job ( @{$self->get_jobs} ) { $cnt += $job->get_job_cnt; } return $cnt; } sub init_progress_state { my $self = shift; my $progress_cnt = 0; foreach my $job ( @{$self->get_jobs} ) { if ( $job->get_type eq 'group' ) { $job->init_progress_state; $progress_cnt += $job->get_progress_cnt; } else { ++$progress_cnt if $job->get_state eq 'finished' || $job->get_state eq 'error'; } } $self->set_progress_cnt($progress_cnt); $self->set_progress_max($self->get_job_cnt); $self->set_state("finished") if $self->get_progress_cnt == $self->get_progress_max; 1; } sub set_group_in_all_childs { my $self = shift; foreach my $job ( @{$self->get_jobs} ) { if ( $job->get_type eq 'group' ) { $job->set_group($self); weaken($job->{group}); $job->set_group_in_all_childs; } else { $job->set_group($self); weaken($job->{group}); } } 1; } sub increase_progress_max { my $self = shift; my ($add) = @_; my $job = $self; while ( $job ) { $job->set_progress_max($job->get_progress_max + $add); $job = $job->get_group; } 1; } sub decrease_progress_max { my $self = shift; my ($del) = @_; my $job = $self; while ( $job ) { $job->set_progress_max($job->get_progress_max - $del); $job = $job->get_group; } 1; } sub increase_progress_cnt { my $self = shift; my ($add) = @_; my $job = $self; while ( $job ) { $job->set_progress_cnt($job->get_progress_cnt + $add); $job = $job->get_group; } 1; } sub decrease_progress_cnt { my $self = shift; my ($del) = @_; my $job = $self; while ( $job ) { $job->set_progress_cnt($job->get_progress_cnt - $del); $job = $job->get_group; } 1; } sub add_job { my $self = shift; my ($job) = @_; push @{$self->get_jobs}, $job; $job->set_frontend($self->get_frontend); $job->set_group($self); weaken($job->{group}); my $job_cnt = $job->get_job_cnt; $self->increase_progress_max($job_cnt) if $job_cnt != 0; if ( $self->get_state eq 'finished' || $self->get_state eq 'error' ) { $self->set_state("waiting"); } $self->add_child_post_callback($job); $self->get_frontend->report_job_added($job); 1; } sub remove_job { my $self = shift; my ($job) = @_; my $jobs = $self->get_jobs; my $i; for ( $i=0; $i < @{$jobs}; ++$i ) { last if $jobs->[$i] eq $job; } die "Job with ID ".$job->get_id." no member of this group" if $i == @{$jobs}; splice @{$jobs}, $i, 1; my $job_cnt = $job->get_job_cnt; $self->decrease_progress_max($job_cnt) if $job_cnt != 0; $self->get_frontend->report_job_removed($job); 1; } sub get_job_by_name { my $self = shift; my ($job_name) = @_; foreach my $job ( @{$self->get_jobs} ) { return $job if $job->get_name eq $job_name; } die "Job '$job_name' not member of group '".$self->get_name."'"; } sub execute { my $self = shift; my %par = @_; my ($skip) = $par{'skip'}; $skip = "" if ! defined $skip; my $blocked_job; while ( 1 ) { if ( $self->get_cancelled || $self->all_jobs_finished || ( $self->get_error_message && $self->get_stop_on_failure ) ) { $self->execution_finished; if ( $self->get_scheduler && $self->get_scheduler->is_exclusive ) { $self->get_scheduler->run; } return; } return if $self->get_scheduler && $self->get_scheduler->is_exclusive; my $job = $self->get_next_job(blocked=>$blocked_job); next if defined $job && "$job" eq "$skip"; if ( !$job ) { $self->try_reschedule_jobs(skip => $skip); last; } if ( $self->get_scheduler ) { my $state = $self->get_scheduler->schedule_job($job); return if $state eq 'sched-blocked'; if ( $state eq 'job-blocked' ) { $blocked_job = $job; next; } die "Illegal scheduler state '$state'" unless $state eq 'ok'; } $self->start_child_job($job); last if !$self->get_parallel; } 1; } sub try_reschedule_jobs { my $self = shift; my %par = @_; my ($skip) = $par{'skip'}; my $executed = 0; foreach my $job ( @{$self->get_jobs} ) { next if "$job" eq "$skip"; # Parallel execution groups which are running now # probably can execute more job, so give it a try. if ( $job->get_type eq 'group' && $job->get_state eq 'running' && $job->get_parallel ) { $job->execute; $executed = 1; } } if ( !$executed && $self->get_group ) { $self->get_group->execute(skip => $self); } 1; } sub cancel { my $self = shift; $self->set_cancelled(1); $_->get_state eq 'running' && $_->cancel for @{$self->get_jobs}; 1; } sub pause_job { my $self = shift; $_->get_state eq 'running' && $_->pause for @{$self->get_jobs}; 1; } sub reset { my $self = shift; foreach my $job ( @{$self->get_jobs} ) { if ( $job->reset ) { $self->decrease_progress_cnt($job->get_job_cnt); } } $self->get_frontend->report_job_progress($self); return $self->SUPER::reset() if $self->get_progress_cnt == 0; 0; } sub add_child_post_callback { my $self = shift; my ($job) = @_; if ( $job->{_post_callbacks_added} ) { return; require Carp; Carp::confess($job->get_info.": callbacks added twice!"); } $job->{_post_callbacks_added} = 1; $job->get_post_callbacks->add( sub { my ($job) = @_; $self->child_job_finished($job); 1; }); 1; } sub start_child_job { my $self = shift; my ($job) = @_; $Event::ExecFlow::DEBUG && print "Group->start_child_job(".$job->get_info.")\n"; $self->set_progress_cnt(0) unless defined $self->get_progress_cnt; $self->get_frontend->report_job_progress($self); $job->start; 1; } sub child_job_finished { my $self = shift; my ($job) = @_; $Event::ExecFlow::DEBUG && print "Group->child_job_finished(".$job->get_info.")\n"; $self->get_member_finished_callbacks->execute() if $self->get_member_finished_callbacks; if ( $job->get_error_message && !$job->get_cancelled ) { if ( $self->get_fail_with_members ) { $self->set_state("error"); $self->add_job_error_message($job); } } if ( $self->get_scheduler ) { $self->get_scheduler->job_finished($job); } $self->execute; 1; } sub add_job_error_message { my $self = shift; my ($job) = @_; my $error_message = $self->get_error_message || ""; $error_message .= "Job '".$job->get_info."' ". "failed with error message:\n". $job->get_error_message."\n". ("-"x80)."\n"; $self->set_error_message($error_message); 1; } sub get_first_job { my $self = shift; return $self->get_jobs->[0]; } sub get_next_job { my $self = shift; my %par = @_; my ($blocked) = $par{'blocked'}; $blocked = "" if ! defined $blocked; my $next_job; foreach my $job ( @{$self->get_jobs} ) { next if defined $job && "$job" eq "$blocked"; $Event::ExecFlow::DEBUG && print "Group(".$self->get_info.")->get_next_job: check ".$job->get_info."=>".$job->get_state."\n"; if ( $job->get_state eq 'waiting' && $self->dependencies_ok($job) ) { $next_job = $job; last; } } $Event::ExecFlow::DEBUG && print "Group(".$self->get_info.")->get_next_job=". ($next_job ? $next_job->get_info : "NOJOB")."\n"; return $next_job; } sub dependencies_ok { my $self = shift; my ($job) = @_; foreach my $dep_job_name ( @{$job->get_depends_on} ) { my $dep_job = $self->get_job_by_name($dep_job_name); $Event::ExecFlow::DEBUG && print "Job(".$job->get_info.")->dependencies_ok: check ".$dep_job->get_info." =>".$dep_job->get_state."\n"; return if $dep_job->get_state ne 'finished'; } return 1; } sub all_jobs_finished { my $self = shift; foreach my $job ( @{$self->get_jobs} ) { return 0 if $job->get_state eq 'waiting' || $job->get_state eq 'error' || $job->get_state eq 'running'; } return 1; } sub get_max_diskspace_consumed { my $self = shift; my ($currently_consumed, $max_consumed) = @_; foreach my $job ( @{$self->get_jobs} ) { ($currently_consumed, $max_consumed) = $job->get_max_diskspace_consumed ($currently_consumed, $max_consumed); } return ($currently_consumed, $max_consumed); } sub backup_state { my $self = shift; my $data_href = $self->SUPER::backup_state(); delete $data_href->{jobs}; delete $data_href->{scheduler}; delete $data_href->{member_finished_callbacks}; my $jobs = $self->get_jobs; foreach my $job ( @{$jobs} ) { push @{$data_href->{jobs}}, $job->backup_state; } return $data_href; } sub restore_state { my $self = shift; my ($data_href) = @_; my $jobs = $self->get_jobs; $self->SUPER::restore_state($data_href); my $job_states = delete $self->{jobs}; my $i = 0; foreach my $job ( @{$jobs} ) { $job->restore_state($job_states->[$i]); ++$i; } $self->set_jobs($jobs); 1; } sub add_stash_to_all_jobs { my $self = shift; my ($add_stash) = @_; $self->add_stash($add_stash); foreach my $job ( @{$self->get_jobs} ) { if ( $job->get_type eq 'group' ) { $job->add_stash_to_all_jobs($add_stash); } else { $job->add_stash($add_stash); } } } sub traverse_all_jobs { my $self = shift; my ($code) = @_; foreach my $job ( @{$self->get_jobs} ) { $code->($job); if ( $job->get_type eq 'group' ) { $job->traverse_all_jobs($code); } } 1; } sub get_job_with_id { my $self = shift; my ($job_id) = @_; my $job; $self->traverse_all_jobs(sub{ $job = $_[0] if $_[0]->get_id eq $job_id; }); return $job; } 1; __END__ =head1 NAME Event::ExecFlow::Job::Group - Build a group of jobs =head1 SYNOPSIS Event::ExecFlow::Job::Group->new ( jobs => List of job group members, fail_with_members => Boolean whether group should fail with its members, stop_on_failure => Boolean whether execuction should stop on failure, parallel => Boolean whether members may be executed in parallel, scheduler => Scheduler object for add. control of par. execution, ... Event::ExecFlow::Job attributes ); =head1 DESCRIPTION Use this module to group together jobs of any type, including groups, which results in arbitrary complex nested job plans. =head1 OBJECT HIERARCHY Event::ExecFlow Event::ExecFlow::Job +--- Event::ExecFlow::Job::Group Event::ExecFlow::Frontend Event::ExecFlow::Callbacks =head1 ATTRIBUTES Attributes can by accessed at runtime using the common get_ATTR(), set_ATTR() style accessors. [ FIXME: describe all attributes in detail ] =head1 METHODS [ FIXME: describe all methods in detail ] =head1 AUTHORS Jörn Reder =head1 COPYRIGHT AND LICENSE Copyright 2005-2006 by Jörn Reder. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307 USA. =cut