Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
Summary
  Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
Package variables
No package variables defined.
Included modules
Bio::EnsEMBL::DBSQL::BaseAdaptor
Bio::EnsEMBL::Hive::AnalysisStats
Bio::EnsEMBL::Utils::Argument
Bio::EnsEMBL::Utils::Exception
Inherit
Bio::EnsEMBL::DBSQL::BaseAdaptor
Synopsis
  $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
  $analysisStatsAdaptor = $analysisStats->adaptor;
Description
  Module to encapsulate all db access for persistent class AnalysisStats.
There should be just one per application and database connection.
Methods
_columns
_create_new_for_analysis_id
_default_where_clause
_final_clause
_generic_fetch
_objs_from_sth
_tables
decrease_hive_capacity
decrease_needed_workers
decrease_running_workers
fetch_all
fetch_by_analysis_id
fetch_by_needed_workers
fetch_by_status
get_running_worker_count
increase_hive_capacity
increase_needed_workers
increase_running_workers
interval_update_work_done
refresh
update
update_status
Methods description
_generic_fetch
  Arg [1]    : (optional) string $constraint
               An SQL query constraint (i.e. part of the WHERE clause)
  Arg [2]    : (optional) listref $join
               Each element is a listref [$tablename, $condition, $extra_columns].
               $tablename is a [name, alias] pair like the entries returned by _tables;
               it is added to the FROM list, $condition is ANDed onto the constraint,
               and the columns in $extra_columns are appended to the SELECT list.
  Example    : $stats_list = $adaptor->_generic_fetch("ast.status = 'READY'");
  Description: Performs a database fetch on the analysis_stats table and returns
               the matching AnalysisStats objects.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : none
  Caller     : fetch_all, fetch_by_analysis_id, fetch_by_needed_workers,
               fetch_by_status, refresh
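
The way _generic_fetch assembles its SELECT statement (see its listing under Methods code) can be tried out without a database. The sketch below is a minimal stand-alone approximation: build_select_sql is a hypothetical helper, not part of the module, and it takes the table list, column list, default WHERE clause and final clause as arguments instead of reading them from the adaptor. It also trims the extra spaces the real method leaves around WHERE.

```perl
use strict;
use warnings;

# Hypothetical helper (assumption, not in the module): builds the SELECT
# string with the same steps as _generic_fetch, but from plain arguments.
sub build_select_sql {
    my ($tables, $columns, $constraint, $join, $default_where, $final_clause) = @_;

    my @tables      = @$tables;               # e.g. (['analysis_stats', 'ast'])
    my $columns_sql = join(', ', @$columns);

    # Each join entry is [ [name, alias], condition, [extra columns] ].
    foreach my $single_join (@{ $join || [] }) {
        my ($tablename, $condition, $extra_columns) = @$single_join;
        if ($tablename && $condition) {
            push @tables, $tablename;
            $constraint = $constraint ? "$constraint AND $condition" : $condition;
        }
        $columns_sql .= ', ' . join(', ', @$extra_columns) if $extra_columns;
    }

    # Build a table string like 'table1 t1, table2 t2'.
    my $tablenames = join(', ', map { join(' ', @$_) } @tables);
    my $sql = "SELECT $columns_sql FROM $tablenames";

    # Combine the caller's constraint with the default WHERE clause, if any.
    my @where = grep { defined $_ && length $_ } ($constraint, $default_where);
    $sql .= ' WHERE ' . join(' AND ', @where) if @where;

    # Append ORDER BY / LIMIT style clauses.
    $sql .= " $final_clause" if defined $final_clause && length $final_clause;
    return $sql;
}

# Prints:
# SELECT ast.analysis_id, ast.status FROM analysis_stats ast WHERE ast.status in ('READY','WORKING') ORDER BY last_update
print build_select_sql(
    [ ['analysis_stats', 'ast'] ],
    [ 'ast.analysis_id', 'ast.status' ],
    "ast.status in ('READY','WORKING')",
    undef, '', 'ORDER BY last_update',
), "\n";
```

The arguments in the usage call mirror what fetch_by_status sets up: a status constraint plus a final clause of ORDER BY last_update.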
fetch_by_analysis_id
  Arg [1]    : int $id
               the analysis_id of the analysis_stats row to be obtained
  Example    : $stats = $adaptor->fetch_by_analysis_id(1234);
  Description: Returns the AnalysisStats object for the analysis with id $id.
               If no analysis_stats row exists for that id yet, one is created
               and then fetched.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $id is not defined, or if no row can be fetched even
               after creating one
  Caller     : general
interval_update_work_done
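  Arg [1]     : int $analysis_id
  Arg [2]     : int $jobs_done_in_interval
  Arg [3]     : int $interval_msec
  Arg [4]     : $worker
                object whose fetch_time, run_time and write_time fields are
                read by the code (presumably the calling Worker)
  Example     : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msecs, $worker);
  Description : Updates the database to recalculate avg_msec_per_job and done_job_count.
                The previous total (done_job_count multiplied by avg_msec_per_job) is
                weighted by one third, the new interval time is added, and the sum is
                divided by the equally weighted job count, so that recent intervals
                count for more than older history. The input, run and output averages
                are updated the same way, and unclaimed_job_count is reduced by the
                number of jobs done.
  Caller      : Bio::EnsEMBL::Hive::Worker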
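
The averaging arithmetic used by interval_update_work_done can be checked outside the database. The following is a minimal sketch that copies the expression from the method's SQL into plain Perl; updated_avg_msec is a hypothetical helper name, not part of the module, and the guard against a zero denominator is an addition that the SQL itself does not have.

```perl
use strict;
use warnings;

# Hypothetical helper (assumption, not in the module): same arithmetic as
#   ((done_job_count*avg_msec_per_job)/3 + $interval) / (done_job_count/3 + $job_count)
sub updated_avg_msec {
    my ($done_job_count, $avg_msec_per_job, $job_count, $interval_msec) = @_;

    # Old history is weighted by one third; the new jobs count in full.
    my $denominator = $done_job_count / 3 + $job_count;
    return undef unless $denominator;    # nothing to average over

    return (($done_job_count * $avg_msec_per_job) / 3 + $interval_msec) / $denominator;
}

# 300 jobs done so far at 100 msec each; a worker then reports
# 50 more jobs that took 10000 msec in total (200 msec each).
printf "%.2f\n", updated_avg_msec(300, 100, 50, 10_000);   # prints 133.33
```

For comparison, an unweighted running average over the same numbers would be (300*100 + 10000) / 350, about 114.29, so the one-third weighting moves the estimate towards the recent interval faster.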
update
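  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $adaptor->update($stats);
  Description: Recounts the live workers for the analysis, adjusts hive_capacity
               for analyses whose behaviour is "DYNAMIC", writes the stats back to
               the analysis_stats table (setting last_update to NOW() and clearing
               sync_lock), and copies the updated row into analysis_stats_monitor.
  Returntype : none (the code has no explicit return statement)
  Exceptions :
  Caller     :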
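
The update method's code adjusts hive_capacity for analyses whose behaviour is "DYNAMIC" before it writes anything to the database. The sketch below isolates that adjustment as a pure function so it can be run on its own. adjusted_hive_capacity and its hash-style arguments are hypothetical names chosen for illustration; the arithmetic follows the listing under Methods code.

```perl
use strict;
use warnings;

# Hypothetical helper (assumption, not in the module): returns the
# hive_capacity that update() would store for a DYNAMIC analysis.
sub adjusted_hive_capacity {
    my (%s) = @_;
    my $hive_capacity = $s{hive_capacity};
    my $max           = $hive_capacity;

    # Capacity limit implied by the share of job time spent on input.
    if ($s{avg_input_msec_per_job}) {
        $max = int($s{input_capacity} * $s{avg_msec_per_job} / $s{avg_input_msec_per_job});
    }
    # Capacity limit implied by output; keep the smaller of the two.
    if ($s{avg_output_msec_per_job}) {
        my $max2 = int($s{output_capacity} * $s{avg_msec_per_job} / $s{avg_output_msec_per_job});
        $max = $max2 if $max2 < $max;
    }

    # Far from the target: jump halfway. Close to it: step by one.
    return ($hive_capacity + $max) / 2 if abs($hive_capacity - $max) > 2;
    return $hive_capacity - 1          if $hive_capacity > $max;
    return $hive_capacity + 1          if $hive_capacity < $max;
    return $hive_capacity;
}

# Input allows int(4*1000/100) = 40 workers, output int(4*1000/200) = 20,
# so the target is 20 and the capacity moves halfway from 10 towards it.
print adjusted_hive_capacity(
    hive_capacity           => 10,
    input_capacity          => 4,
    output_capacity         => 4,
    avg_msec_per_job        => 1000,
    avg_input_msec_per_job  => 100,
    avg_output_msec_per_job => 200,
), "\n";   # prints 15
```

Note that the halfway step is not rounded, so it can yield a fractional value (for example 12.5 when moving from 10 towards 15); the same is true of the original code.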
Methods code
_columns
sub _columns {
  my $self = shift;

  my @columns = qw (ast.analysis_id
                    ast.status
                    ast.batch_size
                    ast.avg_msec_per_job
                    ast.avg_input_msec_per_job
                    ast.avg_run_msec_per_job
                    ast.avg_output_msec_per_job
                    ast.hive_capacity
                    ast.behaviour
                    ast.input_capacity
                    ast.output_capacity
                    ast.total_job_count
                    ast.unclaimed_job_count
                    ast.done_job_count
                    ast.max_retry_count
                    ast.failed_job_count
                    ast.failed_job_tolerance
                    ast.num_running_workers
                    ast.num_required_workers
                    ast.last_update
                    ast.sync_lock
                   );
  push @columns , "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
  return @columns;
}
_create_new_for_analysis_id
sub _create_new_for_analysis_id {
  my ($self, $analysis_id) = @_;

  my $sql;

  $sql = "INSERT ignore INTO analysis_stats (analysis_id) VALUES ($analysis_id)";
  #print("$sql\n");
  my $sth = $self->prepare($sql);
  $sth->execute();
  $sth->finish;
}
_default_where_clause
sub _default_where_clause {
  my $self = shift;
  return '';
}
_final_clause
sub _final_clause {
  my $self = shift;
  $self->{'_final_clause'} = shift if(@_);
  $self->{'_final_clause'} = "" unless($self->{'_final_clause'});
  return $self->{'_final_clause'};
}
_generic_fetch
sub _generic_fetch {
  my ($self, $constraint, $join) = @_;
  
  my @tables = $self->_tables;
  my $columns = join(', ', $self->_columns());
  
  if ($join) {
    foreach my $single_join (@{$join}) {
      my ($tablename, $condition, $extra_columns) = @{$single_join};
      if ($tablename && $condition) {
        push @tables, $tablename;
        
        if($constraint) {
          $constraint .= " AND $condition";
        } else {
          $constraint = " $condition";
        }
      } 
      if ($extra_columns) {
        $columns .= ", " . join(', ', @{$extra_columns});
      }
    }
  }
      
  #construct a nice table string like 'table1 t1, table2 t2'
  my $tablenames = join(', ', map({ join(' ', @$_) } @tables));

  my $sql = "SELECT $columns FROM $tablenames";

  my $default_where = $self->_default_where_clause;
  my $final_clause = $self->_final_clause;

  #append a where clause if it was defined
  if($constraint) {
    $sql .= " WHERE $constraint ";
    if($default_where) {
      $sql .= " AND $default_where ";
    }
  } elsif($default_where) {
    $sql .= " WHERE $default_where ";
  }

  #append additional clauses which may have been defined
  $sql .= " $final_clause";

  #print STDOUT $sql,"\n";

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}
_objs_from_sth
sub _objs_from_sth {
  my ($self, $sth) = @_;
  
  my %column;
  $sth->bind_columns(\(  @column{ @{$sth->{NAME_lc} } } ));

  my @statsArray = ();

  while ($sth->fetch()) {
    my $analStats = new Bio::EnsEMBL::Hive::AnalysisStats;

    $analStats->analysis_id($column{'analysis_id'});
    $analStats->status($column{'status'});
    $analStats->sync_lock($column{'sync_lock'});
    $analStats->batch_size($column{'batch_size'});
    $analStats->avg_msec_per_job($column{'avg_msec_per_job'});
    $analStats->avg_input_msec_per_job($column{'avg_input_msec_per_job'});
    $analStats->avg_run_msec_per_job($column{'avg_run_msec_per_job'});
    $analStats->avg_output_msec_per_job($column{'avg_output_msec_per_job'});
    $analStats->hive_capacity($column{'hive_capacity'});
    $analStats->behaviour($column{'behaviour'});
    $analStats->input_capacity($column{'input_capacity'});
    $analStats->output_capacity($column{'output_capacity'});
    $analStats->total_job_count($column{'total_job_count'});
    $analStats->unclaimed_job_count($column{'unclaimed_job_count'});
    $analStats->done_job_count($column{'done_job_count'});
    $analStats->max_retry_count($column{'max_retry_count'});
    $analStats->failed_job_count($column{'failed_job_count'});
    $analStats->failed_job_tolerance($column{'failed_job_tolerance'});
    $analStats->num_running_workers($column{'num_running_workers'});
    $analStats->num_required_workers($column{'num_required_workers'});
    $analStats->seconds_since_last_update($column{'seconds_since_last_update'});
    $analStats->adaptor($self);

    push @statsArray, $analStats;
  }
  $sth->finish;

  return \@statsArray;
}
_tables
sub _tables {
  my $self = shift;

  return (['analysis_stats', 'ast']);
}
decrease_hive_capacity
sub decrease_hive_capacity {
  my $self = shift;
  my $analysis_id = shift;

  my $sql = "UPDATE analysis_stats ".
      " SET hive_capacity = hive_capacity - 1, ".
      " num_required_workers = IF(num_required_workers > 0, num_required_workers - 1, 0) ".
      " WHERE analysis_id='$analysis_id' and hive_capacity > 1";

  $self->dbc->do($sql);
}
decrease_needed_workers
sub decrease_needed_workers {
  my $self = shift;
  my $analysis_id = shift;

  my $sql = "UPDATE analysis_stats SET num_required_workers=num_required_workers-1 ".
            "WHERE analysis_id='$analysis_id' ";

  $self->dbc->do($sql);
}
decrease_running_workers
sub decrease_running_workers {
  my $self = shift;
  my $analysis_id = shift;

  my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
      " WHERE analysis_id='$analysis_id'";

  $self->dbc->do($sql);
}
fetch_all
sub fetch_all {
  my $self = shift;
  return $self->_generic_fetch();
}
fetch_by_analysis_id
sub fetch_by_analysis_id {
  my ($self,$id) = @_;

  unless(defined $id) {
    throw("fetch_by_analysis_id must have an id");
  }

  my $constraint = "ast.analysis_id = $id";

  #return first element of _generic_fetch list
  my ($obj) = @{$self->_generic_fetch($constraint)};

  unless(defined($obj)) {
    $self->_create_new_for_analysis_id($id);
    ($obj) = @{$self->_generic_fetch($constraint)};
  }

  if(!defined($obj)) {
    throw("unable to fetch analysis_stats for analysis_id = $id\n");
  }

  return $obj;
}
fetch_by_needed_workers
sub fetch_by_needed_workers {
  my $self = shift;
  my $limit = shift;
  my $maximise_concurrency = shift;

  my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')";
  my $first_order_by;
  if ($maximise_concurrency) {
    $first_order_by = 'ORDER BY num_running_workers';
    # print STDERR "###> Maximising concurrency\n";
  } else {
    $first_order_by = 'ORDER BY num_required_workers DESC';
  }

  if($limit) {
    $self->_final_clause("$first_order_by, hive_capacity DESC, analysis_id LIMIT $limit");
  } else {
    $self->_final_clause("$first_order_by, hive_capacity DESC, analysis_id");
  }

  my $results = $self->_generic_fetch($constraint);
  $self->_final_clause(""); #reset final clause for other fetches

  return $results;
}
fetch_by_status
sub fetch_by_status {
  my $self = shift;

  my $constraint = "ast.status in (";
  my $addComma;
  while(@_) {
    my $status = shift;
    $constraint .= ',' if($addComma);
    $constraint .= "'$status' ";
    $addComma = 1;
  }
  $constraint .= ")";

  $self->_final_clause("ORDER BY last_update");
  my $results = $self->_generic_fetch($constraint);
  $self->_final_clause(""); #reset final clause for other fetches
  return $results;
}
get_running_worker_count
sub get_running_worker_count {
  my ($self, $stats) = @_;

  my $sql = "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?";
  my $sth = $self->prepare($sql);
  $sth->execute($stats->analysis_id);
  my ($liveCount) = $sth->fetchrow_array();
  $sth->finish;

  return $liveCount;
}


increase_hive_capacity
sub increase_hive_capacity {
  my $self = shift;
  my $analysis_id = shift;

  my $sql = "UPDATE analysis_stats ".
      " SET hive_capacity = hive_capacity + 1, num_required_workers = 1".
      " WHERE analysis_id='$analysis_id' and hive_capacity <= 500 and num_required_workers = 0";

  $self->dbc->do($sql);
}
increase_needed_workers
sub increase_needed_workers {
  my $self = shift;
  my $analysis_id = shift;

  my $sql = "UPDATE analysis_stats SET num_required_workers=num_required_workers+1 ".
            "WHERE analysis_id='$analysis_id' ";

  $self->dbc->do($sql);
}


increase_running_workers
sub increase_running_workers {
  my $self = shift;
  my $analysis_id = shift;

  my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 ".
      " WHERE analysis_id='$analysis_id'";

  $self->dbc->do($sql);
}
interval_update_work_done
sub interval_update_work_done {
  my ($self, $analysis_id, $job_count, $interval, $worker) = @_;

  my $sql = "UPDATE analysis_stats SET ".
            "unclaimed_job_count = unclaimed_job_count - $job_count, ".
            "avg_msec_per_job = (((done_job_count*avg_msec_per_job)/3 + $interval) / (done_job_count/3 + $job_count)), ".
            "avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/3 + ".
                ($worker->{fetch_time}).") / (done_job_count/3 + $job_count)), ".
            "avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/3 + ".
                ($worker->{run_time}).") / (done_job_count/3 + $job_count)), ".
            "avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/3 + ".
                ($worker->{write_time}).") / (done_job_count/3 + $job_count)), ".
            "done_job_count = done_job_count + $job_count ".
            " WHERE analysis_id= $analysis_id";

  $self->dbc->do($sql);
}
refresh
sub refresh {
  my ($self, $stats) = @_;

  my $constraint = "ast.analysis_id = " . $stats->analysis_id;

  #return first element of _generic_fetch list
  #(list assignment; a scalar assignment would store the element count instead)
  ($stats) = @{$self->_generic_fetch($constraint)};
  return $stats;
}
update
sub update {
  my ($self, $stats) = @_;

  my $running_worker_count = $self->get_running_worker_count($stats);
  $stats->num_running_workers($running_worker_count);
  my $hive_capacity = $stats->hive_capacity;

  if ($stats->behaviour eq "DYNAMIC") {
    my $max_hive_capacity = $hive_capacity;
    if ($stats->avg_input_msec_per_job) {
      $max_hive_capacity = int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job);
    }
    if ($stats->avg_output_msec_per_job) {
      my $max_hive_capacity2 = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
      if ($max_hive_capacity2 < $max_hive_capacity) {
        $max_hive_capacity = $max_hive_capacity2;
      }
    }
    if (($hive_capacity > $max_hive_capacity) or ($hive_capacity < $max_hive_capacity )) {
      if (abs($hive_capacity - $max_hive_capacity) > 2) {
        $stats->hive_capacity(($hive_capacity + $max_hive_capacity) / 2);
      } elsif ($hive_capacity > $max_hive_capacity) {
        $stats->hive_capacity($hive_capacity - 1);
      } elsif ($hive_capacity < $max_hive_capacity) {
        $stats->hive_capacity($hive_capacity + 1);
      }
    }
  }

  my $sql = "UPDATE analysis_stats SET status='".$stats->status."' ";
  $sql .= ",batch_size=" . $stats->batch_size();
  $sql .= ",avg_msec_per_job=" . $stats->avg_msec_per_job();
  $sql .= ",avg_input_msec_per_job=" . $stats->avg_input_msec_per_job();
  $sql .= ",avg_run_msec_per_job=" . $stats->avg_run_msec_per_job();
  $sql .= ",avg_output_msec_per_job=" . $stats->avg_output_msec_per_job();
  $sql .= ",hive_capacity=" . $stats->hive_capacity();
  $sql .= ",total_job_count=" . $stats->total_job_count();
  $sql .= ",unclaimed_job_count=" . $stats->unclaimed_job_count();
  $sql .= ",done_job_count=" . $stats->done_job_count();
  $sql .= ",max_retry_count=" . $stats->max_retry_count();
  $sql .= ",failed_job_count=" . $stats->failed_job_count();
  $sql .= ",failed_job_tolerance=" . $stats->failed_job_tolerance();
  $sql .= ",num_running_workers=" . $stats->num_running_workers();
  $sql .= ",num_required_workers=" . $stats->num_required_workers();
  $sql .= ",last_update=NOW()";
  $sql .= ",sync_lock=''";
  $sql .= " WHERE analysis_id='".$stats->analysis_id."' ";

  my $sth = $self->prepare($sql);
  $sth->execute();
  $sth->finish;

  $sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT now(), analysis_stats.* from analysis_stats WHERE analysis_id = ".$stats->analysis_id);
  $sth->execute();
  $sth->finish;

  $stats->seconds_since_last_update(0); #not exact but good enough :)
}
update_status
sub update_status {
  my ($self, $analysis_id, $status) = @_;

  my $sql = "UPDATE analysis_stats SET status='$status' ";
  $sql .= " WHERE analysis_id='$analysis_id' ";

  my $sth = $self->prepare($sql);
  $sth->execute();
  $sth->finish;
}
General documentation
CONTACT
  Contact Jessica Severin on implementation/design detail: jessica@ebi.ac.uk
  Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
APPENDIX
  The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _