Raw content of Bio::EnsEMBL::Pipeline::DBSQL::JobAdaptor
# Perl module for Bio::EnsEMBL::Pipeline::DBSQL::JobAdaptor
#
# Creator: Arne Stabenau
# Based on Job from Michele Clamp
#
# Date of creation: 15.08.2000
# Last modified : 15.08.2000 by Arne Stabenau
#
# Copyright EMBL-EBI 2000
#
# You may distribute this module under the same terms as perl itself
# POD documentation - main docs before the code
=head1 NAME
Bio::EnsEMBL::Pipeline::DBSQL::JobAdaptor
=head1 SYNOPSIS
$jobAdaptor = $dbobj->get_JobAdaptor;
$jobAdaptor = $jobobj->adaptor;
=head1 DESCRIPTION
Module to encapsulate all db access for persistent class Job.
There should be just one per application and database connection.
=head1 CONTACT
Contact Arne Stabenau on implementation/design detail: stabenau@ebi.ac.uk
Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
=head1 APPENDIX
The rest of the documentation details each of the object methods. Internal methods are usually preceded with a _
=cut
# Let the code begin...
package Bio::EnsEMBL::Pipeline::DBSQL::JobAdaptor;
use Bio::EnsEMBL::Pipeline::Job;
use Bio::EnsEMBL::Pipeline::Status;
use Bio::EnsEMBL::Utils::Exception qw(stack_trace_dump
verbose throw warning);
use vars qw(@ISA);
use strict;
@ISA = qw();
sub new {
    my $class = shift;
    my $dbobj = shift;

    # Build an empty adaptor object and attach the database connection
    # it will use for every query.
    my $self = bless( {}, $class );
    $self->db($dbobj);

    return $self;
}
=head2 fetch_by_dbID
Title : fetch_by_dbID
Usage : my $job = $adaptor->fetch_by_dbID($id)
Function: Retrieves a job from database by internal id
Returns : a Bio::EnsEMBL::Pipeline::Job; throws exception when something goes wrong.
undef if the id is not in the db.
Args : internal job id (job_id)
=cut
sub fetch_by_dbID {
    my ( $self, $id ) = @_;

    # Look the job up by its primary key.
    my $sth = $self->prepare(q{
        SELECT job_id, input_id, analysis_id, submission_id,
               stdout_file, stderr_file, retry_count, temp_dir, exec_host
        FROM job
        WHERE job_id = ?
    });
    $sth->execute($id);
    my $rowHashRef = $sth->fetchrow_hashref;

    # Finish the handle before either return, so the not-found path does
    # not leave the statement open.
    $sth->finish;

    # Explicit undef keeps the documented "undef if not found" result.
    return undef unless defined $rowHashRef;
    return $self->_objFromHashref($rowHashRef);
}
sub fetch_by_submission_id {
    my ( $self, $submissionid ) = @_;

    # All jobs sharing one batch-system submission id.
    my $sth = $self->prepare(q{
        SELECT job_id, input_id, analysis_id, submission_id,
               stdout_file, stderr_file, retry_count, temp_dir, exec_host
        FROM job
        WHERE submission_id = ?
    });
    $sth->execute($submissionid);

    my @jobs;
    while ( defined( my $row = $sth->fetchrow_hashref ) ) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
=head2 fetch_by_dbID_list
Title : fetch_by_dbID_list
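Usage : my @jobs = $adaptor->fetch_by_dbID_list(@ids)
Function: Retrieves jobs from database by internal id
Returns : list of Bio::EnsEMBL::Pipeline::Job (empty if none of the ids are in the db);
undef if no ids were given. Throws exception when something goes wrong.
Args : list of internal job ids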
=cut
sub fetch_by_dbID_list {
    my ( $self, @id ) = @_;

    # Nothing asked for: keep the historical undef result.
    return undef unless @id;

    # One placeholder per id, so the ids are bound by the driver rather
    # than spliced into the SQL text.
    my $placeholders = join( ',', ('?') x @id );
    my $sth = $self->prepare(qq{
        SELECT job_id, input_id, analysis_id, submission_id,
               stdout_file, stderr_file, retry_count, temp_dir, exec_host
        FROM job
        WHERE job_id IN ($placeholders)
    });
    $sth->execute(@id);

    my @jobs;
    while ( my $row = $sth->fetchrow_hashref ) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    # The (possibly empty) list of jobs that were found.
    return @jobs;
}
=head2 fetch_by_Status_Analysis
Title : fetch_by_Status_Analysis
Usage : my @jobs = $adaptor->fetch_by_Status_Analysis($status, $analysis)
Function: Retrieves all jobs in the database whose current status matches
the given status and that belong to the given analysis
Returns : @Bio::EnsEMBL::Pipeline::Job
Args : string status, Analysis obj, and optional start (row offset) and end (row count) limits
=cut
sub fetch_by_Status_Analysis {
    my ( $self, $status, $analysis, $start, $end ) = @_;

    # Jobs of one analysis whose *current* status row matches $status,
    # most recent status change first.
    throw("Require status and analysis id for fetch_by_Status_Analysis")
        unless ( $analysis && $status );
    if ( !defined $analysis->dbID ) {
        throw("Analysis needs to be in database");
    }
    my $analysisId = $analysis->dbID;

    my $query = q{
        SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id,
               j.stdout_file, j.stderr_file, j.retry_count,
               j.temp_dir, j.exec_host
        FROM job j, job_status js
        WHERE j.job_id = js.job_id
          AND j.analysis_id = ?
          AND js.status = ?
          AND js.is_current = 'y'
        ORDER BY time desc
    };

    # MySQL "LIMIT offset, row_count": $start is the offset and may be 0
    # (a plain truthiness test would silently drop the limit for the first
    # page); $end is the number of rows. %d forces both to integers so
    # nothing but digits reaches the SQL.
    $query .= sprintf( ' LIMIT %d, %d', $start, $end )
        if ( defined $start && $end );

    my $sth = $self->prepare($query);
    $sth->execute( $analysisId, $status );

    my @jobs;
    while ( my $row = $sth->fetchrow_hashref ) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
sub fetch_by_Status {
    my ( $self, $status, $start, $end ) = @_;

    # Jobs (of any analysis) whose *current* status row matches $status,
    # most recent status change first.
    throw("Require status for fetch_by_Status")
        unless ($status);

    my $query = q{
        SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id,
               j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir,
               j.exec_host
        FROM job j, job_status js
        WHERE j.job_id = js.job_id
          AND js.status = ?
          AND js.is_current = 'y'
        ORDER BY time desc
    };

    # MySQL "LIMIT offset, row_count": $start is the offset and may be 0;
    # $end is the number of rows. %d keeps both strictly integral.
    $query .= sprintf( ' LIMIT %d, %d', $start, $end )
        if ( defined $start && $end );

    my $sth = $self->prepare($query);
    $sth->execute($status);

    my @jobs;
    while ( my $row = $sth->fetchrow_hashref ) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
sub list_dbIDs {
    my $self = shift;

    # Every job_id in the job table, returned as an array reference.
    my $sth = $self->prepare(q{
        SELECT j.job_id
        FROM job j
    });
    $sth->execute();

    my @ids;
    while ( my ($job_id) = $sth->fetchrow_array ) {
        push @ids, $job_id;
    }
    $sth->finish;

    return \@ids;
}
sub fetch_all {
    my $self = shift;

    # Every job in the table, in whatever order the database returns them.
    my $sth = $self->prepare(q{
        SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id,
               j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir,
               j.exec_host
        FROM job j
    });
    $sth->execute();

    my @all_jobs;
    while ( defined( my $job_row = $sth->fetchrow_hashref ) ) {
        my $job = $self->_objFromHashref($job_row);
        push @all_jobs, $job;
    }
    $sth->finish;

    return @all_jobs;
}
=head2 fetch_by_age
Title : fetch_by_age
Usage : my @jobs = $adaptor->fetch_by_age($duration)
Function: Retrieves all jobs in the database whose current status
was set longer ago than a certain duration given in minutes.
Returns : @Bio::EnsEMBL::Pipeline::Job
Args : int
=cut
sub fetch_by_age {
    my ( $self, $age ) = @_;

    # Jobs whose current status row was written more than $age minutes ago.
    throw("No age given for fetch_by_age")
        unless defined($age);

    # $age (minutes) is bound as a placeholder instead of being spliced
    # into the SQL text; MySQL accepts a bound value in INTERVAL ? MINUTE.
    my $sth = $self->prepare(q{
        SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id,
               j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir,
               j.exec_host
        FROM job j, job_status js
        WHERE j.job_id = js.job_id
          AND js.is_current = 'y'
          AND js.time < DATE_SUB(NOW(), INTERVAL ? MINUTE)
    });
    $sth->execute($age);

    my @jobs;
    while ( my $row = $sth->fetchrow_hashref ) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
=head2 fetch_by_input_id
Title : fetch_by_input_id
Usage : my @jobs = $adaptor->fetch_by_input_id($input_id)
Function: Retrieves all jobs from adaptor with certain input id
Returns : list of job objects
throws exception when something goes wrong.
Args : input id string
=cut
sub fetch_by_input_id {
    my ( $self, $inputid ) = @_;

    # Every job (across all analyses) created for this input id.
    my $sth = $self->prepare(q{
        SELECT job_id, input_id, analysis_id, submission_id,
               stdout_file, stderr_file, retry_count, temp_dir, exec_host
        FROM job
        WHERE input_id = ?
    });
    $sth->execute($inputid);

    my @matching;
    while ( my $hashref = $sth->fetchrow_hashref ) {
        push @matching, $self->_objFromHashref($hashref);
    }
    $sth->finish;

    return @matching;
}
sub fetch_hash_by_input_id {
    my ( $self, $inputid ) = @_;

    # Jobs for one input id joined with their current status row; each job
    # gets that status cached on it, and the result is keyed by the dbID of
    # the job's analysis (a later row for the same analysis replaces an
    # earlier one).
    my $sth = $self->prepare(q{
        SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id,
               j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir, j.exec_host,
               js.status, js.time, js.is_current
        FROM job j, job_status js
        WHERE j.input_id = ?
          AND j.job_id = js.job_id
          AND js.is_current = 'y'
    });
    $sth->execute($inputid);

    my @fetched;
    while ( my $row = $sth->fetchrow_hashref ) {
        my $job = $self->_objFromHashref($row);
        push @fetched, $job;

        my $current = Bio::EnsEMBL::Pipeline::Status->new(
            '-jobid'   => $row->{job_id},
            '-status'  => $row->{status},
            '-created' => $row->{time},
        );
        $self->current_status( $job, $current );
    }

    my %job_for_analysis = map { ( $_->analysis->dbID => $_ ) } @fetched;
    $sth->finish;

    return \%job_for_analysis;
}
=head2 store
Title : store
Usage : $job->store
Function: puts a job in the db and gives it an internal id
expects analysis to be already in db.
Returns : throws exception when something goes wrong.
Args :
=cut
sub store {
    my ( $self, $job ) = @_;

    # The analysis must already have a dbID; the job row references it.
    throw("Need to store analysis first")
        unless defined $job->analysis->dbID;

    my $sth = $self->prepare(q{
        INSERT into job (input_id, analysis_id,
                         submission_id, stdout_file, stderr_file,
                         retry_count, temp_dir, exec_host)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    });
    my @columns = (
        $job->input_id,
        $job->analysis->dbID,
        $job->submission_id,
        $job->stdout_file,
        $job->stderr_file,
        $job->retry_count,
        $job->temp_dir,
        $job->execution_host,
    );
    $sth->execute(@columns);

    # Record the auto-increment key MySQL assigned and bind the job to us.
    $job->dbID( $sth->{'mysql_insertid'} );
    $job->adaptor($self);
    $sth->finish;

    # Every newly stored job starts in the CREATED state.
    return $self->set_status( $job, "CREATED" );
}
=head2 remove
Title : remove
Usage : $jobadaptor->remove( $job )
Function: deletes entries for job from database tables.
deletes also history of status.
Returns : throws exception when something goes wrong.
Args :
=cut
sub remove {
    my ( $self, $job ) = @_;

    # Delete the job row and then its complete status history.
    if ( !$job->dbID ) {
        throw("Cant remove job without dbID");
    }
    my $dbID = $job->dbID;

    # The id is bound as a placeholder, and every handle is finished.
    for my $table (qw(job job_status)) {
        my $sth = $self->prepare("DELETE FROM $table WHERE job_id = ?");
        $sth->execute($dbID);
        $sth->finish;
    }
    return;
}
=head2 remove_by_dbID
Title : remove_by_dbID
Usage : $jobadaptor->remove_by_dbID( $dbID )
Function: deletes entries for job from database tables.
deletes also history of status. Can take a list of ids.
Returns : throws exception when something goes wrong.
Args :
=cut
sub remove_by_dbID {
    my ( $self, @dbIDs ) = @_;

    # Delete several jobs and their status history; no ids means no work.
    return unless @dbIDs;

    # One placeholder per id so the ids are bound, not spliced into SQL.
    my $inExpr = '(' . join( ',', ('?') x @dbIDs ) . ')';
    for my $table (qw(job job_status)) {
        my $sth = $self->prepare("DELETE FROM $table WHERE job_id IN $inExpr");
        $sth->execute(@dbIDs);
        $sth->finish;
    }
    return;
}
=head2 update
Title : update
Usage : $job->update;
$jobAdaptor->update($job);
$jobAdaptor->update(@jobs)
Function: a job which is already in db can update its contents
it only updates stdout_file, stderr_file, retry_count,
temp_dir, exec_host
and submission_id
Returns : throws exception when something goes wrong.
Args : an array of Pipeline::Job objects
=cut
sub update {
    my $self = shift;
    my @jobs = @_;

    # Only the mutable bookkeeping columns are rewritten; input_id and
    # analysis_id are left untouched. One prepared statement serves all jobs.
    my $sth = $self->prepare(q{
        UPDATE job
        SET stdout_file = ?,
            stderr_file = ?,
            retry_count = ?,
            submission_id = ?,
            exec_host = ?,
            temp_dir = ?
        WHERE job_id = ?
    });

    for my $each_job (@jobs) {
        my @bind = (
            $each_job->stdout_file,
            $each_job->stderr_file,
            $each_job->retry_count,
            $each_job->submission_id,
            $each_job->execution_host,
            $each_job->temp_dir,
            $each_job->dbID,
        );
        $sth->execute(@bind);
    }

    $sth->finish;
}
=head2 _objFromHashref
Title : _objFromHashref
Usage : my $job = $self->_objFromHashref( $queryResult )
Function: Creates a Job object from given hash reference.
The hash contains column names and content of the column.
Returns : the object or undef if that wasnt possible
Args : a hash reference
=cut
sub _objFromHashref {
    my ( $self, $row ) = @_;

    # Turn one job-table row (column name => value) into a Job object,
    # resolving analysis_id to its Analysis through the analysis adaptor.
    my $analysis_adaptor = $self->db->get_AnalysisAdaptor;
    my $analysis = $analysis_adaptor->fetch_by_dbID( $row->{analysis_id} );

    return Bio::EnsEMBL::Pipeline::Job->new(
        '-dbobj'         => $self->db,
        '-adaptor'       => $self,
        '-id'            => $row->{'job_id'},
        '-submission_id' => $row->{'submission_id'},
        '-input_id'      => $row->{'input_id'},
        '-stdout'        => $row->{'stdout_file'},
        '-stderr'        => $row->{'stderr_file'},
        '-analysis'      => $analysis,
        '-retry_count'   => $row->{'retry_count'},
        '-exec_host'     => $row->{'exec_host'},
        '-temp_dir'      => $row->{'temp_dir'},
    );
}
=head2 set_status
Title : set_status
Usage : my $status = $jobAdaptor->set_status( $job, $status_string )
Function: Sets the job status
Returns : the new Bio::EnsEMBL::Pipeline::Status object
Args : Pipeline::Job, status string
=cut
sub set_status {
    my ( $self, $job, $stat_str ) = @_;

    # Record $stat_str as the job's current status in job_status and cache
    # a matching Status object on the job. Returns that Status object.
    my $jobId;
    if ( !defined( $jobId = $job->dbID ) ) {
        throw("Job has to be in database");
    }

    my $status;
    my $ok = eval {
        # Retire whichever row is current, then insert the new current row.
        my $sth_upd = $self->prepare(q{
            UPDATE job_status
            SET is_current = 'n'
            WHERE job_id = ?
              AND is_current = 'y'
        });
        my $sth_ins = $self->prepare(q{
            INSERT into job_status
            (job_id, status, time, is_current)
            VALUES (?, ?, NOW(), 'y')
        });
        $sth_upd->execute($jobId);
        $sth_upd->finish;
        $sth_ins->execute( $jobId, $stat_str );
        $sth_ins->finish;

        # Stamp the in-memory status with the database's clock.
        my $sth_now = $self->prepare("SELECT NOW()");
        $sth_now->execute();
        my $time = $sth_now->fetchrow_arrayref()->[0];
        $sth_now->finish;

        $status = Bio::EnsEMBL::Pipeline::Status->new(
            '-jobid'   => $jobId,
            '-status'  => $stat_str,
            '-created' => $time,
        );
        $self->current_status( $job, $status );
        1;
    };

    # Carry the underlying error in the exception instead of printing it
    # to STDOUT and discarding it.
    if ( !$ok ) {
        my $err = $@ || 'unknown error';
        throw("Error setting status to $stat_str: $err");
    }
    return $status;
}
=head2 current_status
Title : current_status
Usage : my $status = $job->current_status
Function: Get/set method for the current status
Returns : Bio::EnsEMBL::Pipeline::Status
Args : Bio::EnsEMBL::Pipeline::Job, optional Bio::EnsEMBL::Pipeline::Status to set
=cut
sub current_status {
    my ( $self, $job, $arg ) = @_;

    # Setter form: validate and cache the supplied Status on the job.
    if ( defined($arg) ) {
        throw("[$arg] is not a Bio::EnsEMBL::Pipeline::Status object")
            unless $arg->isa("Bio::EnsEMBL::Pipeline::Status");
        $job->{'_status'} = $arg;
        return $job->{'_status'};
    }

    # Getter form: read the row flagged current, stamp it with the DB clock,
    # cache the resulting Status on the job and return it.
    throw("Can't get status if id not defined")
        unless defined( $job->dbID );
    my $id = $job->dbID;

    my $status_sth = $self->prepare(q{
        SELECT status
        FROM job_status
        WHERE job_id = ?
          AND is_current = 'y'
    });
    $status_sth->execute($id);
    my $status;
    while ( my $current_row = $status_sth->fetchrow_hashref() ) {
        $status = $current_row->{'status'};    # the last row read wins
    }
    $status_sth->finish;

    my $clock_sth = $self->prepare("SELECT NOW()");
    $clock_sth->execute();
    my $time;
    while ( my $clock_row = $clock_sth->fetchrow_arrayref() ) {
        $time = $clock_row->[0];
    }

    # No current row: warn (naming our caller), dump a trace, assume success.
    unless ($status) {
        my ( $pkg, $file, $line ) = caller;
        warning( "Have found no status for " . $job->dbID . " "
                . $job->input_id . " " . $job->analysis->dbID
                . " assuming is sucessful $file:$line\n" );
        my $trace = stack_trace_dump();
        print STDERR "$trace\n";
        $status = 'SUCCESSFUL';
    }

    $job->{'_status'} = Bio::EnsEMBL::Pipeline::Status->new(
        '-jobid'   => $id,
        '-status'  => $status,
        '-created' => $time,
    );
    $clock_sth->finish;

    return $job->{'_status'};
}
=head2 get_all_status
Title : get_all_status
Usage : my @status = $job->get_all_status
Function: Get all status objects associated with this job
Returns : @Bio::EnsEMBL::Pipeline::Status
Args : Bio::EnsEMBL::Pipeline::Job
=cut
sub get_all_status {
    my ( $self, $job ) = @_;

    # Full status history of one job, newest first, as Status objects whose
    # 'created' value is the UNIX timestamp of the status change.
    throw("Can't get status if id not defined")
        unless defined( $job->dbID );

    # job_status is keyed by job_id (as in every other query in this
    # adaptor); the column 'id' referenced here previously does not exist.
    my $sth = $self->prepare(q{
        SELECT job_id, status, UNIX_TIMESTAMP(time)
        FROM job_status
        WHERE job_id = ?
        ORDER BY time desc
    });
    $sth->execute( $job->dbID );

    my @status;
    while ( my $rowhash = $sth->fetchrow_hashref() ) {
        push @status, Bio::EnsEMBL::Pipeline::Status->new(
            '-jobid'   => $job->dbID,
            '-status'  => $rowhash->{'status'},
            '-created' => $rowhash->{'UNIX_TIMESTAMP(time)'},
        );
    }
    $sth->finish;

    return @status;
}
=head2 get_last_status
Title : get_last_status
Usage : my $status = $job->get_last_status
Function: Get most recent status object associated with this job
Returns : Bio::EnsEMBL::Pipeline::Status, or undef if the job has no current status
Args : Bio::EnsEMBL::Pipeline::Job
=cut
sub get_last_status {
    my ( $self, $job ) = @_;

    # The job's current status row as a Status object whose 'created' value
    # is a UNIX timestamp; undef if no row is flagged current.
    throw("Can't get status if id not defined")
        unless defined( $job->dbID );

    my $sth = $self->prepare(q{
        SELECT job_id, status, UNIX_TIMESTAMP(time)
        FROM job_status
        WHERE is_current = 'y'
          AND job_id = ?
    });
    $sth->execute( $job->dbID );
    my $rowHashRef = $sth->fetchrow_hashref();

    # Finish before either return so the not-found path releases the handle.
    $sth->finish;

    return undef unless defined $rowHashRef;

    return Bio::EnsEMBL::Pipeline::Status->new(
        '-jobid'   => $job->dbID,
        '-status'  => $rowHashRef->{'status'},
        '-created' => $rowHashRef->{'UNIX_TIMESTAMP(time)'},
    );
}
sub list_job_id_by_status {
    my ( $self, $status ) = @_;

    # Ids of jobs whose current status equals $status, ascending.
    # The status is bound as a placeholder instead of being quoted into SQL.
    my $sth = $self->prepare(q{
        SELECT j.job_id
        FROM job j, job_status js
        WHERE j.job_id = js.job_id
          AND js.status = ?
          AND js.is_current = 'y'
        ORDER BY job_id
    });
    $sth->execute($status);

    my @result;
    while ( my @row = $sth->fetchrow_array ) {
        push @result, $row[0];
    }
    $sth->finish;

    return \@result;
}
sub list_job_id_by_status_age {
    my ( $self, $status, $age ) = @_;

    # Ids of jobs that have a job_status row with $status recorded more than
    # $age minutes ago, ascending. The query previously had no WHERE keyword
    # and was invalid SQL; both values are now bound as placeholders.
    my $sth = $self->prepare(q{
        SELECT job_id
        FROM job_status
        WHERE status = ?
          AND time < DATE_SUB(NOW(), INTERVAL ? MINUTE)
        ORDER BY job_id
    });
    $sth->execute( $status, $age );

    my @result;
    while ( my @row = $sth->fetchrow_array ) {
        push @result, $row[0];
    }
    $sth->finish;

    return \@result;
}
sub db {
    my $self = shift;
    my $arg  = shift;

    # Get/set the database connection; undef leaves the current one alone.
    $self->{'_db'} = $arg if defined $arg;
    return $self->{'_db'};
}
sub prepare {
    my $self  = shift;
    my $query = shift;

    # Delegate statement preparation to the attached database connection.
    my $connection = $self->db;
    return $connection->prepare($query);
}
sub deleteObj {
    my $self = shift;

    # Snapshot the referenced values, empty this object, then ask each value
    # to tear itself down too; errors from values that cannot do so
    # (plain scalars, objects without deleteObj) are ignored.
    my @children = values %$self;
    delete @{$self}{ keys %$self };

    for my $child (@children) {
        eval { $child->deleteObj };
    }
}
sub lock_tables {
    my $self = shift;

    # Take READ locks on job and job_status via the connection directly.
    my $lock_sth = $self->db->prepare("LOCK TABLES job READ, job_status READ");
    $lock_sth->execute;
    $lock_sth->finish;
}
sub unlock_tables {
    my $self = shift;

    # Release every table lock held by this connection.
    my $unlock_sth = $self->db->prepare("UNLOCK TABLES");
    $unlock_sth->execute;
    $unlock_sth->finish;
}
1;