Bio::EnsEMBL::Pipeline::Finished RuleManager
SummaryIncluded librariesPackage variablesSynopsisDescriptionGeneral documentationMethods
Toolbar
WebCvsRaw content
Summary
Bio::EnsEMBL::Pipeline::Finished::RuleManager.pm
Package variables
Privates (from "my" definitions)
$s_last_modified = ''
$input_list = {}
Included modules
Bio::EnsEMBL::Pipeline::Config::BatchQueue
Bio::EnsEMBL::Pipeline::Config::General
Bio::EnsEMBL::Pipeline::Finished::Job
Bio::EnsEMBL::Pipeline::RuleManager
Bio::EnsEMBL::Utils::Argument qw ( rearrange )
Bio::EnsEMBL::Utils::Exception qw ( verbose throw warning info )
DBI
File::stat
Net::Netrc
Inherit
Bio::EnsEMBL::Pipeline::RuleManager
Synopsis
Description
Finished group specific RuleManager module.
Allow a better job submission handling through a priority queue
and limit the number of submitted jobs. Note that a ~/.netrc
file is necessary to fetch the priority queue connexion parameters.
See Net::Netrc module for more details.
Methods
_job_db_queueDescriptionCode
can_job_run
No description
Code
check_if_done
No description
Code
create_and_store_job
No description
Code
get_db_param
No description
Code
get_dbi
No description
Code
job_statsDescriptionCode
push_jobDescriptionCode
read_input_file
No description
Code
status_from_outputDescriptionCode
urgent_input_idDescriptionCode
valid_statuses_for_awol
No description
Code
Methods description
_job_db_queuecode    nextTop
  Function  : Gets the database connection to the RuleManager queue.
Exceptions: none
job_statscodeprevnextTop
  Arg [1]   : int, max number of jobs (optional)
Arg [2] : array_ref to array of Bio::EnsEMBL::Pipeline::Job objects
(optional)
Function : gets statistics from BatchSubmission module about what
jobs are running and what their status is then take action on this
information. job_stats will mark awol and out_of_memory jobs as well
Return : number of free job slots in the farm. It depends on the JOB_LIMIT set in BatchQeue.pm
Returntype: int
Exceptions: throws if batch_submission module can't do the method
job stats'
Example : $rulemanager->job_stats;('1000', \@jobs);
push_jobcodeprevnextTop
  Function  : add a job in the queue
Exceptions: none
status_from_outputcodeprevnextTop
  Arg : Job object
Function : Read the job's output file and return the exception status if system exception.
OUT_OF_MEMORY for MEMORY LIMIT and RUNTIME_LIMIT for RUNTIME LIMIT.
Returntype: string
urgent_input_id/read_input_filecodeprevnextTop
  Arg : None
Function : gets a hash reference of the input_id that need to be completed at short notice.
The path to the file that contains the list of urgent input_ids
is set in the BatchQueue configuration file.
(see variable URGENT_INPUTID_FILE)
Returntype: Hash reference
Methods code
_job_db_queuedescriptionprevnextTop
sub _job_db_queue {
	my ($self) = @_;
	if ( !$self->{'db_queue'} ) {
		$self->{'db_queue'} = &get_dbi( $QUEUE_NAME, $QUEUE_HOST );
	}

	return $self->{'db_queue'};
}
can_job_rundescriptionprevnextTop
sub can_job_run {
	my ( $self, $input_id, $analysis, $current_jobs ) = @_;

	if ( !$input_id || !$analysis ) {
		throw(  "Can't create job without an input_id $input_id or analysis "
			  . "$analysis" );
	}

	my $job;
	my $status;

	if ( $current_jobs->{ $analysis->dbID } ) {
		my $cj = $current_jobs->{ $analysis->dbID };
		$status = $cj->{_status}->status;

		if (
			(
				   $status eq 'FAILED'
				|| $status eq 'AWOL'
				|| $status eq 'BUS_ERROR'
				|| $status eq 'OUT_OF_MEMORY'
				|| $status eq 'RUNTIME_LIMIT'
			)
			&& $cj->can_retry
		  )
		{
			print "Retrying job " . $cj->dbID . " with status $status\n"
			  if $self->be_verbose;

			if ( $self->rename_on_retry ) {
				$self->rename_files($cj);
			}
			$cj->set_status('CREATED');
			$job = $cj;
			$job->output_dir($self->output_dir) if $self->output_dir;
		}
	}
	else {
		$job = $self->create_and_store_job( $input_id, $analysis );
		print "Creating job " . $job->dbID . "\n" if $self->be_verbose;
	}

	if ($job) {
		my $priority = 0;
		if($status) {
			$priority = $BIG_MEM_PRIORITY if $status eq 'OUT_OF_MEMORY';
			$priority = $LONG_JOB_PRIORITY if $status eq 'RUNTIME_LIMIT';
		}

		$self->push_job( $job, $priority );

		return 1;
	}

	return 0;
}
check_if_donedescriptionprevnextTop
sub check_if_done {
  my ($self) = @_;
  my @jobs = $self->job_adaptor->fetch_all;
  my $continue;

 JOB:
  foreach my $job (@jobs) {
    my $status = $job->current_status->status;

    if ($status eq 'KILLED' || $status eq 'SUCCESSFUL') {
      next JOB;
    } elsif ($status eq 'FAILED' || $status eq 'AWOL' || $status eq 'OUT_OF_MEMORY' || $status eq 'RUNTIME_LIMIT') {
      if (!$job->can_retry) {
        next JOB;
      } else {
        return 1;
      }
    } else {
      return 1;
    }
  }

  return 0;
}

1;
}
create_and_store_jobdescriptionprevnextTop
sub create_and_store_job {
	my ( $self, $input_id, $analysis ) = @_;

	if ( !$input_id || !$analysis ) {
		throw(  "Can't create job without an input_id $input_id or analysis "
			  . "$analysis" );
	}
	my $job = Bio::EnsEMBL::Pipeline::Finished::Job->new(
		-input_id   => $input_id,
		-analysis   => $analysis,
		-output_dir => $self->output_dir,
		-runner     => $self->runner,
	);

	eval { $self->job_adaptor->store($job); };

	if ($@) {
		throw(  "Failed to store job "
			  . $job->input_id . " "
			  . $job->analysis->logic_name . " "
			  . $@ );
	}

	return $job;
}
get_db_paramdescriptionprevnextTop
sub get_db_param {
	my ( $dbname, $dbhost ) = @_;
	my ( $dbuser, $dbpass, $dbport );

	my $ref = Net::Netrc->lookup($dbhost);
	throw("$dbhost entry is missing from ~/.netrc") unless ($ref);
	$dbuser = $ref->login;
	$dbpass = $ref->password;
	$dbport = $ref->account;
	throw(
		"Missing parameter in the ~/.netrc file:\n
			machine " .  ( $dbhost || 'missing' ) . "\n
			login " .    ( $dbuser || 'missing' ) . "\n
			password " . ( $dbpass || 'missing' ) . "\n
			account "
		  . ( $dbport || 'missing' )
		  . " (should be used to set the port number)"
	  )
	  unless ( $dbuser && $dbpass && $dbport );

	  return ($dbuser, $dbpass, $dbport);
}
get_dbidescriptionprevnextTop
sub get_dbi {
	my ( $dbname, $dbhost ) = @_;
	my ($dbuser, $dbpass, $dbport) = &get_db_param( $dbname, $dbhost );
	my $dsn = "DBI:mysql:host=$dbhost;dbname=$dbname;port=$dbport";

	return DBI->connect( $dsn, $dbuser, $dbpass );
}
job_statsdescriptionprevnextTop
sub job_stats {
	my ( $self, $job_limit, $jobs ) = @_;

	if ( !$job_limit ) {
		$job_limit = $self->job_limit;
	}

	# Do job_stats call before getting jobs
if ( !$self->batch_q_module->can('job_stats') ) { throw( $self->batch_q_module . " doesn't have the job_stats method" ); } my %statuses_to_count = map { $_, 1 } @{$JOB_STATUSES_TO_COUNT}; #found in
#BatchQueue.pm
my %job_stats = %{ $self->batch_q_module->job_stats }; my @jobs; if ( !$jobs ) { @jobs = $self->job_adaptor->fetch_by_Status_not_like('CREATED'); } else { @jobs = @$jobs; } my @awol_jobs; my $global_job_count = 0; # job count for all pipelines
my $local_job_count = 0; # job count for this pipeline
GLOBAL: foreach my $sub_id ( keys %job_stats ) { if ( $statuses_to_count{ $job_stats{$sub_id} } ) { $global_job_count++; } } LOCAL: foreach my $job (@jobs) { if ( !$job_stats{ $job->submission_id } ) { push( @awol_jobs, $job ); next LOCAL; } if ( $statuses_to_count{ $job_stats{ $job->submission_id } } ) { $local_job_count++; } } print $self->db->dbc->dbname . ": $local_job_count / $global_job_count (limit: $job_limit) Pending jobs in the farm\n" if ( $self->be_verbose ); if ( $self->mark_awol_jobs ) { foreach my $awol (@awol_jobs) { my $status = $awol->current_status->status; if ( $self->valid_statuses_for_awol->{$status} ) { # parse output file and get the right status
my $s = $self->status_from_output($awol); $s ||= 'AWOL'; $awol->set_status($s); print "Job " . $awol->dbID . " status $status changed to " . $awol->current_status->status . "\n" if ( $self->be_verbose ); } } } my $free_slots = $job_limit - $local_job_count; # number of free farm slots
$free_slots = $free_slots > 0 ? $free_slots : 0; # total nb. of jobs must not exceeds job limit
return $free_slots;
}
push_jobdescriptionprevnextTop
sub push_job {
	my ( $self, $job, $priority ) = @_;

	my $dbq    = $self->_job_db_queue;
	my $insert = $dbq->prepare(
		qq  {
			INSERT INTO queue (created, priority, job_id, host, pipeline, analysis, is_update)
			VALUES ( NOW() , ? , ? , ? , ? , ? , ?)
		}
	);
	my $job_id = $job->dbID;
	my $dbc    = $job->adaptor->db->dbc();
	my $dbname = $dbc->dbname;
	my $host   = $dbc->host;
	my $analysis = $job->analysis->logic_name;
	my $job_priority = $job->priority;
	$job_priority = $priority if $priority;
	$job_priority = $URGENT_JOB_PRIORITY
	  if ( $self->urgent_input_id->{ $job->input_id } );
	my $update = $job->update;

	return $insert->execute( $job_priority, $job_id, $host, $dbname, $analysis, $update );
}
read_input_filedescriptionprevnextTop
sub read_input_file {
	my ( $self, $file ) = @_;
	my $list = {};
	open( my $IN, "<$file" ) || throw("Unable to open input id file $file");
	while (<$IN>) {
		chomp;
		s/\s//g;
		$list->{$_} = 1;
	}
	close($IN);

	return $list;
}
status_from_outputdescriptionprevnextTop
sub status_from_output {
	my ( $self, $job ) = @_;
	my $out_file = $job->stdout_file;
	my $status;
	eval {
		print "READING: $out_file\n" if ( $self->be_verbose );
		if ( -e $out_file ) {
			open( my $F, "<$out_file" );
			while (<$F>) {
				if (/TERM_MEMLIMIT/) { $status = 'OUT_OF_MEMORY'; last; }
				if (/TERM_RUNLIMIT/) { $status = 'RUNTIME_LIMIT'; last; }
			}
			close($F);
		}
	};
	print STDERR "ERROR [$@]\n" if ( $@ && $self->be_verbose );

	return $status;
}
urgent_input_iddescriptionprevnextTop
sub urgent_input_id {
	my ($self) = @_;
	my $file = $URGENT_INPUTID_FILE;
	if ( -e $file ) {
		my $c_last_modified = stat($file)->[9];
		if ( $s_last_modified ne $c_last_modified ) {
			$s_last_modified = $c_last_modified;
			$input_list      = $self->read_input_file($file);
		}
	}
	else {
		$input_list = {};
	}

	return $input_list;
}
valid_statuses_for_awoldescriptionprevnextTop
sub valid_statuses_for_awol {
  my ($self)  = @_;
  if(!$self->{'status_for_awol'}) {
    my %statuses = map{$_, 1} ('SUBMITTED', 'RUNNING', 'READING',
                               'WRITING', 'WAITING','AWOL');
      $self->{'status_for_awol'} =\% statuses;
  }
  return $self->{'status_for_awol'};
}
General documentation
FEEDBACKTop
AUTHOR - Mustapha LarbaouiTop
Mustapha Larbaoui <ml6@sanger.ac.uk>
CONTACTTop
Post general queries to anacode@sanger.ac.uk
APPENDIXTop
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _
can_run_jobTop
  Arg [1]   : string, input_id
Arg [2] : Bio::EnsEMBL::Pipeline::Analysis
Arg [3] : string, directory path (optional)
Arg [4] : string, runner script path (optional)
Arg [5] : int, for a boolean flag to mark verbosity (optional)
Function : Check if a job can be created for an input_id and analysis
If a job already exists check if it needs to be retried,
then push the job in a priority queue.
Returntype: int
Exceptions: throws if not passed an input_id or an analysis object and
if fails to submit the job
Example : $rulemanager->can_run_job('filename', $analysis
'path/to/dir',
'path/to/runner', 1);