Bio::EnsEMBL::Pipeline
Job
Toolbar
Summary
Bio::EnsEMBL::Pipeline::Job
Package variables
Privates (from "my" definitions)
%BATCH_QUEUES = &set_up_queues
$file = "$batch_q_module.pm"
$batch_q_module = "Bio::EnsEMBL::Pipeline::BatchSubmission::$QUEUE_MANAGER"
Included modules
Inherit
Synopsis
Description
Stores run and status details of an analysis job
Methods
Methods description
Title : adaptor Usage : $self->adaptor Function: get database adaptor, set only for constructor and adaptor usage. Returns : Args : |
Title : analysis Usage : $self->analysis($anal); Function: Get/set method for the analysis object of the job Returns : Bio::EnsEMBL::Analysis Args : Bio::EnsEMBL::Analysis |
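Title : batch_runRemote Usage : $job->batch_runRemote Function: Adds this job to its analysis's batch queue (or the default queue) and, once the queue holds batch_size jobs, submits them all as one batch job, because submitting jobs one at a time is very slow. Returns : Args : none. Intended for internal use; it is an instance method called with arrow notation. |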
Title : create_by_analysis_input_id Usage : $class->create_by..... Function: Creates a job given an analysis object and an input_id. Deprecated: it emits a warning and simply calls the constructor, so call Bio::EnsEMBL::Pipeline::Job->new directly instead. Returns : a job object, not connected to db Args : |
Title : current_status Usage : my $status = $job->current_status Function: Get/set method for the current status Returns : Bio::EnsEMBL::Pipeline::Status Args : Bio::EnsEMBL::Pipeline::Status |
Title : dbID Usage : $self->dbID($id) Function: get set the dbID for this object, only used by Adaptor Returns : int Args : int |
Title : flush_runs Usage : $job->flush_runs( jobadaptor, [queue] ); Function: Submits all jobs in the queue (or in every queue if none is given) and empties it. Sets the batch submission (e.g. LSF) id on all jobs. Uses the given adaptor for connecting to the db. Uses the last job in the queue for stdout/stderr. Returns : Args : |
Title : get_all_status Usage : my @status = $job->get_all_status Function: Get all status objects associated with this job Returns : @Bio::EnsEMBL::Pipeline::Status Args : none |
Title : get_last_status Usage : my $status = $job->get_last_status Function: Get latest status object associated with this job Returns : Bio::EnsEMBL::Pipeline::Status Args : none |
Title : input_id Usage : $self->input_id($id) Function: Get/set method for the id of the input to the job Returns : string Args : string |
Title : remove Arg [1] : STRING, analysis logic_name Function : remove the job and, if cleanup is enabled for that analysis queue, delete its stdout/stderr files Returntype: none Exceptions: none Caller : $self Example : $self->remove($self->analysis->logic_name); |
Title : retry_count Usage : Function: Get/set method for the retry_count Returns : Args : |
Title : running Usage : $self->run...; Function: runLocally does not submit to LSF; it runs the job in the current process with STDOUT and STDERR redirected to the job's output files. run_module is like runLocally, but does not redirect STDOUT and STDERR. runRemote submits to LSF via the runner.pl script. Returns : Args : |
Title : set_status Usage : my $status = $job->set_status Function: Sets the job status Returns : nothing Args : Bio::EnsEMBL::Pipeline::Status |
Title : stderr_file Usage : my $file = $self->stderr_file Function: Get/set method for stderr. Returns : string Args : string |
Title : stdout_file Usage : my $file = $self->stdout_file Function: Get/set method for stdout. Returns : string Args : string |
Title : submission_id Usage : Function: Get/set method for the submission ID Returns : Args : |
Methods code
# Get/set the database adaptor that owns this job.
# A new adaptor is stored only when the argument is true; the current
# adaptor (possibly undef) is always returned.
sub adaptor {
    my ($self, $new_adaptor) = @_;
    $self->{'_adaptor'} = $new_adaptor if $new_adaptor;
    return $self->{'_adaptor'};
}
# Get/set the Bio::EnsEMBL::Analysis this job belongs to.
# Any defined argument must be a Bio::EnsEMBL::Analysis, otherwise this
# throws; the stored analysis is returned.
sub analysis {
    my ($self, $candidate) = @_;
    return $self->{'_analysis'} unless defined $candidate;

    $candidate->isa("Bio::EnsEMBL::Analysis")
        or throw("[$candidate] is not a Bio::EnsEMBL::Analysis object" );
    $self->{'_analysis'} = $candidate;
    return $self->{'_analysis'};
}
# Get/set the batch-submission helper object.
# A true argument replaces the stored object; when none is stored yet, one
# is created lazily with $batch_q_module->new().
sub batch_q_object {
    my ($self, $submitter) = @_;
    $self->{'_batch_q_object'} = $submitter if $submitter;
    $self->{'_batch_q_object'} ||= $batch_q_module->new();
    return $self->{'_batch_q_object'};
}
# Add this job's dbID to the batch queue of its analysis (or to the
# 'default' queue when the logic name has no queue of its own). Once that
# queue holds at least batch_size jobs, it is submitted via flush_runs.
sub batch_runRemote {
    my ($self) = @_;

    my $logic_name = $self->analysis->logic_name;
    my $queue = exists $BATCH_QUEUES{$logic_name} ? $logic_name : 'default';

    push @{ $BATCH_QUEUES{$queue}{'jobs'} }, $self->dbID;
    my $pending = scalar @{ $BATCH_QUEUES{$queue}{'jobs'} };

    $self->flush_runs($self->adaptor, $queue)
        if $pending >= $BATCH_QUEUES{$queue}{'batch_size'};
}
# Return 1 if this job may be retried, 0 otherwise.
# A job that has never been retried (retry_count false) can always retry.
# Otherwise its retry_count must not exceed the 'retries' limit of the
# queue for $logic_name. $logic_name defaults to this job's analysis logic
# name; the 'default' queue is used if that name has no queue.
sub can_retry {
    my ($self, $logic_name) = @_;
    $logic_name ||= $self->analysis->logic_name;
    $logic_name = 'default' unless exists $BATCH_QUEUES{$logic_name};

    my $max_retry = $BATCH_QUEUES{$logic_name}{'retries'};
    my $tries     = $self->retry_count;

    return 1 if !$tries;
    return $tries <= $max_retry ? 1 : 0;
}
# Return 1 when the queue for $logic_name has cleanup set to 'yes', else 0.
# $logic_name defaults to this job's analysis logic name; the 'default'
# queue is consulted if that name has no queue of its own.
sub cleanup {
    my ($self, $logic_name) = @_;
    $logic_name ||= $self->analysis->logic_name;

    my $key = exists $BATCH_QUEUES{$logic_name} ? $logic_name : 'default';
    return $BATCH_QUEUES{$key}{'cleanup'} eq 'yes' ? 1 : 0;
}
# Deprecated factory kept for old callers.
# Emits a deprecation warning naming the calling package, then builds the
# job with Bio::EnsEMBL::Pipeline::Job->new and a retry_count of 0.
# Args: analysis object, input_id, output directory, auto_update flag
# (the last is passed through as -auto_update).
sub create_by_analysis_input_id {
    my (undef, $analysis, $inputId, $output_dir, $auto_update) = @_;

    my $calling_package = caller;
    warning("Bio::EnsEMBL::Pipeline::Job->create_by_analysis_input_id is deprecated "
            . $calling_package
            . " should now call the constructor directly");

    return Bio::EnsEMBL::Pipeline::Job->new(
        -input_id    => $inputId,
        -analysis    => $analysis,
        -output_dir  => $output_dir,
        -auto_update => $auto_update,
        -retry_count => 0
    );
}
# Fetch the job's current status from its adaptor, forwarding $arg to the
# adaptor's current_status. Returns undef when the job has no adaptor.
# Throws, naming dbID, input_id and logic name, if the adaptor call dies.
sub current_status {
    my ($self, $arg) = @_;

    my $adaptor = $self->adaptor;
    return undef if !defined $adaptor;

    my $status = eval { $adaptor->current_status( $self, $arg ) };
    if ( my $error = $@ ) {
        throw("Failed to get status for ".$self->dbID." ".$self->input_id.
              " ".$self->analysis->logic_name." error $error");
    }
    return $status;
}
# Get/set the database id of this job (set by the adaptor).
# Only a true value replaces the stored id.
sub dbID {
    my ($self, $id) = @_;
    $self->{'_dbID'} = $id if $id;
    return $self->{'_dbID'};
}
# Get/set the host the job ran on. A true argument is stored; an unset
# host reads back as the empty string.
sub execution_host {
    my ($self, $host) = @_;
    $self->{'_execution_host'} = $host if $host;
    my $stored = $self->{'_execution_host'};
    return $stored ? $stored : '';
}
# Submit the queued jobs of one analysis queue, or of every queue in
# %BATCH_QUEUES when $queue is not given, as one batch job per queue.
# Each flushed queue is emptied afterwards.
#
#   $adaptor - job adaptor used to reach the pipeline database (required)
#   $queue   - optional logic name of the single queue to flush
#
# The batch job's stdout (and, without cleanup, stderr) come from the last
# job in the queue.
# If submission fails, every job of that batch is set to FAILED.
# Otherwise each job:
#   - has stale output files truncated if it was retried before,
#   - receives the batch submission id (0 if none),
#   - has its retry_count incremented and is set to SUBMITTED,
#   - is written back with $adaptor->update.
# Throws if $adaptor is undefined, if no executable runner can be found, or
# if the last queued job is missing from the database.
sub flush_runs {
    my ($self, $adaptor, $queue) = @_;

    # One named queue, otherwise every configured queue.
    my @analyses = $queue ? ($queue) : keys %BATCH_QUEUES;

    if ( !defined $adaptor ) {
        throw( "Cannot run remote without db connection" );
    }

    my $dbc      = $adaptor->db->dbc;
    my $host     = $dbc->host;
    my $username = $dbc->username;
    my $dbname   = $dbc->dbname;
    my $pass     = $dbc->password;
    my $port     = $dbc->port;

    # Fallback runner: the job's own runner, else runner.pl located next to
    # this module file.
    my $runner = $self->runner;
    if ( !$runner || !-x $runner ) {
        $runner = __FILE__;
        $runner =~ s:/[^/]*$:/runner.pl:;
        my $caller = caller(0);
        throw("runner ".$runner." not found - needs to be set in ".
              "$caller\n") unless -x $runner;
    }

  ANAL:
    for my $anal (@analyses) {
        my $queue_cfg = $BATCH_QUEUES{$anal};
        my @job_ids   = @{ $queue_cfg->{'jobs'} };
        next ANAL if !@job_ids;

        # Prefer the queue's own runner when configured and executable.
        # It is used for BOTH the pre-exec check and the command itself,
        # so that the two always run the same script.
        my $this_runner = $queue_cfg->{'runner'};
        $this_runner = ( $this_runner && -x $this_runner ) ? $this_runner : $runner;

        my $lastjob = $adaptor->fetch_by_dbID($job_ids[-1]);
        if ( !$lastjob ) {
            throw( "Last batch job not in db" );
        }

        my $pre_exec = $this_runner." -check -output_dir ".$self->output_dir;

        my ($system_queue, $parameters, $resources) =
          ($queue_cfg->{'queue'}, $queue_cfg->{'sub_args'}, $queue_cfg->{'resource'});
        # Retried jobs may be sent to a different queue / resource request.
        if ( $self->retry_count >= 1 ) {
            $system_queue = $queue_cfg->{retry_queue}    if $queue_cfg->{retry_queue};
            $parameters   = $queue_cfg->{retry_sub_args} if $queue_cfg->{retry_sub_args};
            $resources    = $queue_cfg->{retry_resource} if $queue_cfg->{retry_resource};
        }

        my $batch_job = $batch_q_module->new(
            -STDOUT     => $lastjob->stdout_file,
            -PARAMETERS => $parameters,
            -PRE_EXEC   => $pre_exec,
            -QUEUE      => $system_queue,
            -JOBNAME    => $dbname . ':' . $anal,
            -NODES      => $queue_cfg->{'nodes'},
            -RESOURCE   => $resources,
        );

        # Cleanup is configured per analysis queue, so look it up for the
        # queue being flushed, not for $self's own analysis.
        my $cleanup = $self->cleanup($anal);
        if ( !$cleanup ) {
            $batch_job->stderr_file($lastjob->stderr_file);
        }

        # NOTE(review): the database password is visible on the command line.
        my $cmd = $this_runner." -dbhost $host -dbuser $username -dbname $dbname";
        $cmd .= " -dbpass $pass" if $pass;
        $cmd .= " -dbport $port";
        $cmd .= " -output_dir ".$self->output_dir;
        $cmd .= " -queue_manager $QUEUE_MANAGER " ;
        $cmd .= " -cleanup " if $cleanup;
        $cmd .= " @job_ids";

        $batch_job->construct_command_line($cmd);
        eval { $batch_job->open_command_line(); };

        if ($@) {
            print STDERR "Couldnt batch submit @job_ids\n [$@]\n";
            print STDERR "Using ".$batch_job->bsub."\n";
            for my $job_id (@job_ids) {
                my $job = $adaptor->fetch_by_dbID($job_id);
                $job->set_status( "FAILED" );
            }
        }
        else {
            my @jobs = $adaptor->fetch_by_dbID_list(@job_ids);
            for my $job (@jobs) {
                # Truncate output left over from a previous attempt.
                # This is best effort: failures are reported, not fatal.
                if ( $job->retry_count > 0 ) {
                    for my $file ( $job->stdout_file, $job->stderr_file ) {
                        next unless defined $file;
                        if ( open( my $fh, '>', $file ) ) {
                            close($fh);
                        }
                        else {
                            print STDERR "Job: could not truncate $file: $!\n";
                        }
                    }
                }
                if ( $batch_job->id ) {
                    $job->submission_id( $batch_job->id );
                }
                else {
                    print STDERR "Job: Null submission ID for the following, but continuing: @job_ids\n";
                    $job->submission_id( 0 );
                }
                $job->retry_count( $job->retry_count + 1 );
                $job->set_status( "SUBMITTED" );
                $job->stdout_file($lastjob->stdout_file);
                $job->stderr_file($lastjob->stderr_file);
            }
            $adaptor->update(@jobs);
        }

        $queue_cfg->{'jobs'}         = [];
        $queue_cfg->{'last_flushed'} = time;
    }
    return;
}
# Return all status objects recorded for this job, via its adaptor.
# Without an adaptor it returns the empty list in list context and undef
# in scalar context.
# A bare return is used because `return undef` would hand list-context
# callers a one-element (undef) list, which looks like one status.
sub get_all_status {
    my ($self) = @_;
    my $adaptor = $self->adaptor;
    return unless $adaptor;
    return $adaptor->get_all_status( $self );
}
# Return the most recent status object for this job, via its adaptor.
# Returns undef when the job has no adaptor.
sub get_last_status {
    my ($self) = @_;
    my $adaptor = $self->adaptor;
    return $adaptor ? $adaptor->get_last_status( $self ) : undef;
}
# Get/set the id of this job's input. Any defined value, including "0",
# replaces the stored id.
sub input_id {
    my ($self, $id) = @_;
    $self->{'_input_id'} = $id if defined $id;
    return $self->{'_input_id'};
}
# Choose stdout/stderr file names for this job, unless they are already set.
#
# Files go into a randomly chosen numbered subdirectory of output_dir
# (0 .. number_output_dirs-1, default 10 subdirectories). They are named
# "<input_id>.<logic_name>.<random 0-999>" plus ".out" / ".err".
# Missing directories are created; throws if a directory cannot be created.
sub make_filenames {
    my ($self) = @_;

    my $output_dir_number = $self->number_output_dirs || 10;
    my $num        = int(rand($output_dir_number));
    my $output_dir = $self->output_dir;
    my $dir        = $output_dir . "/$num/";

    if ( ! -e $output_dir ) {
        warning("Your output-directory " . $output_dir . " does not exist - i'll create it now.\n");
        # system() never dies, so its exit status must be checked explicitly.
        # The list form keeps the directory name away from the shell.
        if ( system('mkdir', '-p', $output_dir) != 0 ) {
            throw("Failed to make dir " . $output_dir . " (mkdir exit status $?)");
        }
    }

    if ( ! -e $dir ) {
        # Another job may create the same subdirectory concurrently, so only
        # fail if it still does not exist after a failed mkdir.
        if ( !mkdir($dir) ) {
            my $mkdir_error = $!;
            throw("Failed to make dir ".$dir." $mkdir_error") unless -d $dir;
        }
    }

    my $stub = join '.', $self->input_id, $self->analysis->logic_name, int(rand(1000));
    $self->stdout_file($dir.$stub.".out") unless $self->stdout_file;
    $self->stderr_file($dir.$stub.".err") unless $self->stderr_file;
    return;
}
# Construct a job.
#
# Named arguments (parsed with rearrange):
#   -ADAPTOR, -ID (dbID, default -1), -SUBMISSION_ID (default -1),
#   -INPUT_ID (required), -ANALYSIS (required, a Bio::EnsEMBL::Analysis),
#   -STDOUT, -STDERR, -RETRY_COUNT, -OUTPUT_DIR, -RUNNER,
#   -NUMBER_OUTPUT_DIRS.
#
# Without -OUTPUT_DIR, the output_dir of the analysis's batch queue is
# used, falling back to the 'default' queue; throws if neither provides
# one. Output file names are then generated by make_filenames.
# Without -RUNNER, the runner of that same queue is used.
sub new {
    my ($class, @args) = @_;
    my $self = bless {}, $class;

    my ($adaptor, $dbID, $submission_id, $input_id, $analysis, $stdout,
        $stderr, $retry_count, $output_dir, $runner, $number_output_dirs)
      = rearrange([qw(ADAPTOR
                      ID
                      SUBMISSION_ID
                      INPUT_ID
                      ANALYSIS
                      STDOUT
                      STDERR
                      RETRY_COUNT
                      OUTPUT_DIR
                      RUNNER
                      NUMBER_OUTPUT_DIRS
                     )], @args);

    $dbID          = -1 unless defined $dbID;
    $submission_id = -1 unless defined $submission_id;

    $input_id || throw("Can't create a job object without an input_id");
    $analysis || throw("Can't create a job object without an analysis object");
    $analysis->isa("Bio::EnsEMBL::Analysis") ||
        throw("Analysis object [$analysis] is not a Bio::EnsEMBL::Analysis");

    $self->dbID($dbID);
    $self->adaptor($adaptor);
    $self->input_id($input_id);
    $self->analysis($analysis);
    $self->stdout_file($stdout);
    $self->stderr_file($stderr);
    $self->retry_count($retry_count);
    $self->submission_id($submission_id);
    $self->output_dir($output_dir);
    $self->number_output_dirs($number_output_dirs);

    # Queue settings for this analysis, falling back to the 'default' queue
    # like every other queue lookup in this class. Previously the runner
    # lookup did not fall back.
    my $logic_name = $analysis->logic_name;
    my $queue_cfg  = exists $BATCH_QUEUES{$logic_name}
                   ? $BATCH_QUEUES{$logic_name}
                   : $BATCH_QUEUES{default};

    if ( !$self->output_dir ) {
        my $dir = $queue_cfg->{output_dir};
        throw("need an output directory passed in from RuleManager or from Config/BatchQueue $!") unless($dir);
        $self->output_dir($dir);
    }
    $self->make_filenames;

    $self->runner($runner);
    $self->runner( $queue_cfg->{'runner'} ) if !$self->runner;

    return $self;
}
# Get/set how many numbered output subdirectories make_filenames spreads
# job files over. Only a true value replaces the stored count.
sub number_output_dirs {
    my ($self, $arg) = @_;
    $self->{number_output_dirs} = $arg if $arg;
    return $self->{number_output_dirs};
}

# True value required at the end of a Perl module file. The stray "}" that
# followed it in this listing unbalanced the braces and has been dropped.
1;
# Get/set the base directory for this job's stdout/stderr files.
# Only a true value replaces the stored directory.
sub output_dir {
    my ($self, $dir) = @_;
    $self->{'_output_dir'} = $dir if $dir;
    return $self->{'_output_dir'};
}
# Remove this job.
# If the batch queue for $logic_name (or the 'default' queue when that name
# has none) has cleanup 'yes', the job's existing stdout/stderr files are
# deleted first. The job is then removed through its adaptor, if it has one.
sub remove {
    my ($self, $logic_name) = @_;

    my $key = exists $BATCH_QUEUES{$logic_name} ? $logic_name : 'default';

    if ( $BATCH_QUEUES{$key}{'cleanup'} eq 'yes' ) {
        for my $file ( $self->stdout_file, $self->stderr_file ) {
            unlink $file if -e $file;
        }
    }

    $self->adaptor->remove( $self ) if defined $self->adaptor;
}
# Get/set how many times this job has been submitted/retried.
# Any defined value is stored, so the count can be reset to 0. An unset
# count reads back as 0.
sub retry_count {
    my ($self, $arg) = @_;
    # defined() rather than truth: retry_count(0) used to be silently ignored.
    $self->{'_retry_count'} = $arg if defined $arg;
    return $self->{'_retry_count'} || 0;
}
# Run this job in the current process (no batch submission).
# STDOUT and STDERR are redirected to the job's stdout/stderr files for the
# duration of the call. If either file cannot be opened, the status is set
# to FAILED and the method returns.
# After a SUCCESSFUL run, the job is removed through its adaptor and, if
# cleanup is enabled for its analysis, its output files are deleted.
sub runLocally {
    my $self = shift;

    local *STDOUT;
    local *STDERR;

    # Three-argument open, so file names are never parsed as open modes.
    if ( !open( STDOUT, '>', $self->stdout_file ) ) {
        $self->set_status( "FAILED" );
        return;
    }
    if ( !open( STDERR, '>', $self->stderr_file ) ) {
        $self->set_status( "FAILED" );
        return;
    }

    $self->run_module();

    # current_status returns undef when the job has no adaptor.
    my $status = $self->current_status;
    if ( $status && $status->status eq "SUCCESSFUL" ) {
        $self->adaptor->remove( $self );
        if ( $self->cleanup ) {
            unlink $self->stderr_file if -e $self->stderr_file;
            unlink $self->stdout_file if -e $self->stdout_file;
        }
    }
    return;
}
# Load and run this job's RunnableDB module in the current process.
#
# Module lookup:
#   - a module name containing "::" is required by its package path;
#   - otherwise it is looked up under the queue's runnabledb_path, if set.
# The queue (the analysis's own, or 'default') also supplies the logger
# verbosity used while the module runs.
#
# The status moves through READING, RUNNING (or VOID for void input),
# WRITING and SUCCESSFUL. If any stage fails, the status is set to FAILED
# (or the RunnableDB's failing_job_status while running) and the method
# throws.
# Afterwards the input_id/analysis pair is stored via the StateInfoContainer.
# If that fails, the job is set to FAIL_NO_RETRY and the method throws.
sub run_module {
    my $self = shift;

    my $module   = $self->analysis->module;
    my $hash_key = $self->analysis->logic_name;
    my $rdb;
    my ($err, $res);

    if (!exists($BATCH_QUEUES{$hash_key})) {
        $hash_key = 'default';
    }
    my $runnable_db_path = $BATCH_QUEUES{$hash_key}{runnabledb_path};
    my $verbosity        = $BATCH_QUEUES{$hash_key}{verbosity};

    # Build the path for require: "A::B" becomes "A/B". $module itself is
    # rewritten, so error messages below show the path form.
    my $perl_path;
    if ($module =~ /::/) {
        $module =~ s/::/\//g;
        $perl_path = $module;
    } elsif ($runnable_db_path) {
        $perl_path = $runnable_db_path."/".$module;
    } else {
        $perl_path = $module;
    }

    # Per-job copy. A module without db_version_searched disables saving
    # runtime info for this job only; the old code cleared the config
    # global, which affected every later job in the same process.
    my $save_runtime_info = $SAVE_RUNTIME_INFO;

    my $current_verbosity = logger_verbosity;
    logger_verbosity($verbosity);

    # Run every stage inside one outer eval, so the caller's logger
    # verbosity is restored even when a stage throws. The error is then
    # re-raised unchanged.
    eval {
        eval {
            require $perl_path.".pm";
            $perl_path =~ s/\//::/g;
            $rdb = $perl_path->new( -analysis  => $self->analysis,
                                    -input_id  => $self->input_id,
                                    -db        => $self->adaptor->db,
                                    -verbosity => $verbosity
                                  );
        };
        if ($err = $@) {
            print (STDERR "CREATE: Lost the will to live Error\n");
            $self->set_status( "FAILED" );
            throw( "Problems creating runnable $module for " .
                   $self->input_id . " [$err]\n");
        }

        eval {
            $self->set_status( "READING" );
            $res = $rdb->fetch_input;
        };
        if ($err = $@) {
            $self->set_status( "FAILED" );
            print (STDERR "READING: Lost the will to live Error\n");
            throw( "Problems with $module fetching input for " .
                   $self->input_id . " [$err]\n");
        }

        if ($rdb->input_is_void) {
            $self->set_status( "VOID" );
        } else {
            eval {
                $self->set_status( "RUNNING" );
                # NOTE(review): presumably lets the DB connection close while
                # idle during a long run; confirm against DBConnection docs.
                $rdb->db->dbc->disconnect_when_inactive(1);
                $rdb->run;
                $rdb->db->dbc->disconnect_when_inactive(0);
            };
            if ($err = $@) {
                print STDERR $@ . "\n";
                if (my $err_state = $rdb->failing_job_status) {
                    $self->set_status( $err_state );
                } else {
                    $self->set_status( "FAILED" );
                }
                print (STDERR "RUNNING: Lost the will to live Error\n");
                throw("Problems running $module for " .
                      $self->input_id . " [$err]\n");
            }

            eval {
                $self->set_status( "WRITING" );
                $rdb->write_output;
                if ($rdb->can('db_version_searched')) {
                    my $new_db_version = $rdb->db_version_searched();
                    my $analysis = $self->analysis();
                    $analysis->db_version($new_db_version);
                } else {
                    $save_runtime_info = 0;
                }
                $self->set_status("SUCCESSFUL");
            };
            if ($err = $@) {
                $self->set_status( "FAILED" );
                print (STDERR "WRITING: Lost the will to live Error\n");
                throw("Problems for $module writing output for " .
                      $self->input_id . " [$err]" );
            }
        }
    };
    my $stage_error = $@;
    logger_verbosity($current_verbosity);
    die $stage_error if $stage_error;

    eval {
        my $sic = $self->adaptor->db->get_StateInfoContainer;
        $sic->store_input_id_analysis(
            $self->input_id,
            $self->analysis,
            $self->execution_host,
            $save_runtime_info
        );
    };
    if ($err = $@) {
        my $error_msg = "Job finished successfully, but could not be ".
            "recorded as finished. Job : [" . $self->input_id . "]\n[$err]";
        eval {
            $self->set_status("FAIL_NO_RETRY");
        };
        $error_msg .= ("(And furthermore) Encountered an error in updating the job to status failed_no_retry.\n[$@]") if $@;
        throw($error_msg);
    } else {
        print STDERR "Updated successful job ".$self->dbID."\n";
    }
}
# Get/set the path of the runner script used to execute this job.
# Only a true value replaces the stored path.
sub runner {
    my ($self, $script) = @_;
    $self->{'_runner'} = $script if $script;
    return $self->{'_runner'};
}
# Record a new status for this job through its adaptor and return the
# adaptor's result. Throws if no status is given. Without an adaptor, it
# warns and returns nothing.
sub set_status {
    my ($self, $arg) = @_;
    defined $arg or throw("No status input" );

    my $adaptor = $self->adaptor;
    unless ($adaptor) {
        warning("No database connection. Can't set status to $arg");
        return;
    }
    return $adaptor->set_status( $self, $arg );
}
# Build the per-analysis batch queue table from @$QUEUE_CONFIG.
# Each config entry with a logic_name becomes a queue holding:
#   - all of its other settings,
#   - an empty job list and an undef last_flushed time,
#   - the $DEFAULT_* value for any setting left false.
# A 'default' queue is always present and gets the same defaults.
# Returns the table as a flat hash list.
sub set_up_queues {
    # Fallback for each per-queue setting that the config leaves false.
    my %default_for = (
        batch_size      => $DEFAULT_BATCH_SIZE,
        queue           => $DEFAULT_BATCH_QUEUE,
        resource        => $DEFAULT_RESOURCE,
        retries         => $DEFAULT_RETRIES,
        cleanup         => $DEFAULT_CLEANUP,
        runnabledb_path => $DEFAULT_RUNNABLEDB_PATH,
        output_dir      => $DEFAULT_OUTPUT_DIR,
        runner          => $DEFAULT_RUNNER,
        verbosity       => $DEFAULT_VERBOSITY,
        sub_args        => $DEFAULT_SUB_ARGS,
        retry_queue     => $DEFAULT_RETRY_QUEUE,
        retry_resource  => $DEFAULT_RETRY_RESOURCE,
        retry_sub_args  => $DEFAULT_RETRY_SUB_ARGS,
    );
    my $apply_defaults = sub {
        my ($settings) = @_;
        for my $key (keys %default_for) {
            $settings->{$key} ||= $default_for{$key};
        }
    };

    my %q;
    for my $config (@$QUEUE_CONFIG) {
        my $ln = $config->{logic_name};
        next unless $ln;

        for my $key (grep { $_ ne 'logic_name' } keys %$config) {
            $q{$ln}{$key} = $config->{$key};
        }
        $q{$ln}{jobs}         = [];
        $q{$ln}{last_flushed} = undef;
        $apply_defaults->($q{$ln});
    }

    if ( !exists $q{default} ) {
        $q{default}{jobs}         = [];
        $q{default}{last_flushed} = undef;
    }
    $apply_defaults->($q{default});

    return %q;
}
# Get/set the path of the job's stderr file.
# A true argument must contain "err", otherwise this throws, reporting the
# caller's file and line. The name is validated BEFORE it is stored, so a
# rejected name never replaces the previous value (the old code stored it
# first and then threw).
sub stderr_file {
    my ($self, $arg) = @_;
    if ($arg) {
        if ($arg !~ /err/) {
            my ($p, $f, $l) = caller;
            throw("You can't set stderr file to ".$arg." $f:$l\n");
        }
        $self->{'_stderr_file'} = $arg;
    }
    return $self->{'_stderr_file'};
}
# Get/set the path of the job's stdout file. Any defined value is stored.
sub stdout_file {
    my ($self, $path) = @_;
    $self->{'_stdout_file'} = $path if defined $path;
    return $self->{'_stdout_file'};
}
# Get/set the batch-system submission id of this job. Any defined value,
# including 0, is stored.
sub submission_id {
    my ($self, $id) = @_;
    $self->{'_submission_id'} = $id if defined $id;
    return $self->{'_submission_id'};
}
# Get/set the temporary directory for this job.
# A true argument is stored. When none is stored, the value is taken from
# the batch-submission object's temp_filename. An unset result reads back
# as the empty string.
sub temp_dir {
    my ($self, $dir) = @_;
    $self->{'_temp_dir'} = $dir if $dir;
    $self->{'_temp_dir'} ||= $self->batch_q_object->temp_filename;
    my $stored = $self->{'_temp_dir'};
    return $stored ? $stored : '';
}
General documentation
Post general queries to ensembl-dev@ebi.ac.uk
The rest of the documentation details each of the object methods. Internal methods are usually prefixed with an underscore (_).