Bio::EnsEMBL::Pipeline::BatchSubmission
LSF
Toolbar
Summary
Bio::EnsEMBL::Pipeline::BatchSubmission::LSF
Package variables
No package variables defined.
Included modules
Inherit
Synopsis
my $batch_job = Bio::EnsEMBL::Pipeline::BatchSubmission::LSF->new(
-STDOUT => $stdout_file,
-STDERR => $stderr_file,
-PARAMETERS => @args,
-PRE_EXEC => $pre_exec,
-QUEUE => $queue,
-JOBNAME => $jobname,
-NODES => $nodes,
-RESOURCE => $resource
);
$batch_job->construct_command_line('test.pl');
$batch_job->open_command_line();
Description
This module provides an interface to the Platform LSF load sharing software and
its commands. It implements the method construct_command_line which is not
defined in the base class and which enables the pipeline to submit jobs in a
distributed environment using LSF.
See base class Bio::EnsEMBL::Pipeline::BatchSubmission for more info
Methods
bsub | No description | Code |
check_existance | No description | Code |
construct_command_line | No description | Code |
copy_command | No description | Code |
copy_output | Description | Code |
delete_output | No description | Code |
get_job_time | No description | Code |
get_pending_jobs | No description | Code |
job_stats | No description | Code |
kill_job | No description | Code |
lsf_user | No description | Code |
new | No description | Code |
open_command_line | No description | Code |
stderr_file | No description | Code |
stdout_file | No description | Code |
submission_host | No description | Code |
temp_errfile | No description | Code |
temp_filename | No description | Code |
temp_outfile | No description | Code |
Methods description
copy_output is used to copy the job's STDOUT and STDERR files using lsrcp. This avoids using NFS. |
Methods code
# Get/set accessor for the bsub command line stored on this object.
#
#   Arg [1]    : (optional) string to store; an undefined value leaves the
#                stored command line untouched
#   Returntype : the stored string, or undef when nothing has been stored
sub bsub {
    my ($self, $arg) = @_;

    $self->{'bsub'} = $arg if defined $arg;

    return $self->{'bsub'};
}
# Report which submitted jobs are no longer listed by bjobs.
#
#   Arg [1]    : hashref keyed on LSF submission id; every value is an
#                arrayref of the entries to report when that id is missing
#   Arg [2]    : (optional) boolean; when true each line of bjobs output is
#                echoed to STDERR
#   Returntype : arrayref with the flattened values of every id that bjobs
#                did not list, or listed with status UNKWN
#   Exceptions : throws if the pipe to bjobs cannot be opened
sub check_existance {
    my ($self, $id_hash, $verbose) = @_;

    my $command = "bjobs";

    # A lexical handle replaces the old localised BJOB glob. stderr is merged
    # into the pipe, presumably so the "No unfinished job found" message is
    # seen by the loop below - TODO confirm which stream bjobs uses for it.
    open(my $bjobs, '-|', "$command 2>&1")
        or throw("couldn't open pipe to bjobs");

    my %existing_ids;
  LINE:
    while (my $line = <$bjobs>) {
        print STDERR $line if $verbose;
        chomp $line;
        last LINE if $line =~ /No unfinished job found/;

        # First field is taken as the job id and the third as its status.
        my ($job_id, undef, $status) = split ' ', $line;

        # Blank lines and lines whose first field has no digit (such as a
        # header) carry no job id; skipping them also avoids comparisons
        # against undefined fields.
        next LINE unless defined $job_id && $job_id =~ /\d+/;

        # Jobs in the UNKWN state are treated as if they did not exist.
        next LINE if defined $status && $status eq 'UNKWN';

        $existing_ids{$job_id} = 1;
    }
    close($bjobs);

    my @awol_jobs;
    foreach my $job_id (keys %$id_hash) {
        next if $existing_ids{$job_id};
        push(@awol_jobs, @{ $id_hash->{$job_id} });
    }

    return \@awol_jobs;
}
# Build the bsub command line for a command and store it via $self->bsub.
#
#   Arg [1]    : string, the command to submit (required)
#   Arg [2]    : (optional) stdout path; defaults to $self->stdout_file
#   Arg [3]    : (optional) stderr path; defaults to $self->stderr_file
#   Returntype : whatever $self->bsub returns after storing the line
#   Exceptions : throws when no command is given
#
# Options are appended in a fixed order: -o, -m, -R, -q, -J, the free-form
# parameters, -e, -E and finally the command itself.
sub construct_command_line {
    my ($self, $command, $stdout, $stderr) = @_;

    if (!$command) {
        throw("cannot create bsub if nothing to submit to it : $!\n");
    }
    $self->command($command);

    my $bsub_line = "bsub -o " . ($stdout ? $stdout : $self->stdout_file);

    if (my $nodes = $self->nodes) {
        # Accept a comma and/or space separated host list. The /g flags are
        # required: without them only the first comma and the first run of
        # spaces were rewritten, leaving lists of three or more hosts mangled.
        $nodes =~ s/,/ /g;
        $nodes =~ s/ +/ /g;
        $bsub_line .= " -m '" . $nodes . "' ";
    }

    if (my $res = $self->resource) {
        # Wrap a bare resource string in -R '...' unless the caller already
        # supplied the switch.
        $res = qq{-R '$res'} unless $res =~ /^-R/;
        $bsub_line .= " $res ";
    }

    $bsub_line .= " -q " . $self->queue            if $self->queue;
    $bsub_line .= " -J " . $self->jobname          if $self->jobname;
    $bsub_line .= " "    . $self->parameters . " " if $self->parameters;

    $bsub_line .= " -e " . ($stderr ? $stderr : $self->stderr_file);

    $bsub_line .= " -E\" " . $self->pre_exec . "\"" if defined $self->pre_exec;
    $bsub_line .= " " . $command;

    return $self->bsub($bsub_line);
}
# Get/set accessor for the program used to copy job output files.
#
#   Arg [1]    : (optional) string; stored only when it is a true value
#   Returntype : the stored command, or the default 'lsrcp ' (note the
#                trailing space) when no true value has been stored
sub copy_command {
    my ($self, $arg) = @_;

    $self->{'_copy_command'} = $arg if $arg;

    my $stored = $self->{'_copy_command'};
    return $stored ? $stored : 'lsrcp ';
}

