Bio::EnsEMBL::Hive Worker
Summary
  Bio::EnsEMBL::Hive::Worker
Package variables
Privates (from "my" definitions)
$MIN_BATCH_TIME = 120000   (milliseconds)
Included modules
Bio::EnsEMBL::Analysis
Bio::EnsEMBL::DBSQL::DBAdaptor
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor
Bio::EnsEMBL::Hive::Extensions
Bio::EnsEMBL::Hive::Process
Bio::EnsEMBL::Utils::Argument
Bio::EnsEMBL::Utils::Exception
Digest::MD5 qw ( md5_hex )
POSIX
Sys::Hostname
Time::HiRes qw ( time )
Synopsis
No synopsis!
Description
  Object which encapsulates the details of how to find jobs, how to run those
jobs, and then check the rules to create the next jobs in the chain.
Essentially it knows where to find data, how to process data, and where to
put it when it is done (put in the next person's INBOX) so the next Worker
in the chain can find data to work on.

Hive-based processing is a concept based on a more controlled version of an
autonomous-agent system. Each worker is not told what to do (as in a
centralized control system, like the previous pipeline system) but rather
queries a central database for jobs ('give me jobs').

Each worker is linked to an analysis_id, registers itself in the Hive on
creation, creates a RunnableDB instance of the Analysis->module, gets the
relevant configuration information from the database, does its work, and
creates the next layer of analysis_job entries by interfacing with the
DataflowRuleAdaptor to determine which analyses it needs to pass its output
data to, creating jobs on the database for those next analyses. It repeats
this cycle until it has lived its lifetime or until there are no more jobs
left to process. The lifetime limit is a safety limit to prevent workers from
'infecting' a system and sitting on a compute node for longer than is
socially acceptable. This is primarily needed on compute resources like an
LSF system where jobs are not preempted and run until they are done.

The Queen's primary job is to create Workers to get the work done. As part of
this, she is also responsible for summarizing the status of the analyses by
querying the analysis_job table, summarizing, and updating the analysis_stats
table. From this she is also responsible for monitoring and 'unblocking'
analyses via the analysis_ctrl_rules. The Queen is also responsible for
freeing up jobs that were claimed by Workers that died unexpectedly, so that
other workers can take over the work.

The Beekeeper is in charge of interfacing between the Queen and a compute
resource or 'compute farm'. Its job is to query Queens as to whether they
need any workers, and to send the requested number of workers to open
machines via the runWorker.pl script. It is also responsible for interfacing
with the Queen to identify workers which died unexpectedly, so that she can
free the dead workers' unfinished jobs.
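For example, a worker is typically created and run via the Queen. The
following is a minimal sketch mirroring what runWorker.pl does; the adaptor
connection arguments and the create_new_worker parameters shown here are
assumptions for illustration, not a definitive recipe:

  use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;

  # connect to the hive database and ask the Queen for a worker
  my $hive_dba = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new(
      -host => 'dbhost', -user => 'ensadmin', -dbname => 'my_hive');
  my $queen  = $hive_dba->get_Queen;
  my $worker = $queen->create_new_worker(-analysis_id => 1);

  $worker->life_span(3600);  # safety limit: die after one hour...
  $worker->job_limit(500);   # ...or after 500 jobs, whichever comes first
  $worker->run;              # claim jobs, run them, create next jobs, repeat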
Methods
  _specific_job
  analysis
  batch_size
  beekeeper
  born
  cause_of_death
  check_system_load
  cleanup_worker_process_temp_directory
  close_and_update_job_output
  db
  debug
  died
  enter_status
  execute_writes
  hive_id
  host
  init
  job_limit
  last_check_in
  life_span
  new
  output_dir
  perform_global_cleanup
  print_worker
  process_id
  queen
  redirect_job_output
  run
  run_module_with_job
  set_worker_batch_size
  status
  work_done
  worker_process_temp_directory
Methods description
analysis
  Title       : analysis
  Arg [1]     : (optional) Bio::EnsEMBL::Analysis $analysis
  Usage       : $value = $self->analysis;
                $self->analysis($analysis);
  Description : Get/set the analysis object of this Worker
  DefaultValue: undef
  Returntype  : Bio::EnsEMBL::Analysis object
batch_size
  Title       : batch_size
  Args        : none
  Usage       : $value = $self->batch_size;
  Description : Returns the number of jobs that should be run in one batch
                before querying the database for the next batch. Used by the
                Hive system to manage the number of workers needed to
                complete a particular job type. Use set_worker_batch_size to
                override the value derived from the analysis.
  DefaultValue: batch_size of the analysis
  Returntype  : integer scalar
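As an illustrative example of the adaptive rule (see the batch_size code
below): when the declared batch_size is <= 0 and jobs average 250 msec each,
a batch is sized to fill $MIN_BATCH_TIME (120000 msec) of work:

  # illustrative numbers only: 120000 msec of work / 250 msec per job
  my $batch_size = POSIX::ceil(120000 / 250);   # == 480 jobs per batch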
job_limit
  Title       : job_limit
  Arg [1]     : (optional) integer $value
  Usage       : $value = $self->job_limit;
                $self->job_limit($new_value);
  Description : Defines the maximum number of jobs a worker can process
                before it needs to die. A worker 'dies' when either its
                'life_span' or its 'job_limit' is exceeded.
  DefaultValue: undef (relies on life_span to limit the life of the worker)
  Returntype  : integer scalar
life_span
  Title       : life_span
  Arg [1]     : (optional) integer $value (in seconds)
  Usage       : $value = $self->life_span;
                $self->life_span($new_value);
  Description : Defines the maximum time a worker can live for. Workers are
                always allowed to complete the jobs they have claimed, but
                whether they can do multiple rounds of work is limited by
                their life_span.
  DefaultValue: 3600 (60 minutes)
  Returntype  : integer scalar
output_dir
  Title       : output_dir
  Arg [1]     : (optional) string directory path
  Usage       : $value = $self->output_dir;
                $self->output_dir($new_value);
  Description : Sets the directory where STDOUT and STDERR will be
                redirected. Each worker creates a subdirectory in which
                each analysis_job gets a .out and a .err file.
  Returntype  : string
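The subdirectory scheme spreads workers across directories using the MD5
digest of the hive_id (see the output_dir code below). A hypothetical layout,
assuming a base directory of /my/out and hive_id 42:

  use Digest::MD5 qw(md5_hex);

  my $hive_id = 42;                              # assumed example id
  my ($hex)   = md5_hex($hive_id) =~ m/\G(..)/;  # first two hex chars
  my $outdir  = join('/', '/my/out', $hex, 'hive_id_'.$hive_id);
  # worker output  : $outdir/worker.out and $outdir/worker.err
  # per-job output : $outdir/job_<dbID>.out and $outdir/job_<dbID>.err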
run
  Title       : run
  Usage       : $worker->run;
  Description : This is a self-looping autonomous function to process jobs.
                First all STDOUT/STDERR is redirected, then looping commences.
                Each loop
                  1) claims jobs,
                  2) processes those jobs through an instance of the 'module
                     class' of the analysis assigned to this worker,
                  3) updates the analysis_job, analysis_stats, and hive tables
                     to track the progress of the job, the analysis, and this
                     worker.
                Looping stops when any one of these conditions is met:
                  1) there are no more jobs to process,
                  2) job_limit has been reached,
                  3) life_span has been reached.
  Returntype  : none
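Condensed to its skeleton, that loop looks roughly like this (a simplified
sketch only; the full method, including batching, output redirection, and
HIVE_OVERLOAD handling, is listed under Methods code):

  # simplified sketch of the run() loop
  while (!$self->cause_of_death) {
    my $jobs = $self->queen->worker_grab_jobs($self);         # 1) claim
    $self->cause_of_death('NO_WORK') unless(scalar @$jobs);
    foreach my $job (@$jobs) {                                # 2) process
      $self->run_module_with_job($job);
      $self->queen->worker_register_job_done($self, $job);    # 3) track
    }
    $self->cause_of_death('JOB_LIMIT')
      if($self->job_limit and $self->work_done >= $self->job_limit);
    $self->cause_of_death('LIFESPAN')
      if((time() - $self->{'start_time'}) > $self->life_span);
  }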
Methods code
_specific_job
sub _specific_job {
  my $self = shift;
  $self->{'_specific_job'} = shift if(@_);
  return $self->{'_specific_job'};
}

analysis
sub analysis {
  my $self = shift;
  my $analysis = shift;

  if(defined($analysis)) {
    throw("analysis arg must be a [Bio::EnsEMBL::Analysis] not a [$analysis]")
       unless($analysis->isa('Bio::EnsEMBL::Analysis'));
    $self->{'_analysis'} = $analysis;
  }

  return $self->{'_analysis'};
}
batch_size
sub batch_size {
  my $self = shift;

  my $stats = $self->analysis->stats;
  my $batch_size = $stats->batch_size;
  if(defined($self->{'_batch_size'})) {
    $batch_size = $self->{'_batch_size'};
  }

  if(($batch_size <= 0) and ($stats->avg_msec_per_job)) {
    # adaptive batching: as many jobs as fit in $MIN_BATCH_TIME msec
    $batch_size = POSIX::ceil($MIN_BATCH_TIME / $stats->avg_msec_per_job);
  }
  $batch_size = 1 if($batch_size < 1); # make sure we grab at least one job

  if($self->job_limit and ($self->job_limit < $batch_size)) {
    $batch_size = $self->job_limit;
  }
  return $batch_size;
}
beekeeper
sub beekeeper {
  my $self = shift;
  $self->{'_beekeeper'} = shift if(@_);
  return $self->{'_beekeeper'};
}
born
sub born {
  my( $self, $value ) = @_;
  $self->{'_born'} = $value if($value);
  return $self->{'_born'};
}
cause_of_death
sub cause_of_death {
  my( $self, $value ) = @_;
  $self->{'_cause_of_death'} = $value if($value);
  return $self->{'_cause_of_death'};
}
check_system_load
sub check_system_load {
  my $self = shift;

  my $host = hostname;
  # count CPUs: each CPU has a 'processor : N' line in /proc/cpuinfo
  my $numCpus = `grep -c '^processor' /proc/cpuinfo`;
  print("host: $host  cpus:$numCpus\n");

  return 1;  #everything ok
}
cleanup_worker_process_temp_directory
sub cleanup_worker_process_temp_directory {
  my $self = shift;
  if($self->{'_tmp_dir'}) {
    my $cmd = "rm -r ". $self->{'_tmp_dir'};
    system($cmd);
  }
}

###############################
#
# WORK section
#
###############################
close_and_update_job_output
sub close_and_update_job_output {
  my $self = shift;
  my $job  = shift;

  return unless($job);
  return unless($self->output_dir);
  return unless($job->adaptor);

  # restore worker-level redirection; the following flushes
  # $job->stderr_file and $job->stdout_file
  open STDOUT, ">&WORKER_STDOUT";
  open STDERR, ">&WORKER_STDERR";

  if(-z $job->stdout_file) {
    # unlink zero-size stdout file
    unlink $job->stdout_file;
    $job->stdout_file('');
  }
  if(-z $job->stderr_file) {
    # unlink zero-size stderr file
    unlink $job->stderr_file;
    $job->stderr_file('');
  }
  $job->adaptor->store_out_files($job) if($job->adaptor);
}
db
sub db {
  my $self = shift;
  $self->{'_db'} = shift if(@_);
  return $self->{'_db'};
}
debug
sub debug {
  my $self = shift;
  $self->{'_debug'} = shift if(@_);
  $self->{'_debug'}=0 unless(defined($self->{'_debug'}));
  return $self->{'_debug'};
}
died
sub died {
  my( $self, $value ) = @_;
  $self->{'_died'} = $value if($value);
  return $self->{'_died'};
}
enter_status
sub enter_status {
  my ($self, $status) = @_;
  return $self->queen->enter_status($self, $status);
}
execute_writes
sub execute_writes {
  my $self = shift;
  $self->{'_execute_writes'} = shift if(@_);
  $self->{'_execute_writes'}=1 unless(defined($self->{'_execute_writes'}));
  return $self->{'_execute_writes'};
}
hive_id
sub hive_id {
  my( $self, $value ) = @_;
  $self->{'_hive_id'} = $value if($value);
  return $self->{'_hive_id'};
}
host
sub host {
  my( $self, $value ) = @_;
  $self->{'_host'} = $value if($value);
  return $self->{'_host'};
}
init
sub init {
  my $self = shift;
  $self->{'start_time'} = time();
  $self->debug(0);
  return $self;
}
job_limit
sub job_limit {
  my $self=shift;
  $self->{'_job_limit'}=shift if(@_);
  return $self->{'_job_limit'};
}
last_check_in
sub last_check_in {
  my( $self, $value ) = @_;
  $self->{'_last_check_in'} = $value if($value);
  return $self->{'_last_check_in'};
}
life_span
sub life_span {
  my( $self, $value ) = @_;
  # default life_span = 60 minutes
  $self->{'_life_span'} = 60*60 unless(defined($self->{'_life_span'}));
  $self->{'_life_span'} = $value if(defined($value));
  return $self->{'_life_span'};
}
new
sub new {
  my ($class,@args) = @_;
  my $self = bless {}, $class;
  return $self;
}
output_dir
sub output_dir {
  my ($self, $outdir) = @_;
  if ($outdir and (-d $outdir)) {
    my $hive_id = $self->hive_id;
    my (@hex) = md5_hex($hive_id) =~ m/\G(..)/g;
    # If you want more than one level of directories, change $hex[0]
    # below into an array slice, e.g. @hex[0..1] for two levels.
    $outdir = join('/', $outdir, $hex[0], 'hive_id_' . $hive_id);
    system("mkdir -p $outdir") && die "Could not create $outdir\n";
    $self->{'_output_dir'} = $outdir;
  }
  return $self->{'_output_dir'};
}
perform_global_cleanup
sub perform_global_cleanup {
  my $self = shift;
  $self->{'_perform_global_cleanup'} = shift if(@_);
  $self->{'_perform_global_cleanup'} = 1 unless(defined($self->{'_perform_global_cleanup'}));
  return $self->{'_perform_global_cleanup'};
}
print_worker
sub print_worker {
  my $self = shift;
  print("WORKER: hive_id=",$self->hive_id,
     " analysis_id=(",$self->analysis->dbID,")",$self->analysis->logic_name,
     " host=",$self->host,
     " pid=",$self->process_id,
     "\n");
  print("  batch_size = ", $self->batch_size,"\n");
  print("  job_limit  = ", $self->job_limit,"\n") if(defined($self->job_limit));
  print("  life_span  = ", $self->life_span,"\n") if(defined($self->life_span));
  if($self->output_dir) {
    print("  output_dir = ", $self->output_dir, "\n") if($self->output_dir);
  } else {
    print("  output_dir = STDOUT/STDERR\n")
  }
}
process_id
sub process_id {
  my( $self, $value ) = @_;
  $self->{'_ppid'} = $value if($value);
  return $self->{'_ppid'};
}
queen
sub queen {
  my $self = shift;
  $self->{'_queen'} = shift if(@_);
  return $self->{'_queen'};
}
redirect_job_output
sub redirect_job_output {
  my $self = shift;
  my $job  = shift;

  my $outdir = $self->output_dir();
  return unless($outdir);
  return unless($job);
  return unless($job->adaptor);

  $job->stdout_file($outdir . "/job_".$job->dbID.".out");
  $job->stderr_file($outdir . "/job_".$job->dbID.".err");

  close STDOUT;
  open STDOUT, ">".$job->stdout_file;

  close STDERR;
  open STDERR, ">".$job->stderr_file;

  $job->adaptor->store_out_files($job) if($job->adaptor);
}
run
sub run {
  my $self = shift;
  my $specific_job = $self->_specific_job;

  if($self->output_dir()) {
    open OLDOUT, ">&STDOUT";
    open OLDERR, ">&STDERR";
    open WORKER_STDOUT, ">".$self->output_dir()."/worker.out";
    open WORKER_STDERR, ">".$self->output_dir()."/worker.err";
    close STDOUT;
    close STDERR;
    open STDOUT, ">&WORKER_STDOUT";
    open STDERR, ">&WORKER_STDERR";
  }
  $self->print_worker();

  $self->db->dbc->disconnect_when_inactive(0);

  my $alive = 1;
  while ($alive) {
    my $batch_start = time() * 1000;
    my $batch_end = $batch_start;
    my $job_counter = 0;
    my $jobs = [];
    $self->{fetch_time} = 0;
    $self->{run_time} = 0;
    $self->{write_time} = 0;

    do {
      if($specific_job) {
        $self->queen->worker_reclaim_job($self,$specific_job);
        push @$jobs, $specific_job;
        $alive=undef;
      } else {
        $jobs = $self->queen->worker_grab_jobs($self);
      }

      $self->queen->worker_check_in($self); #will sync analysis_stats if needed

      $self->cause_of_death('NO_WORK') unless(scalar @{$jobs});

      if($self->debug) {
        $self->analysis->stats->print_stats;
        print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
      }

      foreach my $job (@{$jobs}) {
        $job->print_job if($self->debug);
        $self->redirect_job_output($job);
        $self->run_module_with_job($job);
        $self->close_and_update_job_output($job);
        $self->queen->worker_register_job_done($self, $job);
        $self->{'_work_done'}++;
      }
      $batch_end = time() * 1000;
      $job_counter += scalar(@$jobs);
    } while (!$specific_job and scalar(@$jobs)
             and $batch_end-$batch_start < $MIN_BATCH_TIME); ## run for at least $MIN_BATCH_TIME

    #printf("batch start:%f end:%f\n", $batch_start, $batch_end);
    $self->db->get_AnalysisStatsAdaptor->interval_update_work_done(
        $self->analysis->dbID, $job_counter, $batch_end-$batch_start, $self);

    $self->cause_of_death('JOB_LIMIT') if($specific_job);

    if($self->job_limit and ($self->{'_work_done'} >= $self->job_limit)) {
      $self->cause_of_death('JOB_LIMIT');
    }

    if(($self->life_span()>0) and ((time() - $self->{'start_time'}) > $self->life_span())) {
      printf("life_span exhausted (alive for %d secs)\n", (time() - $self->{'start_time'}));
      $self->cause_of_death('LIFESPAN');
    }

    if (!$self->cause_of_death
        and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) {
      my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
                "WHERE num_running_workers > hive_capacity AND analysis_id = " .
                $self->analysis->stats->analysis_id;
      my $row_count = $self->queen->dbc->do($sql);
      if ($row_count == 1) {
        $self->cause_of_death('HIVE_OVERLOAD');
      }
    }

    if($self->cause_of_death) { $alive=undef; }
  }

  $self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE hive_id = ".$self->hive_id);

  if($self->perform_global_cleanup) {
    # have runnable cleanup any global/process files/data it may have created
    $self->cleanup_worker_process_temp_directory;
  }

  $self->queen->register_worker_death($self);

  $self->analysis->stats->print_stats if($self->debug);
  printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
  print("total jobs completed : ", $self->work_done, "\n");

  if($self->output_dir()) {
    close STDOUT;
    close STDERR;
    close WORKER_STDOUT;
    close WORKER_STDERR;
    open STDOUT, ">&", \*OLDOUT;
    open STDERR, ">&", \*OLDERR;
  }
}
run_module_with_job
sub run_module_with_job {
  my $self = shift;
  my $job  = shift;

  my ($start_time, $end_time);

  my $runObj = $self->analysis->process;
  return 0 unless($runObj);
  return 0 unless($job and ($job->hive_id eq $self->hive_id));

  my $init_time = time() * 1000;
  $self->queen->dbc->query_count(0);

  # pass the input_id from the job into the Process object
  if($runObj->isa("Bio::EnsEMBL::Hive::Process")) {
    $runObj->input_job($job);
    $runObj->queen($self->queen);
    $runObj->worker($self);
    $runObj->debug($self->debug);
  } else {
    $runObj->input_id($job->input_id);
    $runObj->db($self->db);
  }

  my $analysis_stats = $self->analysis->stats;

  $self->enter_status("GET_INPUT");
  $job->update_status('GET_INPUT');
  print("\nGET_INPUT\n") if($self->debug);
  $start_time = time() * 1000;
  $runObj->fetch_input;
  $end_time = time() * 1000;
  $self->{fetch_time} += $end_time - $start_time;

  $self->enter_status("RUN");
  $job->update_status('RUN');
  print("\nRUN\n") if($self->debug);
  $start_time = time() * 1000;
  $runObj->run;
  $end_time = time() * 1000;
  $self->{run_time} += $end_time - $start_time;

  if($self->execute_writes) {
    $self->enter_status("WRITE_OUTPUT");
    $job->update_status('WRITE_OUTPUT');
    print("\nWRITE_OUTPUT\n") if($self->debug);
    $start_time = time() * 1000;
    $runObj->write_output;
    $end_time = time() * 1000;
    $self->{write_time} += $end_time - $start_time;
  } else {
    print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
  }

  $self->enter_status("READY");

  $job->query_count($self->queen->dbc->query_count);
  $job->runtime_msec(time()*1000 - $init_time);

  if ($runObj->isa("Bio::EnsEMBL::Hive::Process")
      and $runObj->autoflow_inputjob
      and $self->execute_writes) {
    printf("AUTOFLOW input->output\n") if($self->debug);
    $self->queen->flow_output_job($job);
  }
  return 1;
}
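For reference, a minimal RunnableDB that run_module_with_job could drive
through the GET_INPUT/RUN/WRITE_OUTPUT states might look like the following
hypothetical sketch (the package name and its input handling are invented
for illustration):

  package My::Hive::UppercaseJob;   # hypothetical module named in an Analysis
  use strict;
  use base ('Bio::EnsEMBL::Hive::Process');

  sub fetch_input {                 # GET_INPUT state
    my $self = shift;
    $self->{'text'} = $self->input_job->input_id;
  }
  sub run {                         # RUN state: compute, no database writes
    my $self = shift;
    $self->{'result'} = uc($self->{'text'});
  }
  sub write_output {                # WRITE_OUTPUT state (skipped unless execute_writes)
    my $self = shift;
    print 'result: ', $self->{'result'}, "\n";
  }
  1;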
set_worker_batch_size
sub set_worker_batch_size {
  my $self = shift;
  my $batch_size = shift;
  if(defined($batch_size)) {
    $self->{'_batch_size'} = $batch_size;
  }
}
status
sub status {
  my( $self, $value ) = @_;
  $self->{'_status'} = $value if($value);
  return $self->{'_status'};
}
work_done
sub work_done {
  my( $self, $value ) = @_;
  $self->{'_work_done'} = 0 unless($self->{'_work_done'});
  $self->{'_work_done'} = $value if($value);
  return $self->{'_work_done'};
}
worker_process_temp_directory
sub worker_process_temp_directory {
  my $self = shift;

  unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
    # create temp directory to hold fasta databases
    $self->{'_tmp_dir'} = "/tmp/worker.$$/";
    mkdir($self->{'_tmp_dir'}, 0777);
    throw("unable to create ".$self->{'_tmp_dir'}) unless(-e $self->{'_tmp_dir'});
  }
  return $self->{'_tmp_dir'};
}
General documentation
CONTACT
 
Contact Jessica Severin on EnsEMBL::Hive implementation/design detail: jessica@ebi.ac.uk
Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
APPENDIX
  The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _