Raw content of Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself
# POD documentation - main docs before the code
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
=head1 SYNOPSIS
$analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
$analysisStatsAdaptor = $analysisStats->adaptor;
=head1 DESCRIPTION
Module to encapsulate all db access for persistent class AnalysisStats.
There should be just one per application and database connection.
=head1 CONTACT
Contact Jessica Severin on implementation/design detail: jessica@ebi.ac.uk
Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
=head1 APPENDIX
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _
=cut
# Let the code begin...
package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use strict;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
=head2 fetch_by_analysis_id
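Arg [1] : int $id
the analysis_id of the analysis whose statistics are to be obtained
Example : $stats = $adaptor->fetch_by_analysis_id(1234);
Description: Returns the AnalysisStats object for the analysis with the id $id.
If no analysis_stats row exists for that id yet, one is inserted first
and then fetched.
Returntype : Bio::EnsEMBL::Hive::AnalysisStats
Exceptions : thrown if $id is not defined, or if no analysis_stats row can be
fetched for $id even after the insert attempt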
Caller : general
=cut
sub fetch_by_analysis_id {
    my ($self, $id) = @_;

    throw("fetch_by_analysis_id must have an id") if !defined $id;

    my $constraint = "ast.analysis_id = $id";

    # _generic_fetch hands back a listref; only its first element matters here.
    my ($stats) = @{ $self->_generic_fetch($constraint) };

    if (!defined $stats) {
        # Nothing stored for this analysis yet: insert a row, then read it back.
        $self->_create_new_for_analysis_id($id);
        ($stats) = @{ $self->_generic_fetch($constraint) };
    }

    throw("unable to fetch analysis_stats for analysis_id = $id\n") if !defined $stats;

    return $stats;
}
=head2 fetch_all

  Example    : $all_stats = $adaptor->fetch_all;
  Description: Fetches every row of analysis_stats by calling _generic_fetch
               without a constraint.
  Returntype : whatever _generic_fetch returns (a listref of AnalysisStats)

=cut

sub fetch_all {
    my ($self) = @_;

    return $self->_generic_fetch;
}
=head2 fetch_by_needed_workers

  Arg [1]    : (optional) int $limit
               when true, appended to the query as "LIMIT $limit"
  Arg [2]    : (optional) boolean $maximise_concurrency
               when true, rows are ordered by num_running_workers first;
               otherwise by num_required_workers DESC first
  Description: Fetches the stats that have num_required_workers > 0 and a
               status of READY or WORKING. Secondary ordering is always
               hive_capacity DESC, analysis_id. The final clause is reset to
               the empty string once the fetch has returned.
  Returntype : whatever _generic_fetch returns (a listref of AnalysisStats)

=cut

sub fetch_by_needed_workers {
    my ($self, $limit, $maximise_concurrency) = @_;

    my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')";

    my $primary_order = $maximise_concurrency
        ? 'ORDER BY num_running_workers'
        : 'ORDER BY num_required_workers DESC';

    my $final_clause = "$primary_order, hive_capacity DESC, analysis_id";
    $final_clause .= " LIMIT $limit" if $limit;

    $self->_final_clause($final_clause);
    my $results = $self->_generic_fetch($constraint);
    $self->_final_clause("");    # reset final clause for other fetches

    return $results;
}
=head2 fetch_by_status

  Arg [1..n] : list of status strings
  Example    : $stats_list = $adaptor->fetch_by_status('READY', 'WORKING');
  Description: Fetches the stats whose status is one of the given values,
               ordered by last_update. The final clause is reset to the
               empty string once the fetch has returned.
               The statuses are placed into the SQL verbatim between single
               quotes, so callers must pass trusted values only.
  Returntype : listref; an empty listref (and no query) when no status is given

=cut

sub fetch_by_status {
    my ($self, @statuses) = @_;

    # "IN ()" is not valid SQL, and an empty set of statuses can match nothing.
    return [] unless @statuses;

    my $constraint = 'ast.status in (' . join(', ', map { "'$_'" } @statuses) . ')';

    $self->_final_clause("ORDER BY last_update");
    my $results = $self->_generic_fetch($constraint);
    $self->_final_clause("");    # reset final clause for other fetches

    return $results;
}
=head2 refresh

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats $stats
  Example    : $fresh = $adaptor->refresh($stats);
  Description: Re-reads from the database the analysis_stats row that has the
               same analysis_id as $stats. The object passed in is not
               modified; the freshly built object is returned.
  Returntype : the first object returned by _generic_fetch, or undef when the
               row does not exist

=cut

sub refresh {
    my ($self, $stats) = @_;

    my $constraint = "ast.analysis_id = " . $stats->analysis_id;

    # List assignment takes the first element of the fetched list. A plain
    # scalar assignment would store the number of elements instead.
    my ($fresh_stats) = @{ $self->_generic_fetch($constraint) };

    return $fresh_stats;
}
=head2 get_running_worker_count

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats $stats
  Description: Counts the rows of the hive table that carry the analysis_id
               of $stats and have an empty cause_of_death.
  Returntype : the count(*) value returned by the query

=cut

sub get_running_worker_count {
    my ($self, $stats) = @_;

    my $sth = $self->prepare(
        "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?"
    );
    $sth->execute($stats->analysis_id);

    # The query yields a single row with a single column.
    my @count_row = $sth->fetchrow_array();
    $sth->finish;

    return $count_row[0];
}
#
# STORE / UPDATE METHODS
#
################
=head2 update
Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
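Example : $analysisStatsAdaptor->update($analysisStats);
Description: Refreshes num_running_workers of the object from the hive table,
adjusts hive_capacity when the behaviour of the object is DYNAMIC, writes the
fields of the object back to the analysis_stats table (setting last_update to
NOW() and clearing sync_lock), and copies the updated row into
analysis_stats_monitor.
Returntype : none (there is no explicit return value)
Exceptions : none thrown explicitly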
Caller :
=cut
# Writes $stats back to the analysis_stats table and records a snapshot of the
# row in analysis_stats_monitor. All values are passed as bind parameters, the
# same way get_running_worker_count passes its analysis_id.
sub update {
    my ($self, $stats) = @_;

    # The stored worker count is replaced by a live count from the hive table.
    my $running_worker_count = $self->get_running_worker_count($stats);
    $stats->num_running_workers($running_worker_count);

    my $hive_capacity = $stats->hive_capacity;

    if ($stats->behaviour eq "DYNAMIC") {
        # Target capacity: the smaller of the input-bound and output-bound
        # estimates, each computed only when its average time is non-zero.
        my $max_hive_capacity = $hive_capacity;
        if ($stats->avg_input_msec_per_job) {
            $max_hive_capacity = int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job);
        }
        if ($stats->avg_output_msec_per_job) {
            my $output_bound = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
            $max_hive_capacity = $output_bound if $output_bound < $max_hive_capacity;
        }

        # Move towards the target: halve the gap when it is larger than 2,
        # otherwise step by one. The midpoint is stored as computed, so it
        # may be fractional.
        my $gap = $max_hive_capacity - $hive_capacity;
        if (abs($gap) > 2) {
            $stats->hive_capacity(($hive_capacity + $max_hive_capacity) / 2);
        }
        elsif ($gap < 0) {
            $stats->hive_capacity($hive_capacity - 1);
        }
        elsif ($gap > 0) {
            $stats->hive_capacity($hive_capacity + 1);
        }
    }

    # Column names double as accessor names on the stats object.
    my @fields = qw(
        status
        batch_size
        avg_msec_per_job
        avg_input_msec_per_job
        avg_run_msec_per_job
        avg_output_msec_per_job
        hive_capacity
        total_job_count
        unclaimed_job_count
        done_job_count
        max_retry_count
        failed_job_count
        failed_job_tolerance
        num_running_workers
        num_required_workers
    );

    # scalar() keeps exactly one bind value per column even if an accessor
    # returns an empty list.
    my @bind_values = map { scalar $stats->$_() } @fields;

    my $sql = "UPDATE analysis_stats SET "
            . join(', ', map { "$_=?" } @fields)
            . ", last_update=NOW()"
            . ", sync_lock=''"
            . " WHERE analysis_id=?";

    my $sth = $self->prepare($sql);
    $sth->execute(@bind_values, $stats->analysis_id);
    $sth->finish;

    $sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT now(), analysis_stats.* from analysis_stats WHERE analysis_id = ?");
    $sth->execute($stats->analysis_id);
    $sth->finish;

    $stats->seconds_since_last_update(0);    # not exact but good enough :)
}
=head2 update_status

  Arg [1]    : int $analysis_id
  Arg [2]    : string $status
  Example    : $adaptor->update_status($analysis_id, 'READY');
  Description: Sets the status column of the analysis_stats row that has the
               given analysis_id. Both values are passed as bind parameters.

=cut

sub update_status
{
    my ($self, $analysis_id, $status) = @_;

    my $sth = $self->prepare("UPDATE analysis_stats SET status=? WHERE analysis_id=?");
    $sth->execute($status, $analysis_id);
    $sth->finish;
}
=head2 interval_update_work_done
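Arg [1] : int $analysis_id
Arg [2] : int $jobs_done_in_interval
Arg [3] : int $interval_msec
Arg [4] : worker object (hash-based); its fetch_time, run_time and write_time
entries are added into the input, run and output averages
Example : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msecs, $worker);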
Description : does a database update to recalculate the avg_msec_per_job and done_job_count
does an interval equation by multiplying out the previous done_job_count with the
previous avg_msec_per_job and then expanding by new interval values to give a better average.
Caller : Bio::EnsEMBL::Hive::Worker
=cut
# Issues one UPDATE that lowers unclaimed_job_count, raises done_job_count and
# recomputes the four avg_*_msec_per_job columns from the interval figures.
sub interval_update_work_done
{
    my ($self, $analysis_id, $job_count, $interval, $worker) = @_;

    # Every average uses the same formula: the stored total (count * average)
    # is scaled down by 3 before the new interval is added in.
    my $rolling_average = sub {
        my ($column, $elapsed_msec) = @_;
        return "$column = (((done_job_count*$column)/3 + $elapsed_msec) / (done_job_count/3 + $job_count))";
    };

    my @assignments = (
        "unclaimed_job_count = unclaimed_job_count - $job_count",
        $rolling_average->('avg_msec_per_job',        $interval),
        $rolling_average->('avg_input_msec_per_job',  $worker->{fetch_time}),
        $rolling_average->('avg_run_msec_per_job',    $worker->{run_time}),
        $rolling_average->('avg_output_msec_per_job', $worker->{write_time}),
        "done_job_count = done_job_count + $job_count ",
    );

    my $sql = "UPDATE analysis_stats SET "
            . join(', ', @assignments)
            . " WHERE analysis_id= $analysis_id";

    $self->dbc->do($sql);
}
=head2 decrease_hive_capacity

  Arg [1]    : int $analysis_id
  Description: Lowers hive_capacity by one and num_required_workers by one
               (never below zero) for the given analysis, but only while its
               hive_capacity is greater than 1.

=cut

sub decrease_hive_capacity
{
    my ($self, $analysis_id) = @_;

    my $sql = sprintf(
        'UPDATE analysis_stats '
      . ' SET hive_capacity = hive_capacity - 1, '
      . ' num_required_workers = IF(num_required_workers > 0, num_required_workers - 1, 0) '
      . q{ WHERE analysis_id='%s' and hive_capacity > 1},
        $analysis_id
    );

    $self->dbc->do($sql);
}
=head2 increase_hive_capacity

  Arg [1]    : int $analysis_id
  Description: Raises hive_capacity by one and sets num_required_workers to 1
               for the given analysis, but only when its hive_capacity is at
               most 500 and its num_required_workers is 0.

=cut

sub increase_hive_capacity
{
    my ($self, $analysis_id) = @_;

    my $sql = sprintf(
        'UPDATE analysis_stats '
      . ' SET hive_capacity = hive_capacity + 1, num_required_workers = 1'
      . q{ WHERE analysis_id='%s' and hive_capacity <= 500 and num_required_workers = 0},
        $analysis_id
    );

    $self->dbc->do($sql);
}
=head2 increase_running_workers

  Arg [1]    : int $analysis_id
  Description: Adds one to num_running_workers of the given analysis.

=cut

sub increase_running_workers
{
    my ($self, $analysis_id) = @_;

    my $set_clause   = 'SET num_running_workers = num_running_workers + 1 ';
    my $where_clause = " WHERE analysis_id='$analysis_id'";

    $self->dbc->do("UPDATE analysis_stats $set_clause$where_clause");
}
=head2 decrease_running_workers

  Arg [1]    : int $analysis_id
  Description: Subtracts one from num_running_workers of the given analysis.
               The statement itself applies no lower bound.

=cut

sub decrease_running_workers
{
    my ($self, $analysis_id) = @_;

    my $set_clause   = 'SET num_running_workers = num_running_workers - 1 ';
    my $where_clause = " WHERE analysis_id='$analysis_id'";

    $self->dbc->do("UPDATE analysis_stats $set_clause$where_clause");
}
=head2 decrease_needed_workers

  Arg [1]    : int $analysis_id
  Description: Subtracts one from num_required_workers of the given analysis.
               The statement itself applies no lower bound.

=cut

sub decrease_needed_workers
{
    my ($self, $analysis_id) = @_;

    my $sql = join(' ',
        'UPDATE analysis_stats',
        'SET num_required_workers=num_required_workers-1',
        "WHERE analysis_id='$analysis_id' ",
    );

    $self->dbc->do($sql);
}
=head2 increase_needed_workers

  Arg [1]    : int $analysis_id
  Description: Adds one to num_required_workers of the given analysis.

=cut

sub increase_needed_workers
{
    my ($self, $analysis_id) = @_;

    my $sql = join(' ',
        'UPDATE analysis_stats',
        'SET num_required_workers=num_required_workers+1',
        "WHERE analysis_id='$analysis_id' ",
    );

    $self->dbc->do($sql);
}
#
# INTERNAL METHODS
#
###################
=head2 _generic_fetch
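Arg [1] : (optional) string $constraint
An SQL query constraint (i.e. part of the WHERE clause)
Arg [2] : (optional) listref $join
each element is a listref [ $table, $condition, $extra_columns ]: $table
(itself a listref [ name, alias ]) is added to the FROM clause and
$condition is ANDed onto the constraint when both are given; the
$extra_columns listref, if given, is appended to the selected columns
Example : $stats_list = $self->_generic_fetch('ast.analysis_id = 1234');
Description: Builds and executes a SELECT over the tables and columns reported
by _tables and _columns, applying the constraint, the default where
clause and the current final clause.
Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats (as built by _objs_from_sth)
Exceptions : none
Caller : fetch methods of this adaptor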
=cut
# Assembles "SELECT <columns> FROM <tables> [WHERE ...] <final clause>",
# executes it and returns whatever _objs_from_sth builds from the statement.
sub _generic_fetch {
    my ($self, $constraint, $join) = @_;

    my @tables       = $self->_tables;
    my @select_parts = ( join(', ', $self->_columns()) );

    # Each join entry is [ table, condition, extra_columns ].
    for my $join_spec ( @{ $join || [] } ) {
        my ($tablename, $condition, $extra_columns) = @{$join_spec};

        # A table is only joined in when it comes with a join condition.
        if ($tablename && $condition) {
            push @tables, $tablename;
            $constraint = $constraint ? "$constraint AND $condition" : " $condition";
        }

        push @select_parts, join(', ', @{$extra_columns}) if $extra_columns;
    }

    # Render the tables as 'table1 t1, table2 t2'.
    my $from_clause = join(', ', map { join(' ', @{$_}) } @tables);
    my $sql = 'SELECT ' . join(', ', @select_parts) . " FROM $from_clause";

    my $default_where = $self->_default_where_clause;
    my $final_clause  = $self->_final_clause;

    # The caller's constraint comes first, then the default where clause.
    # Empty parts are dropped.
    my @where_parts = grep { $_ } ($constraint, $default_where);
    if (@where_parts) {
        $sql .= ' WHERE ' . join('  AND ', @where_parts) . ' ';
    }

    # Additional clauses (ORDER BY, LIMIT, ...) that a fetch method may have set.
    $sql .= " $final_clause";

    my $sth = $self->prepare($sql);
    $sth->execute;

    return $self->_objs_from_sth($sth);
}
# Returns the tables to select from as a list of [ name, alias ] pairs.
# For this adaptor that is the single table analysis_stats, aliased ast.
sub _tables {
    my ($self) = @_;

    return ( [ qw(analysis_stats ast) ] );
}
# Returns the list of column expressions to select: the analysis_stats columns
# qualified with the ast alias, followed by a computed column that gives the
# number of seconds elapsed since last_update.
sub _columns {
    my ($self) = @_;

    my @columns = map { "ast.$_" } qw(
        analysis_id
        status
        batch_size
        avg_msec_per_job
        avg_input_msec_per_job
        avg_run_msec_per_job
        avg_output_msec_per_job
        hive_capacity
        behaviour
        input_capacity
        output_capacity
        total_job_count
        unclaimed_job_count
        done_job_count
        max_retry_count
        failed_job_count
        failed_job_tolerance
        num_running_workers
        num_required_workers
        last_update
        sync_lock
    );

    push @columns, "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";

    return @columns;
}
# Turns every row of an executed statement handle into an AnalysisStats object
# with this adaptor attached, finishes the handle and returns a listref of the
# objects.
sub _objs_from_sth {
    my ($self, $sth) = @_;

    # Bind one hash slot per result column, keyed by lower-cased column name.
    my %row;
    $sth->bind_columns( \( @row{ @{ $sth->{NAME_lc} } } ) );

    # Column names double as setter names; setters run in this order.
    my @fields = qw(
        analysis_id
        status
        sync_lock
        batch_size
        avg_msec_per_job
        avg_input_msec_per_job
        avg_run_msec_per_job
        avg_output_msec_per_job
        hive_capacity
        behaviour
        input_capacity
        output_capacity
        total_job_count
        unclaimed_job_count
        done_job_count
        max_retry_count
        failed_job_count
        failed_job_tolerance
        num_running_workers
        num_required_workers
        seconds_since_last_update
    );

    my @all_stats;
    while ($sth->fetch()) {
        my $stats = Bio::EnsEMBL::Hive::AnalysisStats->new;
        for my $field (@fields) {
            $stats->$field($row{$field});
        }
        $stats->adaptor($self);
        push @all_stats, $stats;
    }
    $sth->finish;

    return \@all_stats;
}
# Returns the condition that _generic_fetch adds to every query.
# This adaptor has none, so the empty string is returned.
sub _default_where_clause {
    my ($self) = @_;

    return '';
}
# Getter/setter for the SQL fragment that _generic_fetch appends to its query.
# When an argument is passed it replaces the stored clause. A false stored
# value (unset, empty or "0") is normalised to the empty string before it is
# returned.
sub _final_clause {
    my ($self, @new_clause) = @_;

    if (@new_clause) {
        $self->{'_final_clause'} = $new_clause[0];
    }
    $self->{'_final_clause'} ||= "";

    return $self->{'_final_clause'};
}
# Inserts an analysis_stats row for $analysis_id, leaving every other column
# to its table default. INSERT ignore is used so that an already existing row
# does not make the statement fail. The id is passed as a bind parameter, the
# same way get_running_worker_count passes its analysis_id.
sub _create_new_for_analysis_id {
    my ($self, $analysis_id) = @_;

    my $sth = $self->prepare("INSERT ignore INTO analysis_stats (analysis_id) VALUES (?)");
    $sth->execute($analysis_id);
    $sth->finish;
}
1;