1;
# Copy the job's temporary stdout/stderr files to their final destinations.
#
#   Arg [1]    : (optional) destination for stderr; defaults to
#                $self->stderr_file
#   Arg [2]    : (optional) destination for stdout; defaults to
#                $self->stdout_file
#   Returntype : none meaningful
#
# Files are copied with File::Copy's copy() when the copy command is exactly
# 'cp' or the destination path starts with /lustre; otherwise the configured
# copy command is run as "<command> <temp> <user>@<host>:<dest>". Failures are
# reported with warn and never raised, so one failed copy does not prevent
# the other from being attempted.
sub copy_output {
    my ($self, $dest_err, $dest_out) = @_;

    $dest_err ||= $self->stderr_file;
    $dest_out ||= $self->stdout_file;

    if (!$self->temp_filename) {
        my (undef, $file, $line) = caller;
        warning("The lsf environment variable LSB_JOBFILENAME is not defined".
                " we can't copy the output files which don't exist $file:$line");
        return;
    }

    # Turn on autoflush for STDOUT, then restore the previously selected
    # handle.
    my $old_fh = select(STDOUT);
    $| = 1;
    select($old_fh);

    my $temp_err = $self->temp_errfile;
    my $temp_out = $self->temp_outfile;

    my $command = $self->copy_command;
    my $remote  = $self->lsf_user . '@' . $self->submission_host;

    foreach my $set ([$temp_out, $dest_out], [$temp_err, $dest_err]) {
        my ($temp, $dest) = @$set;

        if (!-e $temp) {
            warn "No such file '$temp' to copy\n";
            next;
        }

        if ($command eq 'cp' || $dest =~ /^\/lustre/) {
            # copy() returns false on failure; report it the same way the
            # remote-copy branch does instead of losing the output silently.
            copy($temp, $dest)
                or warn "Error: copy of '$temp' to '$dest' failed: $!\n";
        }
        else {
            my $err_copy = "$command $temp $remote:$dest";
            unless (system($err_copy) == 0) {
                warn "Error: copy '$err_copy' failed exit($?)";
            }
        }
    }

    return;
}
# Remove the temporary stderr and stdout files of this job, if they exist.
#
#   Returntype : none meaningful
sub delete_output {
    my ($self) = @_;

    my $errfile = $self->temp_errfile;
    unlink $errfile if -e $errfile;

    my $outfile = $self->temp_outfile;
    if (-e $outfile) {
        unlink $outfile;
    }
}
# Collect the CPU time reported by "bjobs -l" for each listed job.
#
#   Returntype : hashref mapping job id => the integer captured from the
#                "The CPU time used is <n>" line that follows that job's
#                "Job <id>" line (undef if no integer could be captured)
#   Exceptions : throws if the pipe to bjobs cannot be opened
sub get_job_time {
    my ($self) = @_;

    # Lexical handle and list-form open: no shell is involved and no
    # package-level glob has to be localised.
    open(my $bjobs, '-|', 'bjobs', '-l')
        or throw("couldn't open pipe to bjobs");

    my %id_times;
    my $job_id;
    while (my $line = <$bjobs>) {
        chomp $line;
        if ($line =~ /Job\s+\<(\d+)\>/) {
            $job_id = $1;
        }
        elsif ($line =~ /The CPU time used/) {
            # A CPU-time line seen before any "Job <id>" line cannot be
            # attributed to a job; skip it rather than file it under an
            # undefined key.
            next unless defined $job_id;
            my ($time) = $line =~ /The CPU time used is (\d+)/;
            $id_times{$job_id} = $time;
        }
    }
    close($bjobs);

    return \%id_times;
}
# Count pending jobs by running "bjobs [-q queue] [-u user] | grep -c PEND".
#
#   Args       : hash of named options:
#                  -user  / -USER  : restrict to this user
#                  -queue / -QUEUE : restrict to this queue
#                  -debug          : print the command and result to STDERR
#   Returntype : the last line printed by the command (the grep count), or 0
#                if nothing was read
#   Exceptions : throws if the process cannot be forked
#
# The command runs in a forked child; the parent reads its output with a
# 60 second alarm whose handler kills the child with signal 9.
sub get_pending_jobs {
    my ($self, %args) = @_;

    my $user  = $args{'-user'}  || $args{'-USER'};
    my $queue = $args{'-queue'} || $args{'-QUEUE'};

    my $cmd = "bjobs";
    $cmd .= " -q $queue" if $queue;
    $cmd .= " -u $user"  if $user;
    $cmd .= " | grep -c PEND ";

    print STDERR "$cmd\n" if $args{'-debug'};

    my $pending_jobs = 0;

    my $pid = open(my $fh, '-|');

    # open() returns undef when the fork fails. That must not be mistaken for
    # the child's 0, otherwise the parent itself would exec the command and
    # the calling process would be replaced.
    if (!defined $pid) {
        throw("couldn't fork to run '$cmd' : $!");
    }

    if ($pid) {
        eval {
            local $SIG{ALRM} = sub { kill 9, $pid; };
            alarm(60);
            while (my $line = <$fh>) {
                chomp $line;
                $pending_jobs = $line;
            }
            close $fh;
            alarm 0;
        };
        # If the eval body died, the alarm would still be pending with the
        # local handler already removed; always clear it.
        alarm 0;
    }
    else {
        # Child: replace this process with the command.
        exec($cmd);
        die q{Something went wrong here $!: } . $! . "\n";
    }

    print STDERR "FOUND $pending_jobs jobs pending\n" if $args{'-debug'};

    return $pending_jobs;
}
# Map every job listed by bjobs to its status.
#
#   Arg [1]    : verbose flag; accepted but not used by this method
#   Returntype : hashref mapping the first whitespace-separated field of each
#                bjobs output line to the third field. No header filtering is
#                done here, so any header line bjobs prints is included.
#   Exceptions : throws if the pipe to bjobs cannot be opened
sub job_stats {
    my ($self, $verbose) = @_;

    my $command = "bjobs";

    # NOTE(review): fixed 20 second pause before querying - presumably to let
    # LSF catch up with recent submissions; confirm it is still required.
    sleep(20);

    open(my $bjobs, '-|', "$command 2>&1")
        or throw("couldn't open pipe to bjobs");

    my %jobs;
  LINE:
    while (my $line = <$bjobs>) {
        chomp $line;
        last LINE if $line =~ /No unfinished job found/;

        my @values = split ' ', $line;

        # Blank lines have no fields; skip them instead of storing an entry
        # under an undefined key.
        next LINE unless @values;

        $jobs{ $values[0] } = $values[2];
    }

    # Close explicitly, as the other bjobs readers in this module do, so the
    # child process is reaped here.
    close($bjobs);

    return \%jobs;
}
# Ask LSF to kill a job by running "bkill <job_id>".
#
#   Arg [1]    : the LSF job id to kill
#   Returntype : the return value of system()
sub kill_job {
    my ($self, $job_id) = @_;

    return system("bkill $job_id");
}
# Read the LSFUSER environment variable, cache it on the object and return it.
#
#   Returntype : value of $ENV{'LSFUSER'} (undef when it is not set)
sub lsf_user {
    my ($self) = @_;

    return $self->{'_lsf_user'} = $ENV{'LSFUSER'};
}
# Constructor: delegates to the parent class constructor with all arguments
# and then resets the stored bsub command line to undef.
#
#   Args       : passed through unchanged to SUPER::new
#   Returntype : the object returned by SUPER::new
sub new {
    my $class = shift;

    my $self = $class->SUPER::new(@_);
    $self->{'bsub'} = undef;

    return $self;
}
# Run the stored bsub command line and record the LSF job id it reports.
#
#   Arg [1]    : verbose flag; accepted but not used by this method
#   Returntype : none meaningful
#   Exceptions : throws if the fork fails, if the pipe cannot be closed
#                because of a system error, or if bsub ends with a non-zero
#                status
#
# The command is exec'd in a forked child with stderr merged into stdout. The
# parent scans the output for "Job <id>", stores the id through $self->id, and
# passes every other output line to warning().
sub open_command_line {
    my ($self, $verbose) = @_;

    my $pid = open(my $pipe, '-|');

    # undef means the fork failed. It must not be treated as the child's 0,
    # or the parent itself would exec bsub and the calling process would be
    # replaced.
    if (!defined $pid) {
        throw("Could not fork to run bsub : $!\n");
    }

    if (!$pid) {
        # Child: replace this process with bsub.
        exec($self->bsub . ' 2>&1') || throw("Could not run bsub");
    }

    my $lsf = '';
    while (my $line = <$pipe>) {
        if ($line =~ /Job <(\d+)>/) {
            $lsf = $1;
        }
        else {
            warning("DEBUG: unexpected from bsub: '$line'");
        }
    }

    # close() on a pipe returns false both for real system errors and for a
    # non-zero child exit; in the latter case $! is 0. Capture both values
    # straight away so the two failure modes can be told apart.
    my $closed      = close $pipe;
    my $errno       = $! + 0;
    my $errstr      = "$!";
    my $wait_status = $?;

    if (!$closed && $errno) {
        throw("Could not close bsub pipe : $errstr\n");
    }

    if ($wait_status != 0) {
        # Parentheses are required: '.' binds tighter than '>>', so the
        # unparenthesised form shifted the whole message string.
        throw("Bsub failed : exit status " . ($wait_status >> 8) . "\n");
    }

    if ($lsf) {
        $self->id($lsf);
    }
    else {
        warning("Bsub worked but returned no job ID. Weird");
    }

    return;
}
# Get/set accessor for the path that receives the job's stderr.
#
#   Arg [1]    : (optional) path; stored only when it is a true value
#   Returntype : the stored path; when none is stored, '/dev/null' is stored
#                and returned
sub stderr_file {
    my ($self, $arg) = @_;

    $self->{'stderr'} = $arg if $arg;
    $self->{'stderr'} ||= '/dev/null';

    return $self->{'stderr'};
}
# Get/set accessor for the path that receives the job's stdout.
#
#   Arg [1]    : (optional) path; stored only when it is a true value
#   Returntype : the stored path; when none is stored, '/dev/null' is stored
#                and returned
sub stdout_file {
    my ($self, $arg) = @_;

    $self->{'stdout'} = $arg if $arg;
    $self->{'stdout'} ||= '/dev/null';

    return $self->{'stdout'};
}
# Read the LSB_SUB_HOST environment variable, cache it on the object and
# return it.
#
#   Returntype : value of $ENV{'LSB_SUB_HOST'} (undef when it is not set)
sub submission_host {
    my ($self) = @_;

    return $self->{'_submission_host'} = $ENV{'LSB_SUB_HOST'};
}
# Name of the temporary stderr file: $self->temp_filename with ".err"
# appended. The value is recomputed and cached on the object on every call.
#
#   Returntype : string
sub temp_errfile {
    my ($self) = @_;

    my $path = $self->temp_filename . ".err";
    $self->{'_temp_errfile'} = $path;

    return $path;
}
# Read the LSB_JOBFILENAME environment variable, cache it on the object and
# return it.
#
#   Returntype : value of $ENV{'LSB_JOBFILENAME'} (undef when it is not set)
sub temp_filename {
    my ($self) = @_;

    return $self->{'lsf_jobfilename'} = $ENV{'LSB_JOBFILENAME'};
}
# Name of the temporary stdout file: $self->temp_filename with ".out"
# appended. The value is recomputed and cached on the object on every call.
#
#   Returntype : string
sub temp_outfile {
    my ($self) = @_;

    my $path = $self->temp_filename . ".out";
    $self->{'_temp_outfile'} = $path;

    return $path;
}
General documentation
Post general queries to ensembl-dev@ebi.ac.uk
The rest of the documentation details each of the object methods. Internal
methods are usually preceded with a _