Raw content of Bio::EnsEMBL::Hive::Worker # # You may distribute this module under the same terms as perl itself # # POD documentation - main docs before the code =pod =head1 NAME Bio::EnsEMBL::Hive::Worker =cut =head1 DESCRIPTION Object which encapsulates the details of how to find jobs, how to run those jobs, and then check the rules to create the next jobs in the chain. Essentially knows where to find data, how to process data, and where to put it when it is done (put in next persons INBOX) so the next Worker in the chain can find data to work on. Hive based processing is a concept based on a more controlled version of an autonomous agent type system. Each worker is not told what to do (like a centralized control system - like the current pipeline system) but rather queries a central database for jobs (give me jobs). Each worker is linked to an analysis_id, registers its self on creation into the Hive, creates a RunnableDB instance of the Analysis->module, gets relevant configuration information from the database, does its work, creates the next layer of analysis_job entries by interfacing to the DataflowRuleAdaptor to determine the analyses it needs to pass its output data to and creates jobs on the database of the next analysis. It repeats this cycle until it has lived its lifetime or until there are no more jobs left to process. The lifetime limit is a safety limit to prevent these from 'infecting' a system and sitting on a compute node for longer than is socially exceptable. This is primarily needed on compute resources like an LSF system where jobs are not preempted and run until they are done. The Queens primary job is to create Workers to get the work down. As part of this, she is also responsible for summarizing the status of the analyses by querying the analysis_jobs, summarizing, and updating the analysis_stats table. From this she is also responsible for monitoring and 'unblocking' analyses via the analysis_ctrl_rules. The Queen is also responsible for freeing up jobs that were claimed by Workers that died unexpectantly so that other workers can take over the work. The Beekeeper is in charge of interfacing between the Queen and a compute resource or 'compute farm'. Its job is to query Queens if they need any workers and to send the requested number of workers to open machines via the runWorker.pl script. It is also responsible for interfacing with the Queen to identify workers which died unexpectantly so that she can free the dead workers unfinished jobs. =cut =head1 CONTACT Contact Jessica Severin on EnsEMBL::Hive implemetation/design detail: jessica@ebi.ac.uk Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk =cut =head1 APPENDIX The rest of the documentation details each of the object methods. Internal methods are usually preceded with a _ =cut package Bio::EnsEMBL::Hive::Worker; use strict; use Bio::EnsEMBL::Utils::Argument; use Bio::EnsEMBL::Utils::Exception; use Sys::Hostname; use Time::HiRes qw(time); use POSIX; use Bio::EnsEMBL::Analysis; use Bio::EnsEMBL::DBSQL::DBAdaptor; use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor; use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor; use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor; use Bio::EnsEMBL::Hive::Extensions; use Bio::EnsEMBL::Hive::Process; ## Minimum amount of time in msec that a worker should run before reporting ## back to the hive. This is used when setting the batch_size automatically. ## 120000 msec = 2 minutes my $MIN_BATCH_TIME = 120000; sub new { my ($class,@args) = @_; my $self = bless {}, $class; return $self; } sub init { my $self = shift; $self->{'start_time'} = time(); $self->debug(0); return $self; } sub queen { my $self = shift; $self->{'_queen'} = shift if(@_); return $self->{'_queen'}; } sub db { my $self = shift; $self->{'_db'} = shift if(@_); return $self->{'_db'}; } sub beekeeper { my $self = shift; $self->{'_beekeeper'} = shift if(@_); return $self->{'_beekeeper'}; } sub debug { my $self = shift; $self->{'_debug'} = shift if(@_); $self->{'_debug'}=0 unless(defined($self->{'_debug'})); return $self->{'_debug'}; } sub execute_writes { my $self = shift; $self->{'_execute_writes'} = shift if(@_); $self->{'_execute_writes'}=1 unless(defined($self->{'_execute_writes'})); return $self->{'_execute_writes'}; } =head2 analysis Arg [1] : (optional) Bio::EnsEMBL::Analysis $value Title : analysis Usage : $value = $self->analysis; $self->analysis($$analysis); Description: Get/Set analysis object of this Worker DefaultValue : undef Returntype : Bio::EnsEMBL::Analysis object =cut sub analysis { my $self = shift; my $analysis = shift; if(defined($analysis)) { throw("analysis arg must be a [Bio::EnsEMBL::Analysis] not a [$analysis]") unless($analysis->isa('Bio::EnsEMBL::Analysis')); $self->{'_analysis'} = $analysis; } return $self->{'_analysis'}; } =head2 life_span Arg [1] : (optional) integer $value (in seconds) Title : life_span Usage : $value = $self->life_span; $self->life_span($new_value); Description: Defines the maximum time a worker can live for. Workers are always allowed to complete the jobs they get, but whether they can do multiple rounds of work is limited by their life_span DefaultValue : 3600 (60 minutes) Returntype : integer scalar =cut sub life_span { #default life_span = 60minutes my( $self, $value ) = @_; $self->{'_life_span'} = 60*60 unless(defined($self->{'_life_span'})); $self->{'_life_span'} = $value if(defined($value)); return $self->{'_life_span'}; } =head2 job_limit Title : job_limit Arg [1] : (optional) integer $value Usage : $value = $self->job_limit; $self->job_limit($new_value); Description: Defines the maximum number of jobs a worker can process before it needs to die. A worker 'dies' when either the 'life_span' or 'job_limit' is exceeded. DefaultValue : undef (relies on life_span to limit life of worker) Returntype : integer scalar =cut sub job_limit { my $self=shift; $self->{'_job_limit'}=shift if(@_); return $self->{'_job_limit'}; } sub hive_id { my( $self, $value ) = @_; $self->{'_hive_id'} = $value if($value); return $self->{'_hive_id'}; } sub host { my( $self, $value ) = @_; $self->{'_host'} = $value if($value); return $self->{'_host'}; } sub process_id { my( $self, $value ) = @_; $self->{'_ppid'} = $value if($value); return $self->{'_ppid'}; } sub work_done { my( $self, $value ) = @_; $self->{'_work_done'} = 0 unless($self->{'_work_done'}); $self->{'_work_done'} = $value if($value); return $self->{'_work_done'}; } sub cause_of_death { my( $self, $value ) = @_; $self->{'_cause_of_death'} = $value if($value); return $self->{'_cause_of_death'}; } sub status { my( $self, $value ) = @_; $self->{'_status'} = $value if($value); return $self->{'_status'}; } sub born { my( $self, $value ) = @_; $self->{'_born'} = $value if($value); return $self->{'_born'}; } sub died { my( $self, $value ) = @_; $self->{'_died'} = $value if($value); return $self->{'_died'}; } sub last_check_in { my( $self, $value ) = @_; $self->{'_last_check_in'} = $value if($value); return $self->{'_last_check_in'}; } =head2 output_dir Arg [1] : (optional) string directory path Title : output_dir Usage : $value = $self->output_dir; $self->output_dir($new_value); Description: sets the directory where STDOUT and STRERR will be redirected to. Each worker will create a subdirectory where each analysis_job will get a .out and .err file Returntype : string =cut use Digest::MD5 qw(md5_hex); sub output_dir { my ($self, $outdir) = @_; if ($outdir and (-d $outdir)) { my $hive_id = $self->hive_id; my (@hex) = md5_hex($hive_id) =~ m/\G(..)/g; # If you want more than one level of directories, change $hex[0] # below into an array slice. e.g @hex[0..1] for two levels. $outdir = join('/', $outdir, $hex[0], 'hive_id_' . $hive_id); system("mkdir -p $outdir") && die "Could not create $outdir\n"; $self->{'_output_dir'} = $outdir; } return $self->{'_output_dir'}; } sub perform_global_cleanup { my $self = shift; $self->{'_perform_global_cleanup'} = shift if(@_); $self->{'_perform_global_cleanup'} = 1 unless(defined($self->{'_perform_global_cleanup'})); return $self->{'_perform_global_cleanup'}; } sub print_worker { my $self = shift; print("WORKER: hive_id=",$self->hive_id, " analysis_id=(",$self->analysis->dbID,")",$self->analysis->logic_name, " host=",$self->host, " pid=",$self->process_id, "\n"); print(" batch_size = ", $self->batch_size,"\n"); print(" job_limit = ", $self->job_limit,"\n") if(defined($self->job_limit)); print(" life_span = ", $self->life_span,"\n") if(defined($self->life_span)); if($self->output_dir) { print(" output_dir = ", $self->output_dir, "\n") if($self->output_dir); } else { print(" output_dir = STDOUT/STDERR\n") } } sub worker_process_temp_directory { my $self = shift; unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) { #create temp directory to hold fasta databases $self->{'_tmp_dir'} = "/tmp/worker.$$/"; mkdir($self->{'_tmp_dir'}, 0777); throw("unable to create ".$self->{'_tmp_dir'}) unless(-e $self->{'_tmp_dir'}); } return $self->{'_tmp_dir'}; } sub cleanup_worker_process_temp_directory { my $self = shift; if($self->{'_tmp_dir'}) { my $cmd = "rm -r ". $self->{'_tmp_dir'}; system($cmd); } } ############################### # # WORK section # ############################### =head2 batch_size Args : none Title : batch_size Usage : $value = $self->batch_size; $self->batch_size($new_value); Description: Defines the number of jobs that should run in batch before querying the database for the next job batch. Used by the Hive system to manage the number of workers needed to complete a particular job type. DefaultValue : batch_size of analysis Returntype : integer scalar =cut sub set_worker_batch_size { my $self = shift; my $batch_size = shift; if(defined($batch_size)) { $self->{'_batch_size'} = $batch_size; } } sub batch_size { my $self = shift; my $stats = $self->analysis->stats; my $batch_size = $stats->batch_size; if(defined($self->{'_batch_size'})) { $batch_size = $self->{'_batch_size'}; } if(($batch_size <= 0) and ($stats->avg_msec_per_job)) { $batch_size = POSIX::ceil($MIN_BATCH_TIME / $stats->avg_msec_per_job); # num jobs in $MIN_BATCH_TIME msecs } $batch_size = 1 if($batch_size < 1); # make sure we grab at least one job if($self->job_limit and ($self->job_limit < $batch_size)) { $batch_size = $self->job_limit; } return $batch_size; } =head2 run Title : run Usage : $worker->run; Description: This is a self looping autonomous function to process jobs. First all STDOUT/STDERR is rediected, then looping commences. Looping consists of 1) claiming jobs, 2) processing those jobs through an instance of the 'module class' of the analysis asigned to this worker, 3) updating the analysis_job, analysis_stats, and hive tables to track the progress of the job, the analysis and this worker. Looping stops when any one of these are met: 1) there is no more jobs to process 2) job_limit is reached 3) life_span has been reached. Returntype : none =cut sub run { my $self = shift; my $specific_job = $self->_specific_job; if($self->output_dir()) { open OLDOUT, ">&STDOUT"; open OLDERR, ">&STDERR"; open WORKER_STDOUT, ">".$self->output_dir()."/worker.out"; open WORKER_STDERR, ">".$self->output_dir()."/worker.err"; close STDOUT; close STDERR; open STDOUT, ">&WORKER_STDOUT"; open STDERR, ">&WORKER_STDERR"; } $self->print_worker(); $self->db->dbc->disconnect_when_inactive(0); my $alive = 1; while ($alive) { my $batch_start = time() * 1000; my $batch_end = $batch_start; my $job_counter = 0; my $jobs = []; $self->{fetch_time} = 0; $self->{run_time} = 0; $self->{write_time} = 0; do { if($specific_job) { $self->queen->worker_reclaim_job($self,$specific_job); push @$jobs, $specific_job; $alive=undef; } else { $jobs = $self->queen->worker_grab_jobs($self); } $self->queen->worker_check_in($self); #will sync analysis_stats if needed $self->cause_of_death('NO_WORK') unless(scalar @{$jobs}); if($self->debug) { $self->analysis->stats->print_stats; print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n"); } foreach my $job (@{$jobs}) { $job->print_job if($self->debug); $self->redirect_job_output($job); $self->run_module_with_job($job); $self->close_and_update_job_output($job); $self->queen->worker_register_job_done($self, $job); $self->{'_work_done'}++; } $batch_end = time() * 1000; $job_counter += scalar(@$jobs); } while (!$specific_job and scalar(@$jobs) and $batch_end-$batch_start < $MIN_BATCH_TIME); ## Run for $MIN_BATCH_TIME at least #printf("batch start:%f end:%f\n", $batch_start, $batch_end); $self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID, $job_counter, $batch_end-$batch_start, $self); $self->cause_of_death('JOB_LIMIT') if($specific_job); if($self->job_limit and ($self->{'_work_done'} >= $self->job_limit)) { $self->cause_of_death('JOB_LIMIT'); } if(($self->life_span()>0) and ((time() - $self->{'start_time'}) > $self->life_span())) { printf("life_span exhausted (alive for %d secs)\n", (time() - $self->{'start_time'})); $self->cause_of_death('LIFESPAN'); } if (!$self->cause_of_death and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) { my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ". "WHERE num_running_workers > hive_capacity AND analysis_id = " . $self->analysis->stats->analysis_id; my $row_count = $self->queen->dbc->do($sql); if ($row_count == 1) { $self->cause_of_death('HIVE_OVERLOAD'); } } if($self->cause_of_death) { $alive=undef; } } $self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE hive_id = ".$self->hive_id); if($self->perform_global_cleanup) { #have runnable cleanup any global/process files/data it may have created $self->cleanup_worker_process_temp_directory; } $self->queen->register_worker_death($self); $self->analysis->stats->print_stats if($self->debug); printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count); print("total jobs completes : ", $self->work_done, "\n"); if($self->output_dir()) { close STDOUT; close STDERR; close WORKER_STDOUT; close WORKER_STDERR; open STDOUT, ">&", \*OLDOUT; open STDERR, ">&", \*OLDERR; } } sub run_module_with_job { my $self = shift; my $job = shift; my ($start_time, $end_time); my $runObj = $self->analysis->process; return 0 unless($runObj); return 0 unless($job and ($job->hive_id eq $self->hive_id)); my $init_time = time() * 1000; $self->queen->dbc->query_count(0); #pass the input_id from the job into the Process object if($runObj->isa("Bio::EnsEMBL::Hive::Process")) { $runObj->input_job($job); $runObj->queen($self->queen); $runObj->worker($self); $runObj->debug($self->debug); } else { $runObj->input_id($job->input_id); $runObj->db($self->db); } my $analysis_stats = $self->analysis->stats; $self->enter_status("GET_INPUT"); $job->update_status('GET_INPUT'); print("\nGET_INPUT\n") if($self->debug); $start_time = time() * 1000; $runObj->fetch_input; $end_time = time() * 1000; $self->{fetch_time} += $end_time - $start_time; $self->enter_status("RUN"); $job->update_status('RUN'); print("\nRUN\n") if($self->debug); $start_time = time() * 1000; $runObj->run; $end_time = time() * 1000; $self->{run_time} += $end_time - $start_time; if($self->execute_writes) { $self->enter_status("WRITE_OUTPUT"); $job->update_status('WRITE_OUTPUT'); print("\nWRITE_OUTPUT\n") if($self->debug); $start_time = time() * 1000; $runObj->write_output; $end_time = time() * 1000; $self->{write_time} += $end_time - $start_time; } else { print("\n\n!!!! NOT write_output\n\n\n") if($self->debug); } $self->enter_status("READY"); $job->query_count($self->queen->dbc->query_count); $job->runtime_msec(time()*1000 - $init_time); if ($runObj->isa("Bio::EnsEMBL::Hive::Process") and $runObj->autoflow_inputjob and $self->execute_writes) { printf("AUTOFLOW input->output\n") if($self->debug); $self->queen->flow_output_job($job); } return 1; } sub enter_status { my ($self, $status) = @_; return $self->queen->enter_status($self, $status); } sub redirect_job_output { my $self = shift; my $job = shift; my $outdir = $self->output_dir(); return unless($outdir); return unless($job); return unless($job->adaptor); $job->stdout_file($outdir . "/job_".$job->dbID.".out"); $job->stderr_file($outdir . "/job_".$job->dbID.".err"); close STDOUT; open STDOUT, ">".$job->stdout_file; close STDERR; open STDERR, ">".$job->stderr_file; $job->adaptor->store_out_files($job) if($job->adaptor); } sub close_and_update_job_output { my $self = shift; my $job = shift; return unless($job); return unless($self->output_dir); return unless($job->adaptor); # the following flushes $job->stderr_file and $job->stdout_file open STDOUT, ">&WORKER_STDOUT"; open STDERR, ">&WORKER_STDERR"; if(-z $job->stdout_file) { #print("unlink zero size ", $job->stdout_file, "\n"); unlink $job->stdout_file; $job->stdout_file(''); } if(-z $job->stderr_file) { #print("unlink zero size ", $job->stderr_file, "\n"); unlink $job->stderr_file; $job->stderr_file(''); } $job->adaptor->store_out_files($job) if($job->adaptor); } sub check_system_load { my $self = shift; my $host = hostname; my $numCpus = `grep -c '^process' /proc/cpuinfo`; print("host: $host cpus:$numCpus\n"); return 1; #everything ok } sub _specific_job { my $self = shift; $self->{'_specific_job'} = shift if(@_); return $self->{'_specific_job'}; } 1;