Bio::EnsEMBL::Pipeline::Finished Job
SummaryIncluded librariesPackage variablesSynopsisDescriptionGeneral documentationMethods
Toolbar
WebCvsRaw content
Summary
Bio::EnsEMBL::Pipeline::Finished::Job
Package variables
Privates (from "my" definitions)
%BATCH_QUEUES = &set_up_queues
$file = "$batch_q_module.pm"
$batch_q_module = "Bio::EnsEMBL::Pipeline::BatchSubmission::$QUEUE_MANAGER"
Included modules
Bio::EnsEMBL::Analysis::Tools::Logger
Bio::EnsEMBL::Pipeline::Config::BatchQueue
Bio::EnsEMBL::Pipeline::Config::General
Bio::EnsEMBL::Pipeline::Job
Bio::EnsEMBL::Utils::Argument qw ( rearrange )
Bio::EnsEMBL::Utils::Exception qw ( verbose throw warning info )
Inherit
Bio::EnsEMBL::Pipeline::Job
Synopsis
Description
Run a Finished analysis job.
Allows the database version and the runtime information to be saved in input_id_analysis.
Methods
batch_runRemoteDescriptionCode
flush_runsDescriptionCode
priority
No description
Code
run_moduleDescriptionCode
set_up_queues
No description
Code
set_update_value
No description
Code
update
No description
Code
Methods description
batch_runRemotecode    nextTop
  Title   : batch_runRemote
Usage : $job->batch_runRemote
Function: see parent class
Returns :
Args : none. This is an instance method; call it with arrow notation as shown in Usage.
flush_runscodeprevnextTop
  Title   : flush_runs
Usage : $job->flush_runs( jobadaptor, [queue] );
Function: Method extended to handle jobs that failed with an 'out of memory' error and to use the big memory queue.
Returns :
Args :
run_modulecodeprevnextTop
Run the job
Methods code
batch_runRemotedescriptionprevnextTop
# Queue this job for batch submission and flush the batch once it is full.
#
# The job's dbID is appended to the pending list kept in %BATCH_QUEUES for
# this analysis (or for the 'default' queue when the analysis has no entry
# of its own), keyed by database host and database name.
#
# The configured batch_size may be a plain number or an array reference.
# With more than one value, the second is used when $self->update == 1 and
# the first otherwise.
#
# Returns 1 when the pending list reached the batch size and flush_runs
# was called, 0 when the job was only queued.
sub batch_runRemote {
	my ($self) = @_;

	my $dbc    = $self->adaptor->db->dbc;
	my $dbname = $dbc->dbname();
	my $host   = $dbc->host();

	my $logic_name = $self->analysis->logic_name;
	my $queue = exists( $BATCH_QUEUES{$logic_name} ) ? $logic_name : 'default';

	# add job to batch jobs array
	my $batch_jobs = $BATCH_QUEUES{$queue}{'jobs'};
	$batch_jobs->{$host}->{$dbname} ||= [];
	my $pending = $batch_jobs->{$host}->{$dbname};
	push @$pending, $self->dbID;

	# maximum batch jobs size; tolerate a scalar as well as an array ref so
	# that a scalar default does not get dereferenced as an array
	my $configured = $BATCH_QUEUES{$queue}{'batch_size'};
	my @sizes      = ref $configured ? @$configured : ($configured);
	my $batch_size =
	  ( @sizes > 1 && $self->update == 1 ) ? $sizes[1] : $sizes[0];

	if ( scalar(@$pending) >= $batch_size ) {
		$self->flush_runs( $self->adaptor, $queue );
		return 1;
	}

	return 0;
}
flush_runsdescriptionprevnextTop
# Submit the pending jobs of one queue, or of every queue, to the batch system.
#
# Args:
#   $adaptor - job adaptor; required, used to fetch and update the jobs
#   $queue   - optional logic_name; when false every key of %BATCH_QUEUES
#              is processed
#   $verbose - when true, print the number of jobs flushed per analysis
#
# Throws when no adaptor is given or when no executable runner script can
# be found.
#
# For each analysis with pending job ids for this database host/name a
# single batch job is built and submitted. On submission failure every job
# is set to FAILED; on success every job is set to SUBMITTED, its retry
# count is incremented and it shares the stdout/stderr files of the last
# job in the batch. The pending list is emptied afterwards in both cases.
sub flush_runs {
	my ( $self, $adaptor, $queue, $verbose ) = @_;

	# flush_runs is optionally sent a queue to deal with
	# @analyses is a list of logic_names (strings)
	my @analyses = $queue ? ($queue) : ( keys %BATCH_QUEUES );

	if ( !defined $adaptor ) {
		throw("Cannot run remote without db connection");
	}

	my $dbc      = $adaptor->db->dbc;
	my $host     = $dbc->host;
	my $username = $dbc->username;
	my $dbname   = $dbc->dbname;
	my $pass     = $dbc->password;
	my $port     = $dbc->port;

	# runner.pl: first look at value set in RuleManager ($RUNNER_SCRIPT)
	# then in same directory as Job.pm,
	# and fail if not found
	my $runner = $self->runner;
	if ( !$runner || !-x $runner ) {
		$runner = __FILE__;
		$runner =~ s:/[^/]*$:/runner.pl:;
		my $caller = caller(0);
		throw(  "runner " . $runner
			  . " not found - needs to be set in "
			  . "$caller\n" )
		  unless -x $runner;
	}

  ANAL: for my $anal (@analyses) {
		my $queue_conf = $BATCH_QUEUES{$anal};

		my @job_ids;
		@job_ids = @{ $queue_conf->{'jobs'}->{$host}->{$dbname} }
		  if ( $queue_conf->{'jobs'}->{$host}->{$dbname} );
		next ANAL if !@job_ids;

		print "\t\t$anal\t" . scalar(@job_ids) . " jobs\n" if $verbose;

		# prefer the queue's own runner, but only if it is set and executable
		my $this_runner = $queue_conf->{'runner'};
		$this_runner =
		  ( defined $this_runner && -x $this_runner )
		  ? $this_runner
		  : $runner;

		# the last job that can still be fetched supplies the output files;
		# ids that no longer resolve to a job are dropped from the tail
		my $lastjob = $adaptor->fetch_by_dbID( $job_ids[-1] );
		while ( !$lastjob && @job_ids ) {
			pop @job_ids;
			$lastjob = $adaptor->fetch_by_dbID( $job_ids[-1] )
			  if ( $job_ids[-1] );
		}
		next ANAL if !$lastjob;

		my $pre_exec =
		  $this_runner . " -check -output_dir " . $self->output_dir;

		my $priority      = $self->priority;
		my $farm_queue    = $queue_conf->{'queue'};
		my $farm_resource = $queue_conf->{'resource'};
		my $param = $queue_conf->{'sub_args'} . ' -sp ' . $priority . ' ';

		if ( $priority == $BIG_MEM_PRIORITY ) {
			$farm_queue    = $BIG_MEM_QUEUE;
			$farm_resource = $BIG_MEM_RESOURCE;
			$param .= $BIG_MEM_PARAM;
		}
		if ( $priority == $LONG_JOB_PRIORITY ) {
			$farm_queue = $LONG_JOB_QUEUE;
		}

		# change job mysql load ressource
		if ( $host eq 'otterpipe1' ) {
			$farm_resource =~ s/otterp2/otterp1/;
		}
		else {
			$farm_resource =~ s/otterp1/otterp2/;
		}

		my $batch_job = $batch_q_module->new(
			-STDOUT     => $lastjob->stdout_file,
			-STDERR     => $lastjob->stderr_file,
			-PARAMETERS => $param,
			-PRE_EXEC   => $pre_exec,
			-QUEUE      => $farm_queue,
			-JOBNAME    => $dbname . ':' . $anal,
			-NODES      => $queue_conf->{'nodes'},
			-RESOURCE   => $farm_resource
		);

		# check if the password has been defined, and write the
		# "connect" command line accordingly otherwise -pass gets the
		# first job id as password, instead of remaining undef
		my $cmd;
		if ($pass) {
			$cmd = $runner
			  . " -dbhost $host -dbuser $username -dbname $dbname -dbpass $pass -dbport $port";
		}
		else {
			$cmd = $runner
			  . " -dbhost $host -dbuser $username -dbname $dbname -dbport $port";
		}
		$cmd .= " -output_dir " . $self->output_dir;
		$cmd .= " -queue_manager $QUEUE_MANAGER ";
		if ( $self->cleanup ) {
			$cmd .= " -cleanup ";
		}
		$cmd .= " @job_ids";

		$batch_job->construct_command_line($cmd);

		eval {
			# SMJS LSF Specific for debugging
			#print "Submitting: ", $batch_job->bsub, "\n";
			$batch_job->open_command_line();
		};

		if ($@) {
			print STDERR "Couldnt batch submit @job_ids\n [$@]\n";
			print STDERR "Using " . $batch_job->bsub . "\n";
			foreach my $job_id (@job_ids) {
				my $job = $adaptor->fetch_by_dbID($job_id);

				# an id may no longer resolve to a job (see the lookup of
				# $lastjob above); skip it rather than die on undef
				if ( !$job ) {
					warning("Job $job_id not found, cannot set it to FAILED");
					next;
				}
				$job->set_status("FAILED");
			}
		}
		else {
			my @jobs = $adaptor->fetch_by_dbID_list(@job_ids);
			foreach my $job (@jobs) {

				# a retried job starts with empty output files
				if ( $job->retry_count > 0 ) {
					for my $log_file ( $job->stdout_file, $job->stderr_file )
					{
						if ( open( my $fh, '>', $log_file ) ) {
							close($fh);
						}
						else {
							warning("Could not truncate $log_file: $!");
						}
					}
				}

				if ( $batch_job->id ) {
					$job->submission_id( $batch_job->id );
				}
				else {
					# submission seems to have succeeded, but we didnt
					# get a job ID. Safest NOT to raise an error here,
					# (a warning would have already issued) but flag
					print STDERR
"Job: Null submission ID for the following, but continuing: @job_ids\n";
					$job->submission_id(0);
				}

				$job->retry_count( $job->retry_count + 1 );
				$job->set_status("SUBMITTED");
				$job->stdout_file( $lastjob->stdout_file );
				$job->stderr_file( $lastjob->stderr_file );
			}
			$adaptor->update(@jobs);
		}

		$queue_conf->{'jobs'}->{$host}->{$dbname}         = [];
		$queue_conf->{'last_flushed'}->{$host}->{$dbname} = time;
	}
}
prioritydescriptionprevnextTop
# Get or set the submission priority of this job.
#
# Args:
#   $priority - optional; a true value is stored as the job's priority
#
# When no priority is stored yet, it is looked up under the 'priority' key
# of the %BATCH_QUEUES entry for this analysis. That setting may be a plain
# value or an array reference. An array with exactly one element gives the
# priority directly; otherwise set_update_value is called and the second
# element is used when $self->update is true, the first when it is not.
#
# Throws when the analysis has no priority configured.
# Returns the stored priority.
sub priority {
	my ( $self, $priority ) = @_;

	if ($priority) {
		$self->{priority} = $priority;
	}

	if ( !$self->{priority} ) {
		my $ln = $self->analysis->logic_name;

		# Check the entry exists before reading from it: a bare nested lookup
		# would autovivify $BATCH_QUEUES{$ln}, and later exists() tests on
		# that key would then no longer select the 'default' queue.
		my $p =
		  exists $BATCH_QUEUES{$ln} ? $BATCH_QUEUES{$ln}{priority} : undef;
		throw("Priority for $ln not set in BatchQueue file\n") unless $p;

		# accept a plain scalar as well as an array ref
		my @levels = ref $p ? @$p : ($p);

		if ( scalar(@levels) == 1 ) {
			$self->{priority} = $levels[0];
		}
		else {
			$self->set_update_value;
			$self->{priority} = $self->update ? $levels[1] : $levels[0];
		}
	}

	return $self->{priority};
}

# update values: 0 no update, new clone analysis; 1 update only patch file; 2 update patch and release files;
}
run_moduledescriptionprevnextTop
# Run the analysis module for this job and record the outcome.
#
# The module named by $self->analysis->module is loaded and instantiated
# with the job's analysis, input id, database and the configured verbosity.
# Settings (runnabledb_path, verbosity) come from the %BATCH_QUEUES entry
# for the analysis logic_name, or from the 'default' entry when there is
# none.
#
# Job status is set as the work progresses: READING, then VOID when the
# runnable reports void input, otherwise RUNNING, WRITING and SUCCESSFUL.
# A failure while creating the runnable, reading or writing sets FAILED and
# throws. A failure while running sets the status returned by the
# runnable's failing_job_status (FAILED when that is false) and throws,
# except for 'VOID', which only leaves the STATUS block.
#
# Finally the input id, analysis, execution host, db-version flag and the
# elapsed time in seconds are stored through the StateInfoContainer; if
# that fails the job is set to FAIL_NO_RETRY and an error is thrown.
sub run_module {
	my $self               = shift;
	my $is_dbversion_saved = 0;

	# start timer
my $start = time; my $module = $self->analysis->module; my $hash_key = $self->analysis->logic_name; my $rdb; my ( $err, $res ); print "Running " . $module . " with " . $self . "\n"; if ( !exists( $BATCH_QUEUES{$hash_key} ) ) { $hash_key = 'default'; } my $runnable_db_path = $BATCH_QUEUES{$hash_key}{runnabledb_path}; my $verbosity = $BATCH_QUEUES{$hash_key}{verbosity}; my $perl_path; #print STDERR "Getting ".$hash_key." batchqueue value\n";
# Build the file path of the module: a name that already contains '::' is
# converted to a path as is, otherwise runnabledb_path (when set) is prepended.
if ( $module =~ /::/ ) { #print STDERR "Module contains path info already\n";
$module =~ s/::/\//g; $perl_path = $module; } elsif ($runnable_db_path) { $perl_path = $runnable_db_path . "/" . $module; } else { $perl_path = $module; } my $current_verbosity = logger_verbosity; verbose($verbosity); logger_verbosity($verbosity); #print STDERR "have perlpath ".$perl_path."\n";
# STATUS is a labelled bare block so that 'last STATUS' can skip the
# remaining steps without throwing.
STATUS: { eval { require $perl_path . ".pm"; $perl_path =~ s/\//::/g; $rdb = $perl_path->new( -analysis => $self->analysis, -input_id => $self->input_id, -db => $self->adaptor->db, -verbosity => $verbosity ); }; if ( $err = $@ ) { print( STDERR "CREATE: Lost the will to live Error\n" ); $self->set_status("FAILED"); throw( "Problems creating runnable $module for " . $self->input_id . " [$err]\n" ); } # "READING"
print "READING\n"; eval { $self->set_status("READING"); $res = $rdb->fetch_input; }; if ( $err = $@ ) { $self->set_status("FAILED"); print( STDERR "READING: Lost the will to live Error\n" ); throw( "Problems with $module fetching input for " . $self->input_id . " [$err]\n" ); } if ( $rdb->input_is_void ) { $self->set_status("VOID"); } else { print "RUNNING\n"; # "RUNNING"
# disconnect_when_inactive is switched on for the duration of the run and
# off again afterwards; it is not switched off when the run dies.
eval { $self->set_status("RUNNING"); $rdb->db->dbc->disconnect_when_inactive(1); $rdb->run; $rdb->db->dbc->disconnect_when_inactive(0); }; if ( $err = $@ ) { print STDERR $@ . "\n"; if ( my $err_state = $rdb->failing_job_status ) { $self->set_status($err_state); if ( $err_state eq 'VOID' ) { last STATUS; } } else { $self->set_status("FAILED"); # default to just failed
#these jobs get retried
} print( STDERR "RUNNING: Lost the will to live Error\n" ); throw( "Problems running $module for " . $self->input_id . " [$err]\n" ); } # "WRITING"
# When the runnable provides db_version_searched, its value is set on the
# in-memory analysis object and the db-version flag is raised; the
# persisting call on the next line is commented out.
print "WRITING\n"; eval { $self->set_status("WRITING"); $rdb->write_output; if ( $rdb->can('db_version_searched') ) { my $new_db_version = $rdb->db_version_searched(); my $analysis = $self->analysis(); my $old_db_version = $analysis->db_version(); $analysis->db_version($new_db_version); #$self->adaptor->db->get_AnalysisAdaptor->update($analysis);
$is_dbversion_saved = 1; } $self->set_status("SUCCESSFUL"); }; if ( $err = $@ ) { $self->set_status("FAILED"); print( STDERR "WRITING: Lost the will to live Error\n" ); throw( "Problems for $module writing output for " . $self->input_id . " [$err] [ $?]" ); } } } # end timer
my $end = time; # Run time in seconds
my $runtime = $end - $start; logger_verbosity($current_verbosity); # update job in StateInfoContainer
# NOTE(review): this step is also reached after the VOID paths above, so
# the "Updated successful job" message is printed for void jobs too.
eval { my $sic = $self->adaptor->db->get_StateInfoContainer; $sic->store_input_id_analysis( $self->input_id, $self->analysis, $self->execution_host, $is_dbversion_saved, $runtime ); }; if ( $err = $@ ) { my $error_msg = "Job finished successfully, but could not be " . "recorded as finished. Job : [" . $self->input_id . "]\n[$err]"; eval { $self->set_status("FAIL_NO_RETRY"); }; $error_msg .= ( "(And furthermore) Encountered an error in updating the job to status failed_no_retry.\n[$@]" ) if $@; throw($error_msg); } else { print STDERR "Updated successful job " . $self->dbID . "\n"; }
}
set_up_queuesdescriptionprevnextTop
# Build the per-analysis queue table from @$QUEUE_CONFIG.
#
# Every config entry with a true logic_name is copied into the table under
# that name, given an empty 'jobs' hash and an undefined 'last_flushed',
# and has each missing or false setting filled in from the matching
# $DEFAULT_* value. A 'default' entry is created when the config does not
# define one, and receives the same fallbacks.
#
# Returns the table as a flat hash (logic_name => settings hash ref).
sub set_up_queues {
	my %fallback_for = (
		batch_size      => $DEFAULT_BATCH_SIZE,
		queue           => $DEFAULT_BATCH_QUEUE,
		retries         => $DEFAULT_RETRIES,
		cleanup         => $DEFAULT_CLEANUP,
		runnabledb_path => $DEFAULT_RUNNABLEDB_PATH,
		output_dir      => $DEFAULT_OUTPUT_DIR,
		runner          => $DEFAULT_RUNNER,
		verbosity       => $DEFAULT_VERBOSITY,
	);

	my %settings_for;

	for my $entry (@$QUEUE_CONFIG) {
		my $name = $entry->{logic_name} or next;

		# merge the configured settings into whatever is already there
		@{ $settings_for{$name} }{ keys %$entry } = values %$entry;

		$settings_for{$name}{jobs}         = {};
		$settings_for{$name}{last_flushed} = undef;

		for my $setting ( keys %fallback_for ) {
			$settings_for{$name}{$setting} ||= $fallback_for{$setting};
		}
	}

	# a default queue for everything else
	unless ( exists $settings_for{default} ) {
		$settings_for{default}{jobs}         = {};
		$settings_for{default}{last_flushed} = undef;
	}

	# Need these set, so do the ||= thing
	for my $setting ( keys %fallback_for ) {
		$settings_for{default}{$setting} ||= $fallback_for{$setting};
	}

	return %settings_for;
}

1;
}
set_update_valuedescriptionprevnextTop
# Work out the update level of this job and store it in $self->{update}.
#
# The db version saved for this input id and analysis is compared with the
# analysis' current db version:
#   0 - no saved version
#   1 - saved version present and its release number equals the current one
#   2 - saved version present, release number differs or cannot be parsed
#
# Returns the stored level.
sub set_update_value {
	my ($self) = @_;

	my $state_info = $self->adaptor->db->get_StateInfoContainer;
	my $saved_version =
	  $state_info->fetch_db_version( $self->input_id, $self->analysis );
	my $current_version = $self->analysis->db_version;

	my $level = 0;

	if ($saved_version) {

		# split the embl blast db version "12-Mar-06 (85)" to
		# patch version "12-Mar-06" and release version "85";
		# only the release part is compared
		my ( undef, $saved_release ) =
		  $saved_version =~ /^(\S+)\s+\((\d+)\)$/;
		my ( undef, $current_release ) =
		  $current_version =~ /^(\S+)\s+\((\d+)\)$/;

		$level =
		  ( $saved_release && ( $saved_release eq $current_release ) )
		  ? 1
		  : 2;
	}

	return $self->{update} = $level;
}
updatedescriptionprevnextTop
# Get or set the update level of this job.
#
# Args:
#   $update - optional; any defined value, including 0, is stored
#
# Testing definedness rather than truth lets callers reset the level to 0
# (no update), which a truth test would silently ignore.
#
# Returns the stored level, or 0 when none is set.
sub update {
	my ( $self, $update ) = @_;

	if ( defined $update ) {
		$self->{update} = $update;
	}

	return $self->{update} || 0;
}
General documentation
FEEDBACKTop
AUTHOR - Mustapha LarbaouiTop
Mustapha Larbaoui <ml6@sanger.ac.uk>
CONTACTTop
Post general queries to anacode@sanger.ac.uk
APPENDIXTop
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _