Raw content of Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself
# POD documentation - main docs before the code
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
=head1 SYNOPSIS
$analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
$analysisJobAdaptor = $analysisJob->adaptor;
=head1 DESCRIPTION
Module to encapsulate all db access for persistent class AnalysisJob.
There should be just one per application and database connection.
=head1 CONTACT
Contact Jessica Severin on implemetation/design detail: jessica@ebi.ac.uk
Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
=head1 APPENDIX
The rest of the documentation details each of the object methods.
Internal methods are preceded with a _
=cut
# Let the code begin...
package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use strict;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Sys::Hostname;
use Data::UUID;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
# our $max_retry_count = 7;
###############################################################################
#
# CLASS methods
#
###############################################################################
=head2 CreateNewJob
Args : -input_id => string of input_id which will be passed to run the job
-analysis => Bio::EnsEMBL::Analysis object from a database
-block => int(0,1) set blocking state of job (default = 0)
-input_job_id => (optional) analysis_job_id of job that is creating this
job. Used purely for book keeping.
Example : $analysis_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => 'my input data',
-analysis => $myAnalysis);
Description: uses the analysis object to get the db connection from the adaptor to store a new
job in a hive. This is a class level method since it does not have any state.
Also updates corresponding analysis_stats by incrementing total_job_count,
unclaimed_job_count and flagging the incremental update by changing the status
to 'LOADING' (but only if the analysis is not blocked).
Returntype : int analysis_job_id on database analysis is from.
Exceptions : thrown if either -input_id or -analysis are not properly defined
Caller : general
=cut
sub CreateNewJob {
my ($class, @args) = @_;
return undef unless(scalar @args);
my ($input_id, $analysis, $prev_analysis_job_id, $blocked) =
rearrange([qw(INPUT_ID ANALYSIS input_job_id BLOCK )], @args);
$prev_analysis_job_id=0 unless($prev_analysis_job_id);
throw("must define input_id") unless($input_id);
throw("must define analysis") unless($analysis);
throw("analysis must be [Bio::EnsEMBL::Analysis] not a [$analysis]")
unless($analysis->isa('Bio::EnsEMBL::Analysis'));
throw("analysis must have adaptor connected to database")
unless($analysis->adaptor and $analysis->adaptor->db);
if(length($input_id) >= 255) {
my $input_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id);
$input_id = "_ext_input_analysis_data_id $input_data_id";
}
my $sql = q{INSERT ignore into analysis_job
(input_id, prev_analysis_job_id,analysis_id,status)
VALUES (?,?,?,?)};
my $status ='READY';
$status = 'BLOCKED' if($blocked);
my $dbc = $analysis->adaptor->db->dbc;
my $sth = $dbc->prepare($sql);
$sth->execute($input_id, $prev_analysis_job_id, $analysis->dbID, $status);
my $dbID = $sth->{'mysql_insertid'};
$sth->finish;
$dbc->do("UPDATE analysis_stats SET ".
"total_job_count=total_job_count+1 ".
",unclaimed_job_count=unclaimed_job_count+1 ".
",status='LOADING' ".
"WHERE status!='BLOCKED' and analysis_id='".$analysis->dbID ."'");
return $dbID;
}
###############################################################################
#
# INSTANCE methods
#
###############################################################################
=head2 fetch_by_dbID
Arg [1] : int $id
the unique database identifier for the feature to be obtained
Example : $feat = $adaptor->fetch_by_dbID(1234);
Description: Returns the AnalysisJob defined by the analysis_job_id $id.
Returntype : Bio::EnsEMBL::Hive::AnalysisJob
Exceptions : thrown if $id is not defined
Caller : general
=cut
sub fetch_by_dbID {
my ($self,$id) = @_;
unless(defined $id) {
throw("fetch_by_dbID must have an id");
}
my @tabs = $self->_tables;
my ($name, $syn) = @{$tabs[0]};
#construct a constraint like 't1.table1_id = 1'
my $constraint = "${syn}.${name}_id = $id";
#return first element of _generic_fetch list
my ($obj) = @{$self->_generic_fetch($constraint)};
return $obj;
}
=head2 fetch_by_claim_analysis
Arg [1] : string job_claim (the UUID used to claim jobs)
Arg [2] : int analysis_id
Example : $jobs = $adaptor->fetch_by_claim_analysis('c6658fde-64ab-4088-8526-2e960bd5dd60',208);
Description: Returns a list of jobs for a claim id
Returntype : Bio::EnsEMBL::Hive::AnalysisJob
Exceptions : thrown if claim_id or analysis_id is not defined
Caller : general
=cut
sub fetch_by_claim_analysis {
my ($self,$claim,$analysis_id) = @_;
throw("fetch_by_claim_analysis must have claim ID") unless($claim);
throw("fetch_by_claim_analysis must have analysis_id") unless($analysis_id);
my $constraint = "a.status='CLAIMED' AND a.job_claim='$claim' AND a.analysis_id='$analysis_id'";
return $self->_generic_fetch($constraint);
}
sub fetch_by_run_analysis {
my ($self,$hive_id,$analysis_id) = @_;
throw("fetch_by_run_analysis must have hive_id") unless($hive_id);
throw("fetch_by_run_analysis must have analysis_id") unless($analysis_id);
my $constraint = "a.status='RUN' AND a.hive_id=$hive_id AND a.analysis_id='$analysis_id'";
return $self->_generic_fetch($constraint);
}
=head2 fetch_all
Arg : None
Example :
Description: fetches all jobs from database
Returntype :
Exceptions :
Caller :
=cut
sub fetch_all {
my $self = shift;
return $self->_generic_fetch();
}
=head2 fetch_all_failed_jobs
Arg [1] : (optional) int $analysis_id
Example : $failed_jobs = $adaptor->fetch_all_failed_jobs;
$failed_jobs = $adaptor->fetch_all_failed_jobs($analysis->dbID);
Description: Returns a list of all jobs with status 'FAILED'. If an $analysis_id
is specified it will limit the search accordingly.
Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects
Exceptions : none
Caller : user processes
=cut
sub fetch_all_failed_jobs {
my ($self,$analysis_id) = @_;
my $constraint = "a.status='FAILED'";
$constraint .= " AND a.analysis_id=$analysis_id" if($analysis_id);
return $self->_generic_fetch($constraint);
}
sub fetch_by_url_query
{
my $self = shift;
my $query = shift;
return undef unless($query);
#print("Bio::EnsEMBL::DBSQL::AnalysisAdaptor::fetch_by_url_query : $query\n");
if((my $p=index($query, "=")) != -1) {
my $type = substr($query,0, $p);
my $value = substr($query,$p+1,length($query));
if($type eq 'dbID') {
return $self->fetch_by_dbID($value);
}
}
return undef;
}
#
# INTERNAL METHODS
#
###################
sub _generic_fetch {
my ($self, $constraint, $join) = @_;
my @tables = $self->_tables;
my $columns = join(', ', $self->_columns());
if ($join) {
foreach my $single_join (@{$join}) {
my ($tablename, $condition, $extra_columns) = @{$single_join};
if ($tablename && $condition) {
push @tables, $tablename;
if($constraint) {
$constraint .= " AND $condition";
} else {
$constraint = " $condition";
}
}
if ($extra_columns) {
$columns .= ", " . join(', ', @{$extra_columns});
}
}
}
#construct a nice table string like 'table1 t1, table2 t2'
my $tablenames = join(', ', map({ join(' ', @$_) } @tables));
my $sql = "SELECT $columns FROM $tablenames";
my $default_where = $self->_default_where_clause;
my $final_clause = $self->_final_clause;
#append a where clause if it was defined
if($constraint) {
$sql .= " WHERE $constraint ";
if($default_where) {
$sql .= " AND $default_where ";
}
} elsif($default_where) {
$sql .= " WHERE $default_where ";
}
#append additional clauses which may have been defined
$sql .= " $final_clause";
my $sth = $self->prepare($sql);
$sth->execute;
#print STDOUT $sql,"\n";
return $self->_objs_from_sth($sth);
}
sub _tables {
my $self = shift;
return (['analysis_job', 'a']);
}
sub _columns {
my $self = shift;
return qw (a.analysis_job_id
a.prev_analysis_job_id
a.analysis_id
a.input_id
a.job_claim
a.hive_id
a.status
a.retry_count
a.completed
a.branch_code
a.runtime_msec
a.query_count
);
}
sub _default_where_clause {
my $self = shift;
return '';
}
sub _final_clause {
my $self = shift;
return 'ORDER BY retry_count';
}
sub _objs_from_sth {
my ($self, $sth) = @_;
my %column;
$sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } ));
my @jobs = ();
while ($sth->fetch()) {
my $job = new Bio::EnsEMBL::Hive::AnalysisJob;
$job->dbID($column{'analysis_job_id'});
$job->analysis_id($column{'analysis_id'});
$job->input_id($column{'input_id'});
$job->job_claim($column{'job_claim'});
$job->hive_id($column{'hive_id'});
$job->status($column{'status'});
$job->retry_count($column{'retry_count'});
$job->completed($column{'completed'});
$job->branch_code($column{'branch_code'});
$job->runtime_msec($column{'runtime_msec'});
$job->query_count($column{'query_count'});
$job->adaptor($self);
if($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/) {
#print("input_id was too big so stored in analysis_data table as dbID $1 -- fetching now\n");
$job->input_id($self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1));
}
push @jobs, $job;
}
$sth->finish;
return \@jobs
}
#
# STORE / UPDATE METHODS
#
################
=head2 update_status
Arg [1] : $analysis_id
Example :
Description: updates the analysis_job.status in the database
Returntype :
Exceptions :
Caller : general
=cut
sub update_status {
my ($self,$job) = @_;
my $sql = "UPDATE analysis_job SET status='".$job->status."' ";
if($job->status eq 'DONE') {
$sql .= ",completed=now(),branch_code=".$job->branch_code;
$sql .= ",runtime_msec=".$job->runtime_msec;
$sql .= ",query_count=".$job->query_count;
}
if($job->status eq 'READY') {
$sql .= ",job_claim=''";
}
$sql .= " WHERE analysis_job_id='".$job->dbID."' ";
my $sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
}
sub reclaim_job {
my $self = shift;
my $job = shift;
my $ug = new Data::UUID;
my $uuid = $ug->create();
$job->job_claim($ug->to_string( $uuid ));
my $sql = "UPDATE analysis_job SET status='CLAIMED', job_claim=?, hive_id=? WHERE analysis_job_id=?";
#print("$sql\n");
my $sth = $self->prepare($sql);
$sth->execute($job->job_claim, $job->hive_id, $job->dbID);
$sth->finish;
}
=head2 store_out_files
Arg [1] : Bio::EnsEMBL::Hive::AnalysisJob $job
Example :
Description: if files are non-zero size, will update DB with location
Returntype :
Exceptions :
Caller : Bio::EnsEMBL::Hive::Worker
=cut
sub store_out_files {
my ($self,$job) = @_;
return unless($job);
my $sql = sprintf("DELETE from analysis_job_file WHERE hive_id=%d and analysis_job_id=%d",
$job->hive_id, $job->dbID);
$self->dbc->do($sql);
return unless($job->stdout_file or $job->stderr_file);
$sql = "INSERT ignore INTO analysis_job_file (analysis_job_id, hive_id, retry, type, path) VALUES ";
if($job->stdout_file) {
$sql .= sprintf("(%d,%d,%d,'STDOUT','%s')", $job->dbID, $job->hive_id,
$job->retry_count, $job->stdout_file);
}
$sql .= "," if($job->stdout_file and $job->stderr_file);
if($job->stderr_file) {
$sql .= sprintf("(%d,%d,%d,'STDERR','%s')", $job->dbID, $job->hive_id,
$job->retry_count, $job->stderr_file);
}
$self->dbc->do($sql);
}
sub claim_jobs_for_worker {
my $self = shift;
my $worker = shift;
throw("must define worker") unless($worker);
my $ug = new Data::UUID;
my $uuid = $ug->create();
my $claim = $ug->to_string( $uuid );
#print("claiming jobs for hive_id=", $worker->hive_id, " with uuid $claim\n");
my $status = 'READY';
$status = 'HIGHMEM' if (defined($worker->{HIGHMEM}));
my $sql_base = "UPDATE analysis_job SET job_claim='$claim'".
" , hive_id='". $worker->hive_id ."'".
" , status='CLAIMED'".
" WHERE job_claim='' and status='" . $status . "'".
" AND analysis_id='" .$worker->analysis->dbID. "'";
my $sql_virgin = $sql_base .
" AND retry_count=0".
" LIMIT " . $worker->batch_size;
my $sql_any = $sql_base .
" LIMIT " . $worker->batch_size;
my $claim_count = $self->dbc->do($sql_virgin);
if($claim_count == 0) {
$claim_count = $self->dbc->do($sql_any);
}
return $claim;
}
=head2 reset_dead_jobs_for_worker
Arg [1] : Bio::EnsEMBL::Hive::Worker object
Example :
Description: If a worker has died some of its jobs need to be reset back to 'READY'
so they can be rerun.
Jobs in state CLAIMED as simply reset back to READY.
If jobs was in a 'working' state (GET_INPUT, RUN, WRITE_OUTPUT))
the retry_count is increased and the status set back to READY.
If the retry_count >= $max_retry_count (3 by default) the job is set
to 'FAILED' and not rerun again.
Exceptions : $worker must be defined
Caller : Bio::EnsEMBL::Hive::Queen
=cut
sub reset_dead_jobs_for_worker {
my $self = shift;
my $worker = shift;
throw("must define worker") unless($worker);
#added hive_id index to analysis_job table which made this operation much faster
my ($sql, $sth);
my $max_retry_count = $worker->analysis->stats->max_retry_count();
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "UPDATE analysis_job SET job_claim='', status='READY'".
" WHERE status='CLAIMED'".
" AND hive_id='" . $worker->hive_id ."'";
$sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
#print(" done update CLAIMED\n");
# an update with select on status and hive_id took 4seconds per worker to complete,
# while doing a select followed by update on analysis_job_id returned almost instantly
$sql = "UPDATE analysis_job SET job_claim='', status='READY'".
" ,retry_count=retry_count+1".
" WHERE status in ('GET_INPUT','RUN','WRITE_OUTPUT')".
" AND retry_count<$max_retry_count".
" AND hive_id='" . $worker->hive_id ."'";
#print("$sql\n");
$sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
$sql = "UPDATE analysis_job SET status='FAILED'".
" ,retry_count=retry_count+1".
" WHERE status in ('GET_INPUT','RUN','WRITE_OUTPUT')".
" AND retry_count>=$max_retry_count".
" AND hive_id='" . $worker->hive_id ."'";
#print("$sql\n");
$sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
#print(" done update BROKEN jobs\n");
}
sub reset_dead_job_by_dbID {
my $self = shift;
my $job_id = shift;
#added hive_id index to analysis_job table which made this operation much faster
my $sql;
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "UPDATE analysis_job SET job_claim='', status='READY'".
" WHERE status='CLAIMED'".
" AND analysis_job_id=$job_id";
$self->dbc->do($sql);
#print(" done update CLAIMED\n");
# an update with select on status and hive_id took 4seconds per worker to complete,
# while doing a select followed by update on analysis_job_id returned almost instantly
$sql = "
UPDATE analysis_job, analysis_stats
SET job_claim='', analysis_job.status='READY', retry_count=retry_count+1
WHERE
analysis_job.status in ('GET_INPUT','RUN','WRITE_OUTPUT')
AND analysis_job.analysis_id = analysis_stats.analysis_id
AND retry_count < max_retry_count
AND analysis_job_id=$job_id";
#print("$sql\n");
$self->dbc->do($sql);
$sql = "
UPDATE analysis_job, analysis_stats
SET job_claim='', analysis_job.status='FAILED', retry_count=retry_count+1
WHERE
analysis_job.status in ('GET_INPUT','RUN','WRITE_OUTPUT')
AND analysis_job.analysis_id = analysis_stats.analysis_id
AND retry_count >= max_retry_count
AND analysis_job_id=$job_id";
#print("$sql\n");
$self->dbc->do($sql);
#print(" done update BROKEN jobs\n");
}
sub reset_highmem_job_by_dbID {
my $self = shift;
my $job_id = shift;
#added hive_id index to analysis_job table which made this operation much faster
my $sql;
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "UPDATE analysis_job SET job_claim='', status='HIGHMEM'".
" WHERE analysis_job_id=$job_id";
$self->dbc->do($sql);
#print(" done update CLAIMED\n");
}
=head2 reset_job_by_dbID
Arg [1] : int $analysis_job_id
Example :
Description: Forces a job to be reset to 'READY' so it can be run again.
Will also reset a previously 'BLOCKED' jobs to READY.
Exceptions : $job must be defined
Caller : user process
=cut
sub reset_job_by_dbID {
my $self = shift;
my $analysis_job_id = shift;
throw("must define job") unless($analysis_job_id);
my ($sql, $sth);
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "UPDATE analysis_job SET hive_id=0, job_claim='', status='READY', retry_count=0 WHERE analysis_job_id=?";
$sth = $self->prepare($sql);
$sth->execute($analysis_job_id);
$sth->finish;
#print(" done update CLAIMED\n");
}
=head2 reset_all_jobs_for_analysis_id
Arg [1] : int $analysis_id
Example :
Description: Resets all not BLOCKED jobs back to READY so they can be rerun.
Needed if an analysis/process modifies the dataflow rules as the
system runs. The jobs that are flowed 'from' will need to be reset so
that the output data can be flowed through the new rule.
If one is designing a system based on a need to change rules mid-process
it is best to make sure such 'from' analyses that need to be reset are 'Dummy'
types so that they can 'hold' the output from the previous step and not require
the system to actually redo processing.
Exceptions : $analysis_id must be defined
Caller : user RunnableDB subclasses which build dataflow rules on the fly
=cut
sub reset_all_jobs_for_analysis_id {
my $self = shift;
my $analysis_id = shift;
throw("must define analysis_id") unless($analysis_id);
my ($sql, $sth);
$sql = "UPDATE analysis_job SET job_claim='', status='READY' WHERE status!='BLOCKED' and analysis_id=?";
$sth = $self->prepare($sql);
$sth->execute($analysis_id);
$sth->finish;
$self->db->get_AnalysisStatsAdaptor->update_status($analysis_id, 'LOADING');
}
=head2 remove_analysis_id
Arg [1] : int $analysis_id
Example :
Description: Remove the analysis from the database.
Jobs should have been killed before.
Exceptions : $analysis_id must be defined
Caller :
=cut
sub remove_analysis_id {
my $self = shift;
my $analysis_id = shift;
throw("must define analysis_id") unless($analysis_id);
my $sql;
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "DELETE FROM analysis_stats WHERE analysis_id=$analysis_id";
$self->dbc->do($sql);
$sql = "ANALYZE TABLE analysis_stats";
$self->dbc->do($sql);
$sql = "DELETE FROM analysis_job WHERE analysis_id=$analysis_id";
$self->dbc->do($sql);
$sql = "ANALYZE TABLE analysis_job";
$self->dbc->do($sql);
$sql = "DELETE FROM hive WHERE analysis_id=$analysis_id";
$self->dbc->do($sql);
$sql = "ANALYZE TABLE hive";
$self->dbc->do($sql);
}
1;