Bio::EnsEMBL::Pipeline::DBSQL
JobAdaptor
Toolbar
Summary
Bio::EnsEMBL::Pipeline::DBSQL::JobAdaptor
Package variables
No package variables defined.
Included modules
Inherit
Synopsis
$jobAdaptor = $dbobj->get_JobAdaptor;
$jobAdaptor = $jobobj->adaptor;
Description
Module to encapsulate all db access for persistent class Job.
There should be just one per application and database connection.
Methods
Methods description
Title : _objFromHashref Usage : my $job = $self->_objFromHashref( $queryResult ) Function: Creates a Job object from given hash reference. The hash contains column names and content of the column. Returns : the object or undef if that wasn't possible Args : a hash reference |
Title : current_status Usage : my $status = $job->current_status Function: Get/set method for the current status Returns : Bio::EnsEMBL::Pipeline::Status Args : Bio::EnsEMBL::Pipeline::Status |
Title : fetch_by_Status_Analysis Usage : my @jobs = $adaptor->fetch_by_Status_Analysis($status, $analysis) Function: Retrieves all jobs in the database matching a status and an analysis Returns : @Bio::EnsEMBL::Pipeline::Job Args : string status, Analysis obj, and optional start and end limits |
Title : fetch_by_age Usage : my @jobs = $db->fetch_by_age($duration) Function: Retrieves all jobs in the database that are older than a certain duration given in minutes. Returns : @Bio::EnsEMBL::Pipeline::Job Args : int |
Title : fetch_by_dbID Usage : my $job = $adaptor->fetch_by_dbID Function: Retrieves a job from database by internal id Returns : throws exception when something goes wrong. undef if the id is not in the db. Args : |
Title : fetch_by_dbID_list Usage : my $job = $adaptor->fetch_by_dbID_list Function: Retrieves jobs from database by internal id Returns : throws exception when something goes wrong. undef if the id is not in the db. Args : |
Title : fetch_by_input_id Usage : my @job = $adaptor->fetch_by_input_id Function: Retrieves all jobs from adaptor with certain input id Returns : list of job objects throws exception when something goes wrong. Args : |
Title : get_all_status Usage : my @status = $job->get_all_status Function: Get all status objects associated with this job Returns : @Bio::EnsEMBL::Pipeline::Status Args : Bio::EnsEMBL::Pipeline::Job |
Title : get_last_status Usage : my $status = $adaptor->get_last_status($job) Function: Get most recent status object associated with this job Returns : Bio::EnsEMBL::Pipeline::Status Args : Bio::EnsEMBL::Pipeline::Job |
Title : remove Usage : $jobadaptor->remove( $job ) Function: deletes entries for job from database tables. deletes also history of status. Returns : throws exception when something goes wrong. Args : |
Title : remove_by_dbID Usage : $jobadaptor->remove_by_dbID( $dbID ) Function: deletes entries for job from database tables. deletes also history of status. Can take a list of ids. Returns : throws exception when something goes wrong. Args : |
Title : set_status Usage : my $status = $job->set_status Function: Sets the job status Returns : nothing Args : Pipeline::Job Bio::EnsEMBL::Pipeline::Status |
Title : store Usage : $job->store Function: puts a job in the db and gives it an internal id expects analysis to be already in db. Returns : throws exception when something goes wrong. Args : |
Title : update Usage : $job->update; $jobAdaptor->update($job); $jobAdaptor->update(@jobs) Function: a job which is already in db can update its contents; it only updates stdout_file, stderr_file, retry_count, temp_dir, exec_host and submission_id. Returns : throws exception when something goes wrong. Args : an array of Pipeline::Job objects |
Methods code
# Title   : _objFromHashref
# Function: Turns one row of the job table into a
#           Bio::EnsEMBL::Pipeline::Job. The row's analysis_id is resolved
#           to an analysis object through the database's analysis adaptor.
# Args    : hash reference keyed by column name (job_id, input_id,
#           analysis_id, submission_id, stdout_file, stderr_file,
#           retry_count, exec_host, temp_dir)
# Returns : the newly constructed Job object
sub _objFromHashref {
    my ($self, $row) = @_;

    # Look the analysis up first so the Job gets an object, not a bare id.
    my $analysis_adaptor = $self->db->get_AnalysisAdaptor;
    my $analysis = $analysis_adaptor->fetch_by_dbID( $row->{analysis_id} );

    my $job = Bio::EnsEMBL::Pipeline::Job->new(
        '-dbobj'         => $self->db,
        '-adaptor'       => $self,
        '-id'            => $row->{'job_id'},
        '-submission_id' => $row->{'submission_id'},
        '-input_id'      => $row->{'input_id'},
        '-stdout'        => $row->{'stdout_file'},
        '-stderr'        => $row->{'stderr_file'},
        '-analysis'      => $analysis,
        '-retry_count'   => $row->{'retry_count'},
        '-exec_host'     => $row->{'exec_host'},
        '-temp_dir'      => $row->{'temp_dir'},
    );

    return $job;
}
# Title   : current_status
# Usage   : my $status = $adaptor->current_status($job);
#           $adaptor->current_status($job, $status_obj);
# Function: Get/set the current status cached on a job.
#           Setter: stores the given Status object on the job.
#           Getter: reads the status string flagged is_current = 'y' for
#           the job, wraps it in a new Status object stamped with the
#           database's NOW(), and caches it on the job. When no current
#           status row exists a warning and stack trace are emitted and
#           the job is assumed to be 'SUCCESSFUL'.
# Returns : Bio::EnsEMBL::Pipeline::Status
# Throws  : if $arg is not a Status object, or the job has no dbID
sub current_status {
    my ($self, $job, $arg) = @_;

    if (defined $arg) {
        # Setter path: only Status objects may be cached on the job.
        if (!$arg->isa("Bio::EnsEMBL::Pipeline::Status")) {
            throw("[$arg] is not a Bio::EnsEMBL::Pipeline::Status object");
        }
        $job->{'_status'} = $arg;
        return $job->{'_status'};
    }

    # Getter path: the job must already be stored.
    if (!defined($job->dbID)) {
        throw("Can't get status if id not defined");
    }
    my $id = $job->dbID;

    my $sth = $self->prepare(q{ SELECT status FROM job_status WHERE job_id = ? AND is_current = 'y' });
    $sth->execute($id);

    # If several rows are flagged current, the last one fetched wins.
    my $status;
    while (my $row = $sth->fetchrow_hashref()) {
        $status = $row->{'status'};
    }
    $sth->finish;

    # The creation time of the Status object is the database's clock.
    $sth = $self->prepare("SELECT NOW()");
    $sth->execute();
    my $time;
    while (my $row = $sth->fetchrow_arrayref()) {
        $time = $row->[0];
    }

    unless ($status) {
        my (undef, $f, $l) = caller;
        warning("Have found no status for ".$job->dbID." ".
                $job->input_id." ".$job->analysis->dbID.
                " assuming is sucessful $f:$l\n");
        my $std = stack_trace_dump();
        print STDERR "$std\n";
        $status = 'SUCCESSFUL';
    }

    $job->{'_status'} = Bio::EnsEMBL::Pipeline::Status->new(
        '-jobid'   => $id,
        '-status'  => $status,
        '-created' => $time,
    );
    $sth->finish;

    return $job->{'_status'};
}
# Title   : db
# Function: Get/set the database object this adaptor works through.
# Args    : optional database object; ignored when undef
# Returns : the stored database object
sub db {
    my ($self, $arg) = @_;

    $self->{'_db'} = $arg if defined $arg;

    return $self->{'_db'};
}
# Title   : deleteObj
# Function: Empties this object's hash and then asks every value it held
#           to do the same via its own deleteObj method.
# Args    : none
sub deleteObj {
    my ($self) = @_;

    # Snapshot the members before the hash is emptied.
    my @members = values %$self;

    delete @{$self}{ keys %$self };

    for my $member (@members) {
        # Best effort: values that are not objects, or that have no
        # deleteObj method, die inside the eval and are skipped.
        eval { $member->deleteObj; };
    }
}
# Title   : fetch_all
# Function: Retrieves every row of the job table as a Job object.
# Args    : none
# Returns : list of Bio::EnsEMBL::Pipeline::Job
sub fetch_all {
    my ($self) = @_;

    my $sth = $self->prepare(q{ SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id, j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir, j.exec_host FROM job j });
    $sth->execute();

    my @jobs;
    while (my $row = $sth->fetchrow_hashref) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
# Title   : fetch_by_Status
# Usage   : my @jobs = $adaptor->fetch_by_Status($status, $start, $end)
# Function: Retrieves all jobs whose current status (is_current = 'y')
#           equals $status, newest status first.
# Args    : status string (required); optional $start and $end, which are
#           appended to the query as "LIMIT $start, $end"
# Returns : list of Bio::EnsEMBL::Pipeline::Job
# Throws  : if no status is given, or if the limit values are not
#           non-negative integers
sub fetch_by_Status {
    my ($self, $status, $start, $end) = @_;

    throw("Require status for fetch_by_Status")
        unless ($status);

    my $query = q{ SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id, j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir, j.exec_host FROM job j, job_status js WHERE j.job_id = js.job_id AND js.status = ? AND js.is_current = 'y' ORDER BY time desc };

    # A start of 0 is a valid first offset, so test it for definedness
    # rather than truth; a false $end still means "no limit".
    if (defined $start && $start ne '' && $end) {
        # The values are spliced into the SQL text, so accept digits only.
        foreach my $bound ($start, $end) {
            throw("fetch_by_Status limits must be non-negative integers, got [$bound]")
                unless $bound =~ /\A\d+\z/;
        }
        $query .= " LIMIT $start, $end";
    }

    my $sth = $self->prepare($query);
    $sth->execute($status);

    my @jobs;
    while (my $row = $sth->fetchrow_hashref) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
# Title   : fetch_by_Status_Analysis
# Usage   : my @jobs = $adaptor->fetch_by_Status_Analysis($status, $analysis)
# Function: Retrieves all jobs of the given analysis whose current status
#           (is_current = 'y') equals $status, newest status first.
# Args    : status string, analysis object with a dbID; optional $start
#           and $end, which are appended as "LIMIT $start, $end"
# Returns : list of Bio::EnsEMBL::Pipeline::Job
# Throws  : if status or analysis is missing, if the analysis has no dbID,
#           or if the limit values are not non-negative integers
sub fetch_by_Status_Analysis {
    my ($self, $status, $analysis, $start, $end) = @_;

    throw("Require status and analysis id for fetch_by_Status_Analysis")
        unless ($analysis && $status);

    if ( !defined $analysis->dbID ) {
        throw( "Analysis needs to be in database" );
    }
    my $analysisId = $analysis->dbID;

    my $query = q{ SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id, j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir, j.exec_host FROM job j, job_status js WHERE j.job_id = js.job_id AND j.analysis_id = ? AND js.status = ? AND js.is_current = 'y' ORDER BY time desc };

    # A start of 0 is a valid first offset, so test it for definedness
    # rather than truth; a false $end still means "no limit".
    if (defined $start && $start ne '' && $end) {
        # The values are spliced into the SQL text, so accept digits only.
        foreach my $bound ($start, $end) {
            throw("fetch_by_Status_Analysis limits must be non-negative integers, got [$bound]")
                unless $bound =~ /\A\d+\z/;
        }
        $query .= " LIMIT $start, $end";
    }

    my $sth = $self->prepare($query);
    $sth->execute($analysisId, $status);

    my @jobs;
    while (my $row = $sth->fetchrow_hashref) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
# Title   : fetch_by_age
# Usage   : my @jobs = $adaptor->fetch_by_age($duration)
# Function: Retrieves all jobs whose current status row (is_current = 'y')
#           is older than $age minutes, measured against the database's
#           NOW().
# Args    : age in minutes
# Returns : list of Bio::EnsEMBL::Pipeline::Job
# Throws  : if no age is given
sub fetch_by_age {
    my ($self, $age) = @_;

    throw("No input age for fetch_by_age")
        unless defined($age);

    # The age is bound as a placeholder rather than interpolated, so a
    # malformed value cannot alter the statement.
    my $sth = $self->prepare(q{
        SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id,
               j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir,
               j.exec_host
          FROM job j, job_status js
         WHERE j.job_id = js.job_id
           AND is_current = 'y'
           AND js.time < DATE_SUB(NOW(), INTERVAL ? MINUTE)
    });
    $sth->execute($age);

    my @jobs;
    while (my $row = $sth->fetchrow_hashref) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    return @jobs;
}
# Title   : fetch_by_dbID
# Usage   : my $job = $adaptor->fetch_by_dbID($id)
# Function: Retrieves a single job by its job_id.
# Args    : job_id
# Returns : Bio::EnsEMBL::Pipeline::Job, or undef when no row matches
sub fetch_by_dbID {
    my ($self, $id) = @_;

    my $sth = $self->prepare(q{ SELECT job_id, input_id, analysis_id, submission_id, stdout_file, stderr_file, retry_count, temp_dir, exec_host FROM job WHERE job_id = ? });
    $sth->execute($id);

    my $row = $sth->fetchrow_hashref;
    return undef unless defined $row;

    my $job = $self->_objFromHashref($row);
    $sth->finish;

    return $job;
}
# Title   : fetch_by_dbID_list
# Usage   : my @jobs = $adaptor->fetch_by_dbID_list(@ids)
# Function: Retrieves the jobs whose job_id is in the given list.
# Args    : list of job_ids
# Returns : undef when called with no ids; otherwise the list of matching
#           Bio::EnsEMBL::Pipeline::Job objects (empty when none match)
sub fetch_by_dbID_list {
    my ($self, @id) = @_;

    return undef unless @id;

    # One placeholder per id, so the ids are bound as values instead of
    # being pasted into the SQL text.
    my $placeholders = join(',', ('?') x @id);

    my $sth = $self->prepare(qq{
        SELECT job_id, input_id, analysis_id, submission_id,
               stdout_file, stderr_file, retry_count, temp_dir, exec_host
          FROM job
         WHERE job_id IN ($placeholders)
    });
    $sth->execute(@id);

    my @jobs;
    while (my $row = $sth->fetchrow_hashref) {
        push @jobs, $self->_objFromHashref($row);
    }
    $sth->finish;

    # The original "return @jobs or undef" never reached the "or undef":
    # "or" binds looser than "return". The effective behaviour, returning
    # the list as-is, is kept.
    return @jobs;
}
# Title   : fetch_by_input_id
# Usage   : my @jobs = $adaptor->fetch_by_input_id($input_id)
# Function: Retrieves all jobs with the given input_id.
# Args    : input_id
# Returns : list of Bio::EnsEMBL::Pipeline::Job
sub fetch_by_input_id {
    my ($self, $inputid) = @_;

    my $sth = $self->prepare(q{ SELECT job_id, input_id, analysis_id, submission_id, stdout_file, stderr_file, retry_count, temp_dir, exec_host FROM job WHERE input_id = ? });
    $sth->execute($inputid);

    my @found;
    while (my $row = $sth->fetchrow_hashref) {
        my $job = $self->_objFromHashref($row);
        push @found, $job;
    }
    $sth->finish;

    return @found;
}
# Title   : fetch_by_submission_id
# Usage   : my @jobs = $adaptor->fetch_by_submission_id($submission_id)
# Function: Retrieves all jobs with the given submission_id.
# Args    : submission_id
# Returns : list of Bio::EnsEMBL::Pipeline::Job
sub fetch_by_submission_id {
    my ($self, $submissionid) = @_;

    my $sth = $self->prepare(q{ SELECT job_id, input_id, analysis_id, submission_id, stdout_file, stderr_file, retry_count, temp_dir, exec_host FROM job WHERE submission_id = ? });
    $sth->execute($submissionid);

    my @found;
    while (my $row = $sth->fetchrow_hashref) {
        my $job = $self->_objFromHashref($row);
        push @found, $job;
    }
    $sth->finish;

    return @found;
}
# Title   : fetch_hash_by_input_id
# Usage   : my $jobs_by_analysis = $adaptor->fetch_hash_by_input_id($input_id)
# Function: Retrieves all jobs with the given input_id together with their
#           current status row, caches that status on each job, and
#           indexes the jobs by the dbID of their analysis. When several
#           jobs share an analysis, the one fetched last is kept.
# Args    : input_id
# Returns : hash reference, analysis dbID => Bio::EnsEMBL::Pipeline::Job
sub fetch_hash_by_input_id {
    my ($self, $inputid) = @_;

    my $sth = $self->prepare(q{ SELECT j.job_id, j.input_id, j.analysis_id, j.submission_id, j.stdout_file, j.stderr_file, j.retry_count, j.temp_dir, j.exec_host, js.status, js.time, js.is_current FROM job j, job_status js WHERE j.input_id = ? AND j.job_id = js.job_id AND js.is_current = 'y' });
    $sth->execute($inputid);

    my %job_for_analysis;
    while (my $row = $sth->fetchrow_hashref) {
        my $job = $self->_objFromHashref($row);

        # The status columns came back with the job, so the status can be
        # cached on the job without a second query.
        my $status = Bio::EnsEMBL::Pipeline::Status->new(
            '-jobid'   => $row->{job_id},
            '-status'  => $row->{status},
            '-created' => $row->{time},
        );
        $self->current_status($job, $status);

        $job_for_analysis{ $job->analysis->dbID } = $job;
    }
    $sth->finish;

    return \%job_for_analysis;
}
# Title   : get_all_status
# Usage   : my @status = $adaptor->get_all_status($job)
# Function: Retrieves every status row recorded for the job, newest first,
#           as Status objects whose creation time is the row's
#           UNIX_TIMESTAMP(time).
# Args    : Bio::EnsEMBL::Pipeline::Job with a dbID
# Returns : list of Bio::EnsEMBL::Pipeline::Status
# Throws  : if the job has no dbID
sub get_all_status {
    my ($self, $job) = @_;

    throw("Can't get status if id not defined")
        unless defined($job->dbID);

    # The filter column is job_id, matching the select list and every
    # other job_status query in this module; the original filtered on
    # "id".
    my $sth = $self->prepare(q{ SELECT job_id, status, UNIX_TIMESTAMP(time) FROM job_status WHERE job_id = ? ORDER BY time desc });
    $sth->execute($job->dbID);

    my @status;
    while (my $row = $sth->fetchrow_hashref()) {
        my $statusobj = Bio::EnsEMBL::Pipeline::Status->new(
            '-jobid'   => $job->dbID,
            '-status'  => $row->{'status'},
            '-created' => $row->{'UNIX_TIMESTAMP(time)'},
        );
        push @status, $statusobj;
    }
    $sth->finish;

    return @status;
}
# Title   : get_last_status
# Usage   : my $status = $adaptor->get_last_status($job)
# Function: Retrieves the status row flagged is_current = 'y' for the job
#           as a Status object whose creation time is the row's
#           UNIX_TIMESTAMP(time).
# Args    : Bio::EnsEMBL::Pipeline::Job with a dbID
# Returns : Bio::EnsEMBL::Pipeline::Status, or undef when there is no
#           current status row
# Throws  : if the job has no dbID
sub get_last_status {
    my ($self, $job) = @_;

    if (!defined($job->dbID)) {
        throw("Can't get status if id not defined");
    }

    my $sth = $self->prepare (qq{
SELECT job_id, status, UNIX_TIMESTAMP(time)
FROM job_status
WHERE is_current = 'y'
AND job_id = ?
});
    $sth->execute($job->dbID);

    my $row = $sth->fetchrow_hashref();
    return undef unless defined $row;

    my $created = $row->{'UNIX_TIMESTAMP(time)'};
    my $state   = $row->{'status'};
    my $statusobj = Bio::EnsEMBL::Pipeline::Status->new(
        '-jobid'   => $job->dbID,
        '-status'  => $state,
        '-created' => $created,
    );
    $sth->finish;

    return $statusobj;
}
# Title   : list_dbIDs
# Function: Lists the job_id of every row in the job table.
# Args    : none
# Returns : array reference of job_ids
sub list_dbIDs {
    my ($self) = @_;

    my $sth = $self->prepare(q{ SELECT j.job_id FROM job j });
    $sth->execute();

    my @ids;
    # List assignment in the condition: the loop stops when a fetch
    # returns an empty list.
    while (my ($job_id) = $sth->fetchrow) {
        push @ids, $job_id;
    }
    $sth->finish;

    return \@ids;
}
# Title   : list_job_id_by_status
# Usage   : my $ids = $adaptor->list_job_id_by_status($status)
# Function: Lists the ids of all jobs whose current status
#           (is_current = 'y') equals $status, in ascending job_id order.
# Args    : status string
# Returns : array reference of job_ids
sub list_job_id_by_status {
    my ($self, $status) = @_;

    # The status is bound as a placeholder rather than quoted into the
    # SQL by hand; ORDER BY is qualified because both joined tables have
    # a job_id column.
    my $sth = $self->prepare(q{
        SELECT j.job_id
          FROM job j, job_status js
         WHERE j.job_id = js.job_id
           AND js.status = ?
           AND is_current = 'y'
         ORDER BY j.job_id
    });
    $sth->execute($status);

    my @result;
    while (my @row = $sth->fetchrow_array) {
        push @result, $row[0];
    }
    $sth->finish;

    return \@result;
}
# Title   : list_job_id_by_status_age
# Usage   : my $ids = $adaptor->list_job_id_by_status_age($status, $age)
# Function: Lists the job_ids of job_status rows that have the given
#           status and are older than $age minutes, in ascending job_id
#           order.
# Args    : status string, age in minutes
# Returns : array reference of job_ids
sub list_job_id_by_status_age {
    my ($self, $status, $age) = @_;

    # The original statement read "FROM job_status AND status = ..." with
    # no WHERE keyword, which is not valid SQL; the first condition now
    # opens the WHERE clause. Both values are bound as placeholders.
    # NOTE(review): unlike list_job_id_by_status this does not filter on
    # is_current, matching the original conditions - confirm that
    # historic status rows are meant to be included.
    my $sth = $self->prepare(q{
        SELECT job_id
          FROM job_status
         WHERE status = ?
           AND time < DATE_SUB(NOW(), INTERVAL ? MINUTE)
         ORDER BY job_id
    });
    $sth->execute($status, $age);

    my @result;
    while (my @row = $sth->fetchrow_array) {
        push @result, $row[0];
    }
    $sth->finish;

    return \@result;
}
# Title   : lock_tables
# Function: Issues "LOCK TABLES job READ, job_status READ" on the
#           adaptor's database object.
# Args    : none
sub lock_tables {
    my ($self) = @_;

    my $sth = $self->db->prepare("LOCK TABLES job READ, job_status READ");
    $sth->execute;
    $sth->finish;
}
# Title   : new
# Usage   : my $adaptor = Bio::EnsEMBL::Pipeline::DBSQL::JobAdaptor->new($dbobj)
# Function: Creates an adaptor and stores the database object on it via
#           the db accessor.
# Args    : database object
# Returns : the new adaptor
sub new {
    my ($class, $dbobj) = @_;

    my $self = {};
    bless $self, $class;
    $self->db($dbobj);

    return $self;
}
# Title   : prepare
# Function: Convenience wrapper that hands the SQL text to the prepare
#           method of the adaptor's database object.
# Args    : SQL string
# Returns : whatever the database object's prepare returns
sub prepare {
    my ($self, $query) = @_;

    return $self->db->prepare($query);
}
# Title   : remove
# Usage   : $jobadaptor->remove( $job )
# Function: Deletes the job's row from the job table and all of its rows
#           from job_status.
# Args    : Bio::EnsEMBL::Pipeline::Job with a dbID
# Throws  : if the job has no dbID
sub remove {
    my ($self, $job) = @_;

    if ( !$job->dbID ) {
        throw( "Cant remove job without dbID" );
    }
    my $dbID = $job->dbID;

    # The id is bound as a placeholder in both statements, and each
    # handle is finished (the original left the first one open).
    my $sth = $self->prepare(q{
        DELETE FROM job
         WHERE job_id = ?
    });
    $sth->execute($dbID);
    $sth->finish;

    $sth = $self->prepare(q{
        DELETE FROM job_status
         WHERE job_id = ?
    });
    $sth->execute($dbID);
    $sth->finish;
}
# Title   : remove_by_dbID
# Usage   : $jobadaptor->remove_by_dbID( @dbIDs )
# Function: Deletes the rows for the given job ids from the job table and
#           from job_status. Does nothing when called with no ids.
# Args    : list of job_ids
sub remove_by_dbID {
    my ($self, @dbIDs) = @_;

    if ( $#dbIDs == -1 ) { return }

    # One placeholder per id, so the ids are bound as values instead of
    # being joined into the SQL text.
    my $inExpr = "(" . join(",", ('?') x @dbIDs) . ")";

    my $sth = $self->prepare(qq{
        DELETE FROM job
         WHERE job_id IN $inExpr
    });
    $sth->execute(@dbIDs);
    $sth->finish;

    $sth = $self->prepare(qq{
        DELETE FROM job_status
         WHERE job_id IN $inExpr
    });
    $sth->execute(@dbIDs);
    $sth->finish;
}
# Title   : set_status
# Usage   : my $status = $adaptor->set_status($job, $stat_str)
# Function: Records a new current status for the job: existing rows
#           flagged is_current = 'y' are flipped to 'n', a new row with
#           the given status and time NOW() is inserted as current, and a
#           matching Status object is cached on the job via
#           current_status.
# Args    : Bio::EnsEMBL::Pipeline::Job with a dbID, status string
# Returns : the new Bio::EnsEMBL::Pipeline::Status
# Throws  : if the job has no dbID, or if any database step fails
sub set_status {
    my ($self, $job, $stat_str) = @_;

    my $jobId = $job->dbID;
    if (!defined $jobId) {
        throw( "Job has to be in database" );
    }

    my $status;
    eval {
        # Prepare both statements before executing either.
        my $demote = $self->prepare(q{ UPDATE job_status SET is_current = 'n' WHERE job_id = ? AND is_current = 'y' });
        my $insert = $self->prepare(q{ INSERT into job_status (job_id, status, time, is_current) VALUES (?, ?, NOW(), 'y') });

        $demote->execute($jobId);
        $demote->finish;
        $insert->execute($jobId, $stat_str);
        $insert->finish;

        # The Status object is stamped with the database's clock.
        my $clock = $self->prepare("SELECT NOW()");
        $clock->execute();
        my $time = ($clock->fetchrow_arrayref())->[0];

        $status = Bio::EnsEMBL::Pipeline::Status->new(
            '-jobid'   => $jobId,
            '-status'  => $stat_str,
            '-created' => $time,
        );
        $self->current_status($job, $status);
        $clock->finish;
    };

    unless ($@) {
        return $status;
    }

    # Something inside the eval died: show the underlying error, then
    # raise the generic failure.
    print( " $@ " );
    throw("Error setting status to $stat_str");
}
# Title   : store
# Usage   : $adaptor->store($job)
# Function: Inserts the job into the job table, records the generated id
#           (read from the statement handle's mysql_insertid) and this
#           adaptor on the job, then sets its status to "CREATED".
# Args    : Bio::EnsEMBL::Pipeline::Job whose analysis already has a dbID
# Returns : the result of set_status
# Throws  : if the job's analysis has no dbID
sub store {
    my ($self, $job) = @_;

    throw( "Need to store analysis first" )
        unless defined( $job->analysis->dbID );

    my $sth = $self->prepare(q{ INSERT into job (input_id, analysis_id, submission_id, stdout_file, stderr_file, retry_count, temp_dir, exec_host) VALUES (?, ?, ?, ?, ?, ?, ?, ?) });

    # Bind values in the same order as the column list above.
    my @values = (
        $job->input_id,
        $job->analysis->dbID,
        $job->submission_id,
        $job->stdout_file,
        $job->stderr_file,
        $job->retry_count,
        $job->temp_dir,
        $job->execution_host,
    );
    $sth->execute(@values);

    my $new_id = $sth->{'mysql_insertid'};
    $job->dbID( $new_id );
    $job->adaptor( $self );
    $sth->finish;

    return $self->set_status( $job, "CREATED" );
}
# Title   : unlock_tables
# Function: Issues "UNLOCK TABLES" on the adaptor's database object.
# Args    : none
sub unlock_tables {
    my ($self) = @_;

    my $sth = $self->db->prepare("UNLOCK TABLES");
    $sth->execute;
    $sth->finish;
}

# True value that terminates the module. The original listing had an
# extra, unbalanced closing brace after it.
1;
# Title   : update
# Usage   : $jobAdaptor->update($job); $jobAdaptor->update(@jobs)
# Function: Writes the stdout_file, stderr_file, retry_count,
#           submission_id, exec_host and temp_dir of each given job back
#           to its row in the job table, matched on job_id.
# Args    : list of Bio::EnsEMBL::Pipeline::Job objects
sub update {
    my ($self, @jobs) = @_;

    # One prepared statement is reused for every job.
    my $sth = $self->prepare(q{ UPDATE job SET stdout_file = ?, stderr_file = ?, retry_count = ?, submission_id = ?, exec_host = ?, temp_dir = ? WHERE job_id = ? });

    for my $job (@jobs) {
        my @values = (
            $job->stdout_file,
            $job->stderr_file,
            $job->retry_count,
            $job->submission_id,
            $job->execution_host,
            $job->temp_dir,
            $job->dbID,
        );
        $sth->execute(@values);
    }

    $sth->finish;
}
General documentation
The rest of the documentation details each of the object methods. Internal methods are usually preceded with a _