Raw content of Bio::EnsEMBL::Pipeline::BatchSubmission::LSF
# Ensembl Pipeline module for handling job submission via Platform LSF
# load sharing software
#
# Cared for by Laura Clarke
#
# Copyright Laura Clarke
#
# You may distribute this module under the same terms as perl itself
#
# POD documentation - main docs before the code
=pod
=head1 NAME
Bio::EnsEMBL::Pipeline::BatchSubmission::LSF
=head1 SYNOPSIS
my $batchjob = Bio::EnsEMBL::Pipeline::BatchSubmission::LSF->new(
-STDOUT => $stdout_file,
-STDERR => $stderr_file,
-PARAMETERS => @args,
-PRE_EXEC => $pre_exec,
-QUEUE => $queue,
-JOBNAME => $jobname,
-NODES => $nodes,
-RESOURCE => $resource
);
$batch_job->construct_command_line('test.pl');
$batch_job->open_command_line();
=head1 DESCRIPTION
This module provides an interface to the Platform LSF load sharing software and
its commands. It implements the method construct_command_line which is not
defined in the base class and which enables the pipeline to submit jobs in a
distributed environment using LSF.
See base class Bio::EnsEMBL::Pipeline::BatchSubmission for more info
=head1 CONTACT
Post general queries to B
=head1 APPENDIX
The rest of the documentation details each of the object methods. Internal
methods are usually preceded with a _
=cut
package Bio::EnsEMBL::Pipeline::BatchSubmission::LSF;
use Bio::EnsEMBL::Utils::Exception qw(verbose throw warning info);
use Bio::EnsEMBL::Utils::Argument qw( rearrange );
use Bio::EnsEMBL::Pipeline::BatchSubmission;
use File::Copy;
use vars qw(@ISA);
use strict;
@ISA = qw(Bio::EnsEMBL::Pipeline::BatchSubmission);
sub new{
my ($class, @args) = @_;
my $self = $class->SUPER::new(@args);
#print "CREATING ".$self." with ".join(" ", @args)."\n";
$self->{'bsub'} = undef;
return $self;
}
##################
#accessor methods#
##################
sub bsub{
my($self, $arg) = @_;
if(defined($arg)){
$self->{'bsub'} = $arg;
}
return $self->{'bsub'};
}
##other accessor are in base class##
######################
#command line methods#
######################
sub construct_command_line{
my($self, $command, $stdout, $stderr) = @_;
#command must be the first argument then if stdout or stderr aren't definhed the objects own can be used
if(!$command){
throw("cannot create bsub if nothing to submit to it : $!\n");
}
my $bsub_line;
$self->command($command);
if($stdout){
$bsub_line = "bsub -o ".$stdout;
}else{
$bsub_line = "bsub -o ".$self->stdout_file;
}
if($self->nodes){
my $nodes = $self->nodes;
# $nodes needs to be a space-delimited list
$nodes =~ s/,/ /;
$nodes =~ s/ +/ /;
# undef $nodes unless $nodes =~ m{(\w+\ )*\w};
$bsub_line .= " -m '".$nodes."' ";
}
if(my $res = $self->resource){
$res = qq{-R '$res'} unless $res =~ /^-R/;
$bsub_line .= " $res ";
}
$bsub_line .= " -q ".$self->queue if $self->queue;
$bsub_line .= " -J ".$self->jobname if $self->jobname;
$bsub_line .= " ".$self->parameters." " if ($self->parameters);
if($stderr){
$bsub_line .= " -e ".$stderr;
}else{
$bsub_line .= " -e ".$self->stderr_file;
}
$bsub_line .= " -E \"".$self->pre_exec."\"" if defined $self->pre_exec;
## must ensure the prexec is in quotes ##
$bsub_line .= " ".$command;
$self->bsub($bsub_line);
}
sub open_command_line{
my ($self, $verbose)= @_;
my $lsf = '';
if (open(my $pipe, '-|')) {
while (<$pipe>) {
if (/Job <(\d+)>/) {
$lsf = $1;
} else {
warning("DEBUG: unexpected from bsub: '$_'");
}
}
if (close $pipe) {
if ( ($? >> 8) == 0 ){
if ($lsf) {
$self->id($lsf);
} else {
warning("Bsub worked but returned no job ID. Weird");
}
} else {
throw("Bsub failed : exit status " . $? >> 8 . "\n");
}
} else {
throw("Could not close bsub pipe : $!\n");
}
} else {
# We want STDERR and STDOUT merged for the bsub process
# open STDERR, '>&STDOUT';
# probably better to do with shell redirection as above can fail
exec($self->bsub .' 2>&1') || throw("Could not run bsub");
}
}
sub get_pending_jobs {
my($self, %args) = @_;
my ($user) = $args{'-user'} || $args{'-USER'} || undef;
my ($queue) = $args{'-queue'} || $args{'-QUEUE'} || undef;
my $cmd = "bjobs";
$cmd .= " -q $queue" if $queue;
$cmd .= " -u $user" if $user;
$cmd .= " | grep -c PEND ";
print STDERR "$cmd\n" if $args{'-debug'};
my $pending_jobs = 0;
if( my $pid = open (my $fh, '-|') ){
eval{
local $SIG{ALRM} = sub { kill 9, $pid; };
alarm(60);
while(<$fh>){
chomp;
$pending_jobs = $_;
}
close $fh;
alarm 0;
}
}else{
exec( $cmd );
die q{Something went wrong here $!: } . $! . "\n";
}
print STDERR "FOUND $pending_jobs jobs pending\n" if $args{'-debug'};
return $pending_jobs;
}
sub get_job_time{
my ($self) = @_;
my $command = "bjobs -l";
#print $command."\n";
my %id_times;
local *BJOB;
open(BJOB, "$command |") or throw("couldn't open pipe to bjobs");
my $job_id;
while(){
chomp;
if(/Job\s+\<(\d+)\>/){
$job_id = $1;
}elsif(/The CPU time used/){
my ($time) = $_ =~ /The CPU time used is (\d+)/;
$id_times{$job_id} = $time;
}
}
close(BJOB);
#or throw("couldn't close pipe to bjobs");
return \%id_times;
}
sub check_existance{
my ($self, $id_hash, $verbose) = @_;
my %job_submission_ids = %$id_hash;
my $command = "bjobs";
local *BJOB;
open(BJOB, "$command 2>&1 |") or
throw("couldn't open pipe to bjobs");
my %existing_ids;
LINE:while(){
print STDERR if($verbose);
chomp;
if ($_ =~ /No unfinished job found/) {
last LINE;
}
my @values = split;
if($values[0] =~ /\d+/){
if($values[2] eq 'UNKWN'){
next LINE;
}
$existing_ids{$values[0]} = 1;
}
}
my @awol_jobs;
foreach my $job_id(keys(%job_submission_ids)){
if(!$existing_ids{$job_id}){
push(@awol_jobs, @{$job_submission_ids{$job_id}});
}
}
close(BJOB);
#or throw("Can't close pipe to bjobs");
return \@awol_jobs;
}
sub job_stats {
my ($self, $verbose) = @_;
my $command = "bjobs";
# Need to sleep to make sure lsf is displaying correct info
sleep(20);
local *BJOB;
open(BJOB, "$command 2>&1 |") or throw("couldn't open pipe to bjobs");
my %jobs;
LINE:
while(){
chomp;
if ($_ =~ /No unfinished job found/) {
last LINE;
}
my @values = split;
$jobs{$values[0]} = $values[2];
}
return \%jobs;
}
#sub check_existance{
# my ($self, $id, $verbose) = @_;
# if(!$id){
# die("Can't run without an LSF id");
# }
# my $command = "bjobs ".$id;
# #print STDERR "Running ".$command."\n";
# my $flag = 0;
# open(BJOB, "$command 2>&1 |") or throw("couldn't open pipe to bjobs");
# while(){
# print STDERR if($verbose);
# chomp;
# if ($_ =~ /No unfinished job found/) {
# #print "Set flag\n";
# $flag = 1;
# }
# my @values = split;
# if($values[0] =~ /\d+/){
# return $values[0];
# }
# }
# close(BJOB);
# print STDERR "Have lost ".$id."\n" if($verbose);
# return undef;
#}
sub kill_job{
my ($self, $job_id) = @_;
my $command = "bkill ".$job_id;
system($command);
}
sub stdout_file{
my ($self, $arg) = @_;
if($arg){
$self->{'stdout'} = $arg;
}
if(!$self->{'stdout'}){
$self->{'stdout'} ='/dev/null'
}
return $self->{'stdout'};
}
sub stderr_file{
my ($self, $arg) = @_;
if ($arg){
$self->{'stderr'} = $arg;
}
if(!$self->{'stderr'}){
$self->{'stderr'} ='/dev/null'
}
return $self->{'stderr'};
}
sub temp_filename{
my ($self) = @_;
$self->{'lsf_jobfilename'} = $ENV{'LSB_JOBFILENAME'};
return $self->{'lsf_jobfilename'};
}
sub temp_outfile{
my ($self) = @_;
$self->{'_temp_outfile'} = $self->temp_filename.".out";
return $self->{'_temp_outfile'};
}
sub temp_errfile{
my ($self) = @_;
$self->{'_temp_errfile'} = $self->temp_filename.".err";
return $self->{'_temp_errfile'};
}
sub submission_host{
my ($self) = @_;
$self->{'_submission_host'} = $ENV{'LSB_SUB_HOST'};
return $self->{'_submission_host'};
}
sub lsf_user{
my ($self) = @_;
$self->{'_lsf_user'} = $ENV{'LSFUSER'};
return $self->{'_lsf_user'};
}
=head2 copy_output
copy_output is used to copy the job's STDOUT and
STDERR files using B. This avoids using NFS'.
=cut
sub copy_output {
my ($self, $dest_err, $dest_out) = @_;
$dest_err ||= $self->stderr_file;
$dest_out ||= $self->stdout_file;
if (! $self->temp_filename) {
my ($p, $f, $l) = caller;
warning("The lsf environment variable LSB_JOBFILENAME is not defined".
" we can't copy the output files which don't exist $f:$l");
return;
}
# Unbuffer STDOUT so that data gets flushed to file
# (It is OK to leave it unbuffered because this method
# gets called after the job is finished.)
my $old_fh = select(STDOUT);
$| = 1;
select($old_fh);
my $temp_err = $self->temp_errfile;
my $temp_out = $self->temp_outfile;
my $command = $self->copy_command;
my $remote = $self->lsf_user . '@' . $self->submission_host;
foreach my $set ([$temp_out, $dest_out], [$temp_err, $dest_err]) {
my( $temp, $dest ) = @$set;
if (-e $temp) {
if ($command eq 'cp' || $dest =~ /^\/lustre/) {
copy($temp, $dest);
} else {
my $err_copy = "$command $temp $remote:$dest";
unless (system($err_copy) == 0) {
warn "Error: copy '$err_copy' failed exit($?)";
}
}
} else {
warn "No such file '$temp' to copy\n";
}
}
}
sub delete_output{
my ($self) = @_;
unlink $self->temp_errfile if(-e $self->temp_errfile);
unlink $self->temp_outfile if(-e $self->temp_outfile);
}
sub copy_command{
my ($self, $arg) = @_;
if($arg){
$self->{'_copy_command'} = $arg;
}
return $self->{'_copy_command'} || 'lsrcp ';
}
1;