Raw content of Bio::EnsEMBL::Hive::AnalysisStats # # You may distribute this module under the same terms as perl itself # # POD documentation - main docs before the code =pod =head1 NAME Bio::EnsEMBL::Hive::AnalysisStats =head1 SYNOPSIS =head1 DESCRIPTION =head1 CONTACT Contact Jessica Severin on EnsEMBL::Hive implemetation/design detail: jessica@ebi.ac.uk Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk =head1 APPENDIX The rest of the documentation details each of the object methods. Internal methods are usually preceded with a _ =cut package Bio::EnsEMBL::Hive::AnalysisStats; use strict; use Bio::EnsEMBL::Analysis; use Bio::EnsEMBL::DBSQL::DBAdaptor; use Bio::EnsEMBL::Hive::Worker; sub new { my ($class,@args) = @_; my $self = bless {}, $class; return $self; } sub adaptor { my $self = shift; $self->{'_adaptor'} = shift if(@_); return $self->{'_adaptor'}; } sub refresh { my $self = shift; return unless($self->adaptor); $self->adaptor->refresh($self); } sub update { my $self = shift; return unless($self->adaptor); $self->adaptor->update($self); } sub update_status { my ($self, $status ) = @_; return unless($self->adaptor); $self->adaptor->update_status($self->analysis_id, $status); $self->status($status); } sub decrease_hive_capacity { my ($self) = @_; return unless ($self->adaptor); $self->adaptor->decrease_hive_capacity($self->analysis_id); } sub increase_hive_capacity { my ($self) = @_; return unless ($self->adaptor); $self->adaptor->increase_hive_capacity($self->analysis_id); } sub get_running_worker_count { my $self = shift; return unless ($self->adaptor); return $self->adaptor->get_running_worker_count($self); } sub analysis_id { my $self = shift; $self->{'_analysis_id'} = shift if(@_); return $self->{'_analysis_id'}; } sub get_analysis { my $self = shift; unless($self->{'_analysis'}) { $self->{'_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id); } return $self->{'_analysis'}; } sub status { my ($self, $value ) = @_; if(defined $value) { $self->{'_status'} = $value; } return $self->{'_status'}; } sub batch_size { my $self = shift; $self->{'_batch_size'} = shift if(@_); $self->{'_batch_size'}=1 unless(defined($self->{'_batch_size'})); return $self->{'_batch_size'}; } sub avg_msec_per_job { my $self = shift; $self->{'_avg_msec_per_job'} = shift if(@_); $self->{'_avg_msec_per_job'}=0 unless($self->{'_avg_msec_per_job'}); return $self->{'_avg_msec_per_job'}; } sub avg_input_msec_per_job { my $self = shift; $self->{'_avg_input_msec_per_job'} = shift if(@_); $self->{'_avg_input_msec_per_job'}=0 unless($self->{'_avg_input_msec_per_job'}); return $self->{'_avg_input_msec_per_job'}; } sub avg_run_msec_per_job { my $self = shift; $self->{'_avg_run_msec_per_job'} = shift if(@_); $self->{'_avg_run_msec_per_job'}=0 unless($self->{'_avg_run_msec_per_job'}); return $self->{'_avg_run_msec_per_job'}; } sub avg_output_msec_per_job { my $self = shift; $self->{'_avg_output_msec_per_job'} = shift if(@_); $self->{'_avg_output_msec_per_job'}=0 unless($self->{'_avg_output_msec_per_job'}); return $self->{'_avg_output_msec_per_job'}; } sub cpu_minutes_remaining { my $self = shift; return ($self->avg_msec_per_job * $self->unclaimed_job_count / 60000); } sub hive_capacity { my $self = shift; $self->{'_hive_capacity'} = shift if(@_); return $self->{'_hive_capacity'}; } sub behaviour { my $self = shift; $self->{'_behaviour'} = shift if(@_); return $self->{'_behaviour'}; } sub input_capacity { my $self = shift; $self->{'_input_capacity'} = shift if(@_); return $self->{'_input_capacity'}; } sub output_capacity { my $self = shift; $self->{'_output_capacity'} = shift if(@_); return $self->{'_output_capacity'}; } sub total_job_count { my $self = shift; $self->{'_total_job_count'} = shift if(@_); return $self->{'_total_job_count'}; } sub unclaimed_job_count { my $self = shift; $self->{'_unclaimed_job_count'} = shift if(@_); return $self->{'_unclaimed_job_count'}; } sub done_job_count { my $self = shift; $self->{'_done_job_count'} = shift if(@_); return $self->{'_done_job_count'}; } sub failed_job_count { my $self = shift; $self->{'_failed_job_count'} = shift if(@_); $self->{'_failed_job_count'} = 0 unless(defined($self->{'_failed_job_count'})); return $self->{'_failed_job_count'}; } sub max_retry_count { my $self = shift; $self->{'_max_retry_count'} = shift if(@_); $self->{'_max_retry_count'} = 3 unless(defined($self->{'_max_retry_count'})); return $self->{'_max_retry_count'}; } sub failed_job_tolerance { my $self = shift; $self->{'_failed_job_tolerance'} = shift if(@_); $self->{'_failed_job_tolerance'} = 0 unless(defined($self->{'_failed_job_tolerance'})); return $self->{'_failed_job_tolerance'}; } sub running_job_count { my $self = shift; return $self->total_job_count - $self->done_job_count - $self->unclaimed_job_count - $self->failed_job_count; } sub remaining_job_count { my $self = shift; return $self->total_job_count - $self->done_job_count - $self->failed_job_count; } sub num_running_workers { my $self = shift; $self->{'_num_running_workers'} = shift if(@_); return $self->{'_num_running_workers'}; } sub num_required_workers { my $self = shift; $self->{'_num_required_workers'} = shift if(@_); return $self->{'_num_required_workers'}; } sub seconds_since_last_update { my( $self, $value ) = @_; $self->{'_last_update'} = time() - $value if(defined($value)); return time() - $self->{'_last_update'}; } sub sync_lock { my $self = shift; $self->{'_sync_lock'} = shift if(@_); return $self->{'_sync_lock'}; } sub determine_status { my $self = shift; if($self->status ne 'BLOCKED') { if ($self->unclaimed_job_count == 0 and $self->total_job_count == $self->done_job_count + $self->failed_job_count) { my $failure_percentage = 0; if ($self->total_job_count) { $failure_percentage = $self->failed_job_count * 100 / $self->total_job_count; } if ($failure_percentage > $self->failed_job_tolerance) { $self->status('FAILED'); print "\n", "##################################################\n", printf "## ERROR: %-35s ##\n", $self->get_analysis->logic_name." failed!"; printf "## %4.1f%% jobs failed (tolerance: %3d%%) ##\n", $failure_percentage, $self->failed_job_tolerance; print "##################################################\n\n"; } else { $self->status('DONE'); } } if($self->total_job_count == $self->unclaimed_job_count) { $self->status('READY'); } if($self->unclaimed_job_count>0 and $self->total_job_count > $self->unclaimed_job_count) { $self->status('WORKING'); } } return $self; } sub print_stats { my $self = shift; my $mode = shift; return unless($self->get_analysis); $mode=1 unless($mode); my $name = sprintf("%s(%d)", $self->get_analysis->logic_name, $self->analysis_id); while(length($name) < 27) { $name.=' ';} if($mode == 1) { # printf("%s(%d) %s %d:ms %d:cpu (%d:q %d:r %d:d %d:f %d:t) [%d/%d workers] (%d secs synched)\n", #printf("%30s(%3d) %12s jobs(t:%d,q:%d,d:%d,f:%d) b:%d M:%d w:%d (%d secs old)\n", printf("$name %11s %d:cpum job(%d/%d run:%d fail:%d %dms) worker[%d/%d] (sync %d)\n", $self->status, $self->cpu_minutes_remaining, $self->remaining_job_count, $self->total_job_count, $self->running_job_count, $self->failed_job_count, $self->avg_msec_per_job, $self->num_required_workers, $self->hive_capacity, $self->seconds_since_last_update, ); } elsif ($mode == 2) { printf("$name %11s [%d/%d workers] (%d secs synched)\n", $self->status, $self->num_required_workers, $self->hive_capacity, $self->seconds_since_last_update); printf(" msec_per_job : %d\n", $self->avg_msec_per_job); printf(" cpu_min_total : %d\n", $self->cpu_minutes_remaining); printf(" batch_size : %d\n", $self->batch_size); printf(" total_jobs : %d\n", $self->total_job_count); printf(" unclaimed jobs : %d\n", $self->unclaimed_job_count); printf(" running jobs : %d\n", $self->running_job_count); printf(" done jobs : %d\n", $self->done_job_count); printf(" failed jobs : %d\n", $self->failed_job_count); } } 1;