Package variables
Privates (from "my" definitions)
$MIN_BATCH_TIME = 120000
Included modules
Digest::MD5 qw ( md5_hex )
Time::HiRes qw ( time )
No synopsis!
Object which encapsulates the details of how to find jobs, how to run those
jobs, and how to check the rules to create the next jobs in the chain.
It essentially knows where to find data, how to process data, and where to
put it when it is done (in the next person's INBOX) so that the next Worker
in the chain can find data to work on.
Hive based processing is a concept based on a more controlled version
of an autonomous agent type system. Each worker is not told what to do
(as it would be in a centralized control system such as the current pipeline
system) but instead queries a central database for jobs ("give me jobs").
Each worker is linked to an analysis_id, registers itself on creation
into the Hive, creates a RunnableDB instance of the Analysis->module,
gets relevant configuration information from the database, does its
work, and creates the next layer of analysis_job entries. To do that it
interfaces to the DataflowRuleAdaptor to determine the analyses it needs to
pass its output data to, and creates jobs in the database for the next analysis.
It repeats this cycle until it has lived its lifetime or until there are no
more jobs left to process.
The lifetime limit is a safety limit to prevent workers from 'infecting'
a system and sitting on a compute node for longer than is socially acceptable.
This is primarily needed on compute resources like an LSF system where jobs
are not preempted and run until they are done.
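The cycle described above can be sketched in a few lines of Perl. This is only an illustration, not the module's implementation: the in-memory @job_queue, the grab_jobs helper and run_worker are hypothetical stand-ins for the central database and the Queen, and the 'LIFESPAN' label is an assumed name (only 'NO_WORK' and 'JOB_LIMIT' appear in the code on this page).

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for the central job table.
my @job_queue = map { +{ id => $_, input => "chunk_$_" } } 1 .. 7;

# Hypothetical helper: hand out up to $batch_size unclaimed jobs.
sub grab_jobs {
    my ($batch_size) = @_;
    return [ splice(@job_queue, 0, $batch_size) ];
}

# Claim batches until there is no work left or a limit is hit.
sub run_worker {
    my (%opt) = @_;
    my $start     = time();
    my $work_done = 0;
    my $cause_of_death;

    until ($cause_of_death) {
        my $jobs = grab_jobs($opt{batch_size});
        if (!@$jobs) { $cause_of_death = 'NO_WORK'; last; }

        $work_done += scalar @$jobs;    # stands in for processing each job

        if (defined $opt{job_limit} and $work_done >= $opt{job_limit}) {
            $cause_of_death = 'JOB_LIMIT';
        } elsif (time() - $start > $opt{life_span}) {
            $cause_of_death = 'LIFESPAN';    # assumed label
        }
    }
    return ($work_done, $cause_of_death);
}

my ($done, $cause) = run_worker(batch_size => 3, life_span => 3600);
print "processed $done jobs, cause of death: $cause\n";
```

Note that a batch that has been claimed is always finished before the limits are checked, which mirrors the statement that workers are allowed to complete the jobs they get.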
The Queen's primary job is to create Workers to get the work done.
As part of this, she is also responsible for summarizing the status of the
analyses by querying the analysis_jobs, summarizing, and updating the
analysis_stats table. From this she is also responsible for monitoring and
'unblocking' analyses via the analysis_ctrl_rules.
The Queen is also responsible for freeing up jobs that were claimed by Workers
that died unexpectedly so that other workers can take over the work.
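As a rough illustration of what freeing up claimed jobs could look like, the sketch below resets the unfinished jobs of one dead worker. Everything in it is an assumption made for the example: the job records are plain hashes, and the status names 'CLAIMED', 'READY' and 'DONE' as well as the function release_jobs_of_dead_worker are hypothetical rather than taken from the Queen's code.

```perl
use strict;
use warnings;

# Hypothetical job records: each claimed job remembers its worker's hive_id.
my @jobs = (
    { id => 1, status => 'CLAIMED', hive_id => 10 },
    { id => 2, status => 'DONE',    hive_id => 10 },
    { id => 3, status => 'CLAIMED', hive_id => 11 },
);

# Release every unfinished job held by the given dead worker so that
# another worker can claim it. Returns the number of jobs released.
sub release_jobs_of_dead_worker {
    my ($jobs, $dead_hive_id) = @_;
    my $released = 0;
    for my $job (@$jobs) {
        next unless $job->{hive_id} == $dead_hive_id;
        next if $job->{status} eq 'DONE';    # finished work is kept
        $job->{status}  = 'READY';
        $job->{hive_id} = 0;
        $released++;
    }
    return $released;
}

my $released = release_jobs_of_dead_worker(\@jobs, 10);
print "released $released job(s)\n";
```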
The Beekeeper is in charge of interfacing between the Queen and a compute resource
or 'compute farm'. Its job is to ask Queens whether they need any workers and to
send the requested number of workers to open machines via the script.
It is also responsible for interfacing with the Queen to identify workers which died
unexpectedly so that she can free the dead workers' unfinished jobs.
_specific_job | No description | Code |
analysis | Description | Code |
batch_size | Description | Code |
beekeeper | No description | Code |
born | No description | Code |
cause_of_death | No description | Code |
check_system_load | No description | Code |
cleanup_worker_process_temp_directory | No description | Code |
close_and_update_job_output | No description | Code |
db | No description | Code |
debug | No description | Code |
died | No description | Code |
enter_status | No description | Code |
execute_writes | No description | Code |
hive_id | No description | Code |
host | No description | Code |
init | No description | Code |
job_limit | Description | Code |
last_check_in | No description | Code |
life_span | Description | Code |
new | No description | Code |
output_dir | Description | Code |
perform_global_cleanup | No description | Code |
print_worker | No description | Code |
process_id | No description | Code |
queen | No description | Code |
redirect_job_output | No description | Code |
run | Description | Code |
run_module_with_job | No description | Code |
set_worker_batch_size | No description | Code |
status | No description | Code |
work_done | No description | Code |
worker_process_temp_directory | No description | Code |
Methods description
Arg [1]      : (optional) Bio::EnsEMBL::Analysis $value
Title        : analysis
Usage        : $value = $self->analysis;
               $self->analysis($analysis);
Description  : Get/Set analysis object of this Worker
DefaultValue : undef
Returntype   : Bio::EnsEMBL::Analysis object
Args         : none
Title        : batch_size
Usage        : $value = $self->batch_size;
               $self->batch_size($new_value);
Description  : Defines the number of jobs that should run in batch
               before querying the database for the next job batch. Used by the
               Hive system to manage the number of workers needed to complete a
               particular job type.
DefaultValue : batch_size of analysis
Returntype   : integer scalar
Title        : job_limit
Arg [1]      : (optional) integer $value
Usage        : $value = $self->job_limit;
               $self->job_limit($new_value);
Description  : Defines the maximum number of jobs a worker can process
               before it needs to die. A worker 'dies' when either the
               'life_span' or 'job_limit' is exceeded.
DefaultValue : undef (relies on life_span to limit life of worker)
Returntype   : integer scalar
Arg [1]      : (optional) integer $value (in seconds)
Title        : life_span
Usage        : $value = $self->life_span;
               $self->life_span($new_value);
Description  : Defines the maximum time a worker can live for. Workers are always
               allowed to complete the jobs they get, but whether they can
               do multiple rounds of work is limited by their life_span.
DefaultValue : 3600 (60 minutes)
Returntype   : integer scalar
Arg [1]      : (optional) string directory path
Title        : output_dir
Usage        : $value = $self->output_dir;
               $self->output_dir($new_value);
Description  : Sets the directory where STDOUT and STDERR will be redirected to.
               Each worker will create a subdirectory where each analysis_job
               will get a .out and .err file.
Returntype   : string
Title        : run
Usage        : $worker->run;
Description  : This is a self-looping autonomous function to process jobs.
               First all STDOUT/STDERR is redirected, then looping commences.
               Looping consists of
                 1) claiming jobs,
                 2) processing those jobs through an instance of the 'module class'
                    of the analysis assigned to this worker,
                 3) updating the analysis_job, analysis_stats, and hive tables to
                    track the progress of the job, the analysis and this worker.
               Looping stops when any one of these is met:
                 1) there are no more jobs to process
                 2) job_limit is reached
                 3) life_span has been reached.
Returntype   : none
Methods code
sub _specific_job
{
  my $self = shift;
  $self->{'_specific_job'} = shift if(@_);
  return $self->{'_specific_job'};
}
sub analysis
{
  my $self = shift;
  my $analysis = shift;
  if(defined($analysis)) {
    # reject anything that is not an Analysis object
    throw("analysis arg must be a [Bio::EnsEMBL::Analysis] not a [$analysis]")
      unless($analysis->isa('Bio::EnsEMBL::Analysis'));
    $self->{'_analysis'} = $analysis;
  }
  return $self->{'_analysis'};
}
sub batch_size
{
  my $self = shift;
  my $stats = $self->analysis->stats;
  my $batch_size = $stats->batch_size;
  if(defined($self->{'_batch_size'})) {
    $batch_size = $self->{'_batch_size'};
  }
  if(($batch_size <= 0) and ($stats->avg_msec_per_job)) {
    $batch_size = POSIX::ceil($MIN_BATCH_TIME / $stats->avg_msec_per_job); # num jobs in $MIN_BATCH_TIME msecs
  }
  $batch_size = 1 if($batch_size < 1); # make sure we grab at least one job
  if($self->job_limit and ($self->job_limit < $batch_size)) {
    $batch_size = $self->job_limit;
  }
  return $batch_size;
}
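To make the arithmetic in batch_size concrete, here is a small free-standing sketch of the same rules. The function effective_batch_size and its three plain arguments are hypothetical; in the module the values come from the worker and its analysis stats. The timing figures are invented for illustration.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

my $MIN_BATCH_TIME = 120000;    # msec, same value as the package variable

# Hypothetical free-standing version of the batch_size rules:
#  - a configured size <= 0 means "derive it from the average job time"
#  - at least one job is always grabbed
#  - job_limit, when set, caps the batch
sub effective_batch_size {
    my ($configured, $avg_msec_per_job, $job_limit) = @_;
    my $batch_size = $configured;
    if ($batch_size <= 0 and $avg_msec_per_job) {
        $batch_size = ceil($MIN_BATCH_TIME / $avg_msec_per_job);
    }
    $batch_size = 1 if $batch_size < 1;
    if ($job_limit and $job_limit < $batch_size) {
        $batch_size = $job_limit;
    }
    return $batch_size;
}

print effective_batch_size(0,  450, undef), "\n";   # ceil(120000 / 450) = 267
print effective_batch_size(0,  450, 100),   "\n";   # capped by job_limit: 100
print effective_batch_size(0,  0,   undef), "\n";   # no timing data yet: 1
print effective_batch_size(25, 450, undef), "\n";   # explicit size is kept: 25
```

With an average of 450 msec per job, a worker therefore claims 267 jobs so that one batch fills roughly $MIN_BATCH_TIME (two minutes) of work.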
sub beekeeper
{ my $self = shift;
$self->{'_beekeeper'} = shift if(@_);
return $self->{'_beekeeper'}; } |
sub born
{ my( $self, $value ) = @_;
$self->{'_born'} = $value if($value);
return $self->{'_born'}; } |
sub cause_of_death
{ my( $self, $value ) = @_;
$self->{'_cause_of_death'} = $value if($value);
return $self->{'_cause_of_death'}; } |
sub check_system_load
{ my $self = shift;
my $host = hostname;
my $numCpus = `grep -c '^process' /proc/cpuinfo`;
print("host: $host cpus:$numCpus\n");
return 1;
} |
sub cleanup_worker_process_temp_directory
{
  my $self = shift;
  if($self->{'_tmp_dir'}) {
    my $cmd = "rm -r ". $self->{'_tmp_dir'};
    system($cmd);  # assumed: the command built above is meant to be executed
  }
}
sub close_and_update_job_output
{
  my $self = shift;
  my $job = shift;
  return unless($job);
  return unless($self->output_dir);
  return unless($job->adaptor);
  # remove empty output files before storing the file locations
  if(-z $job->stdout_file) {
    unlink $job->stdout_file;
  }
  if(-z $job->stderr_file) {
    unlink $job->stderr_file;
  }
  $job->adaptor->store_out_files($job) if($job->adaptor);
}
sub db
{ my $self = shift;
$self->{'_db'} = shift if(@_);
return $self->{'_db'}; } |
sub debug
{ my $self = shift;
$self->{'_debug'} = shift if(@_);
$self->{'_debug'}=0 unless(defined($self->{'_debug'}));
return $self->{'_debug'}; } |
sub died
{ my( $self, $value ) = @_;
$self->{'_died'} = $value if($value);
return $self->{'_died'}; } |
sub enter_status
{ my ($self, $status) = @_;
return $self->queen->enter_status($self, $status); } |
sub execute_writes
{ my $self = shift;
$self->{'_execute_writes'} = shift if(@_);
$self->{'_execute_writes'}=1 unless(defined($self->{'_execute_writes'}));
return $self->{'_execute_writes'}; } |
sub hive_id
{ my( $self, $value ) = @_;
$self->{'_hive_id'} = $value if($value);
return $self->{'_hive_id'}; } |
sub host
{ my( $self, $value ) = @_;
$self->{'_host'} = $value if($value);
return $self->{'_host'}; } |
sub init
{ my $self = shift;
$self->{'start_time'} = time();
return $self; } |
sub job_limit
{ my $self=shift;
$self->{'_job_limit'}=shift if(@_);
return $self->{'_job_limit'}; } |
sub last_check_in
{ my( $self, $value ) = @_;
$self->{'_last_check_in'} = $value if($value);
return $self->{'_last_check_in'}; } |
sub life_span
{ my( $self, $value ) = @_;
$self->{'_life_span'} = 60*60 unless(defined($self->{'_life_span'}));
$self->{'_life_span'} = $value if(defined($value));
return $self->{'_life_span'}; } |
sub new
{ my ($class,@args) = @_;
my $self = bless {}, $class;
return $self; } |
sub output_dir
{
  my ($self, $outdir) = @_;
  if ($outdir and (-d $outdir)) {
    my $hive_id = $self->hive_id;
    # split the MD5 hex digest of the hive_id into two-character chunks
    my (@hex) = md5_hex($hive_id) =~ m/\G(..)/g;
    $outdir = join('/', $outdir, $hex[0], 'hive_id_' . $hive_id);
    system("mkdir -p $outdir") && die "Could not create $outdir\n";
    $self->{'_output_dir'} = $outdir;
  }
  return $self->{'_output_dir'};
}
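The directory layout produced by output_dir can be reproduced in isolation. The sketch below only computes the path and creates nothing on disk. The function name worker_output_path and the base directory /tmp/hive_out are made up for the example.

```perl
use strict;
use warnings;
use Digest::MD5 qw(md5_hex);

# Build the per-worker output path: <base>/<first two hex digits of
# md5(hive_id)>/hive_id_<hive_id>. The hashed prefix spreads workers
# over up to 256 subdirectories instead of one very large directory.
sub worker_output_path {
    my ($base, $hive_id) = @_;
    my @hex = md5_hex($hive_id) =~ m/\G(..)/g;    # 16 two-character chunks
    return join('/', $base, $hex[0], 'hive_id_' . $hive_id);
}

my $path = worker_output_path('/tmp/hive_out', 42);
print "$path\n";
```

Only $hex[0] is used, so there is a single level of hashed directories. Using more chunks of @hex would add further levels.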
sub perform_global_cleanup
{ my $self = shift;
$self->{'_perform_global_cleanup'} = shift if(@_);
$self->{'_perform_global_cleanup'} = 1 unless(defined($self->{'_perform_global_cleanup'}));
return $self->{'_perform_global_cleanup'}; } |
sub print_worker
{
  my $self = shift;
  print("WORKER: hive_id=",$self->hive_id,
     " analysis_id=(",$self->analysis->dbID,")",$self->analysis->logic_name,
     " host=",$self->host,
     " pid=",$self->process_id,
     "\n");
  print(" batch_size = ", $self->batch_size,"\n");
  print(" job_limit = ", $self->job_limit,"\n") if(defined($self->job_limit));
  print(" life_span = ", $self->life_span,"\n") if(defined($self->life_span));
  if($self->output_dir) {
    print(" output_dir = ", $self->output_dir, "\n");
  } else {
    print(" output_dir = STDOUT/STDERR\n");
  }
}
sub process_id
{ my( $self, $value ) = @_;
$self->{'_ppid'} = $value if($value);
return $self->{'_ppid'}; } |
sub queen
{ my $self = shift;
$self->{'_queen'} = shift if(@_);
return $self->{'_queen'}; } |
sub redirect_job_output
{
  my $self = shift;
  my $job = shift;
  my $outdir = $self->output_dir();
  return unless($outdir);
  return unless($job);
  return unless($job->adaptor);
  # each job gets its own .out and .err file in the worker's output directory
  $job->stdout_file($outdir . "/job_".$job->dbID.".out");
  $job->stderr_file($outdir . "/job_".$job->dbID.".err");
  close STDOUT;
  open STDOUT, ">".$job->stdout_file;
  close STDERR;
  open STDERR, ">".$job->stderr_file;
  $job->adaptor->store_out_files($job) if($job->adaptor);
}
sub run
my $self = shift;
my $specific_job = $self->_specific_job;
if($self->output_dir()) {
open OLDOUT, ">&STDOUT";
open OLDERR, ">&STDERR";
open WORKER_STDOUT, ">".$self->output_dir()."/worker.out";
open WORKER_STDERR, ">".$self->output_dir()."/worker.err";
close STDOUT;
close STDERR;
my $alive = 1;
while ($alive) {
my $batch_start = time() * 1000;
my $batch_end = $batch_start;
my $job_counter = 0;
my $jobs = [];
$self->{fetch_time} = 0;
$self->{run_time} = 0;
$self->{write_time} = 0;
do {
if($specific_job) {
push @$jobs, $specific_job;
} else {
$jobs = $self->queen->worker_grab_jobs($self);
$self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
if($self->debug) {
print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
foreach my $job (@{$jobs}) {
$job->print_job if($self->debug);
$self->queen->worker_register_job_done($self, $job);
$batch_end = time() * 1000;
$job_counter += scalar(@$jobs);
} while (!$specific_job and scalar(@$jobs) and $batch_end-$batch_start < $MIN_BATCH_TIME);
$job_counter, $batch_end-$batch_start, $self);
$self->cause_of_death('JOB_LIMIT') if($specific_job);
if($self->job_limit and ($self->{'_work_done'} >= $self->job_limit)) {
if(($self->life_span()>0) and ((time() - $self->{'start_time'}) > $self->life_span())) {
printf("life_span exhausted (alive for %d secs)\n", (time() - $self->{'start_time'}));
if (!$self->cause_of_death and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) {
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
"WHERE num_running_workers > hive_capacity AND analysis_id = " . $self->analysis->stats->analysis_id;
my $row_count = $self->queen->dbc->do($sql);
if ($row_count == 1) {
if($self->cause_of_death) { $alive=undef; }
$self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE hive_id = ".$self->hive_id);
if($self->perform_global_cleanup) {
$self->analysis->stats->print_stats if($self->debug);
printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
print("total jobs completes : ", $self->work_done, "\n");
if($self->output_dir()) {
close STDOUT;
close STDERR;
open STDOUT, ">&",\* OLDOUT;
open STDERR, ">&",\* OLDERR;
} } |
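The save, redirect and restore pattern that run applies to STDOUT and STDERR can be tried on its own. The following is a minimal sketch for STDOUT only, written with lexical filehandles and three-argument open rather than the bareword handles used above. The temporary directory and file name are assumptions for the example.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

my $dir = tempdir(CLEANUP => 1);    # scratch directory, removed at exit
my $log = "$dir/worker.out";

# 1) keep a duplicate of the real STDOUT so it can be restored later
open(my $oldout, '>&', \*STDOUT) or die "Cannot dup STDOUT: $!";

# 2) point STDOUT at the log file; everything printed now lands there
open(STDOUT, '>', $log) or die "Cannot redirect STDOUT: $!";
print "captured line\n";
close(STDOUT);

# 3) restore the original STDOUT from the saved duplicate
open(STDOUT, '>&', $oldout) or die "Cannot restore STDOUT: $!";

# read the log back to show what was captured
open(my $in, '<', $log) or die "Cannot read $log: $!";
my $captured = do { local $/; <$in> };
close($in);
print "log contains: $captured";
```

Checking the return value of every open makes a failed redirect visible immediately instead of silently losing output.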
sub run_module_with_job
my $self = shift;
my $job = shift;
my ($start_time, $end_time);
my $runObj = $self->analysis->process;
return 0 unless($runObj);
return 0 unless($job and ($job->hive_id eq $self->hive_id));
my $init_time = time() * 1000;
if($runObj->isa("Bio::EnsEMBL::Hive::Process")) {
} else {
my $analysis_stats = $self->analysis->stats;
print("\nGET_INPUT\n") if($self->debug);
$start_time = time() * 1000;
$end_time = time() * 1000;
$self->{fetch_time} += $end_time - $start_time;
print("\nRUN\n") if($self->debug);
$start_time = time() * 1000;
$end_time = time() * 1000;
$self->{run_time} += $end_time - $start_time;
if($self->execute_writes) {
print("\nWRITE_OUTPUT\n") if($self->debug);
$start_time = time() * 1000;
$end_time = time() * 1000;
$self->{write_time} += $end_time - $start_time;
} else {
print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
$job->runtime_msec(time()*1000 - $init_time);
if ($runObj->isa("Bio::EnsEMBL::Hive::Process") and $runObj->autoflow_inputjob
and $self->execute_writes) {
printf("AUTOFLOW input->output\n") if($self->debug);
return 1; } |
sub set_worker_batch_size
{ my $self = shift;
my $batch_size = shift;
if(defined($batch_size)) {
$self->{'_batch_size'} = $batch_size;
} } |
sub status
{ my( $self, $value ) = @_;
$self->{'_status'} = $value if($value);
return $self->{'_status'}; } |
sub work_done
{ my( $self, $value ) = @_;
$self->{'_work_done'} = 0 unless($self->{'_work_done'});
$self->{'_work_done'} = $value if($value);
return $self->{'_work_done'}; } |
sub worker_process_temp_directory
{
  my $self = shift;
  # create the per-process temp directory on first use
  unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
    $self->{'_tmp_dir'} = "/tmp/worker.$$/";
    mkdir($self->{'_tmp_dir'}, 0777);
    throw("unable to create ".$self->{'_tmp_dir'}) unless(-e $self->{'_tmp_dir'});
  }
  return $self->{'_tmp_dir'};
}
General documentation
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _