Bio::EnsEMBL::Pipeline::BatchSubmission LSF
SummaryIncluded librariesPackage variablesSynopsisDescriptionGeneral documentationMethods
Toolbar
WebCvsRaw content
Summary
Bio::EnsEMBL::Pipeline::BatchSubmission::LSF
Package variables
No package variables defined.
Included modules
Bio::EnsEMBL::Pipeline::BatchSubmission
Bio::EnsEMBL::Utils::Argument qw ( rearrange )
Bio::EnsEMBL::Utils::Exception qw ( verbose throw warning info )
File::Copy
strict
Inherit
Bio::EnsEMBL::Pipeline::BatchSubmission
Synopsis
my $batch_job = Bio::EnsEMBL::Pipeline::BatchSubmission::LSF->new(
-STDOUT => $stdout_file,
-STDERR => $stderr_file,
-PARAMETERS => @args,
-PRE_EXEC => $pre_exec,
-QUEUE => $queue,
-JOBNAME => $jobname,
-NODES => $nodes,
-RESOURCE => $resource
);
$batch_job->construct_command_line('test.pl');
$batch_job->open_command_line();
Description
This module provides an interface to the Platform LSF load sharing software and
its commands. It implements the method construct_command_line which is not
defined in the base class and which enables the pipeline to submit jobs in a
distributed environment using LSF.
See base class Bio::EnsEMBL::Pipeline::BatchSubmission for more info
Methods
bsub
No description
Code
check_existance
No description
Code
construct_command_line
No description
Code
copy_command
No description
Code
copy_outputDescriptionCode
delete_output
No description
Code
get_job_time
No description
Code
get_pending_jobs
No description
Code
job_stats
No description
Code
kill_job
No description
Code
lsf_user
No description
Code
new
No description
Code
open_command_line
No description
Code
stderr_file
No description
Code
stdout_file
No description
Code
submission_host
No description
Code
temp_errfile
No description
Code
temp_filename
No description
Code
temp_outfile
No description
Code
Methods description
copy_outputcode    nextTop
copy_output is used to copy the job's STDOUT and
STDERR files using lsrcp. This avoids using NFS.
Methods code
bsubdescriptionprevnextTop
# Accessor for the bsub command line held by this object.
#
#   Arg [1] : (optional) string, command line to store
#   Returns : the stored command line, or undef if none has been set
sub bsub {
  my $self = shift;

  # Any defined argument (even '' or 0) replaces the stored value;
  # undef leaves it untouched.
  $self->{'bsub'} = $_[0] if defined $_[0];

  return $self->{'bsub'};
}

##other accessor are in base class##
######################
#command line methods#
######################
}
check_existancedescriptionprevnextTop
# Reports which of the supplied submission ids are no longer listed by bjobs.
#
#   Arg [1] : hashref, submission id => arrayref of entries for that id
#   Arg [2] : (optional) boolean, echo every line of bjobs output to STDERR
#   Returns : arrayref with the entries of every id that bjobs did not list
#             (rows whose third column is UNKWN count as not listed)
#   Throws  : if the pipe to bjobs cannot be opened
sub check_existance {
  my ($self, $id_hash, $verbose) = @_;

  my %submitted = %$id_hash;

  local *BJOB;
  open(BJOB, "bjobs 2>&1 |") or
    throw("couldn't open pipe to bjobs");

  my %listed;
  while (my $line = <BJOB>) {
    print STDERR $line if ($verbose);
    chomp $line;
    last if $line =~ /No unfinished job found/;

    my @fields = split ' ', $line;

    # Only rows whose first column contains a digit are treated as jobs.
    next unless $fields[0] =~ /\d+/;
    next if $fields[2] eq 'UNKWN';

    $listed{$fields[0]} = 1;
  }

  my @awol_jobs = map  { @{ $submitted{$_} } }
                  grep { !$listed{$_} }
                  keys(%submitted);

  close(BJOB);

  return \@awol_jobs;
}
construct_command_linedescriptionprevnextTop
# Assembles the bsub command line used to submit $command and stores it
# through the bsub() accessor.
#
#   Arg [1] : string, the command to submit (required)
#   Arg [2] : (optional) string, stdout file; defaults to $self->stdout_file
#   Arg [3] : (optional) string, stderr file; defaults to $self->stderr_file
#   Returns : whatever $self->bsub returns for the assembled line
#   Throws  : if no command is given
sub construct_command_line {
  my ($self, $command, $stdout, $stderr) = @_;

  # The command must be the first argument; if stdout or stderr aren't
  # defined the object's own files are used.
  if (!$command) {
    throw("cannot create bsub if nothing to submit to it : $!\n");
  }
  $self->command($command);

  my $bsub_line = "bsub -o " . ($stdout ? $stdout : $self->stdout_file);

  if ($self->nodes) {
    my $nodes = $self->nodes;
    # $nodes needs to be a space-delimited list. Both substitutions are
    # global so that every comma and every run of spaces is normalised,
    # not just the first one.
    $nodes =~ s/,/ /g;
    $nodes =~ s/ +/ /g;
    $bsub_line .= " -m '" . $nodes . "' ";
  }

  if (my $res = $self->resource) {
    # Accept either a bare resource string or one already carrying -R.
    $res = qq{-R '$res'} unless $res =~ /^-R/;
    $bsub_line .= " $res ";
  }

  $bsub_line .= " -q " . $self->queue   if $self->queue;
  $bsub_line .= " -J " . $self->jobname if $self->jobname;
  $bsub_line .= " " . $self->parameters . " " if ($self->parameters);

  $bsub_line .= " -e " . ($stderr ? $stderr : $self->stderr_file);

  # The pre-exec command must be enclosed in quotes.
  $bsub_line .= " -E\" " . $self->pre_exec . "\"" if defined $self->pre_exec;

  $bsub_line .= " " . $command;
  $self->bsub($bsub_line);
}
copy_commanddescriptionprevnextTop
# Accessor for the command used to copy job output files.
#
#   Arg [1] : (optional) string, copy command to store (ignored if false)
#   Returns : the stored copy command, or 'lsrcp ' when none is stored
sub copy_command {
  my $self  = shift;
  my ($arg) = @_;

  $self->{'_copy_command'} = $arg if $arg;

  my $stored = $self->{'_copy_command'};
  return $stored ? $stored : 'lsrcp ';
}


1;
}
copy_outputdescriptionprevnextTop
# Copies the job's temporary STDOUT and STDERR files to their destinations.
#
#   Arg [1] : (optional) string, destination for the stderr file;
#             defaults to $self->stderr_file
#   Arg [2] : (optional) string, destination for the stdout file;
#             defaults to $self->stdout_file
#   Returns : nothing
#
# A local File::Copy copy is used when the copy command is exactly 'cp' or
# the destination starts with /lustre; otherwise the copy command is run
# through the shell against "<lsf_user>@<submission_host>:<dest>".
# Failures of either kind are reported with warn() rather than thrown.
sub copy_output {
    my ($self, $dest_err, $dest_out) = @_;

    $dest_err ||= $self->stderr_file;
    $dest_out ||= $self->stdout_file;

    if (! $self->temp_filename) {
        my (undef, $f, $l) = caller;
        warning("The lsf environment variable LSB_JOBFILENAME is not defined".
                    " we can't copy the output files which don't exist $f:$l");
        return;
    }

    # Unbuffer STDOUT so that data gets flushed to file
    # (It is OK to leave it unbuffered because this method
    # gets called after the job is finished.)
    my $old_fh = select(STDOUT);
    $| = 1;
    select($old_fh);

    my $temp_err = $self->temp_errfile;
    my $temp_out = $self->temp_outfile;

    my $command = $self->copy_command;
    my $remote  = $self->lsf_user . '@' . $self->submission_host;

    foreach my $set ([$temp_out, $dest_out], [$temp_err, $dest_err]) {
        my ($temp, $dest) = @$set;

        unless (-e $temp) {
            warn "No such file '$temp' to copy\n";
            next;
        }

        if ($command eq 'cp' || $dest =~ /^\/lustre/) {
            # File::Copy::copy returns false on failure; report it the same
            # way the remote branch does instead of dropping it silently.
            copy($temp, $dest)
                or warn "Error: copy '$temp' to '$dest' failed: $!";
        }
        else {
            my $remote_copy = "$command $temp $remote:$dest";
            unless (system($remote_copy) == 0) {
                warn "Error: copy '$remote_copy' failed exit($?)";
            }
        }
    }

    return;
}
delete_outputdescriptionprevnextTop
# Removes the job's temporary stderr and stdout files if they exist.
sub delete_output {
  my $self = shift;

  my $err_path = $self->temp_errfile;
  unlink($err_path) if -e $err_path;

  my $out_path = $self->temp_outfile;
  unlink($out_path) if -e $out_path;
}
get_job_timedescriptionprevnextTop
# Collects the CPU time reported by "bjobs -l" for each listed job.
#
#   Returns : hashref, job id => digits following "The CPU time used is"
#   Throws  : if the pipe to bjobs cannot be opened
sub get_job_time {
  my ($self) = @_;

  local *BJOB;
  open(BJOB, "bjobs -l |") or throw("couldn't open pipe to bjobs");

  my %id_times;
  my $job_id;
  while (my $line = <BJOB>) {
    chomp $line;

    # A "Job <N>" line starts the record for job N; CPU time lines that
    # follow are attributed to the most recently seen job id.
    if ($line =~ /Job\s+\<(\d+)\>/) {
      $job_id = $1;
      next;
    }
    next unless $line =~ /The CPU time used/;

    my ($time) = $line =~ /The CPU time used is (\d+)/;
    $id_times{$job_id} = $time;
  }
  close(BJOB);
  #or throw("couldn't close pipe to bjobs");

  return \%id_times;
}
get_pending_jobsdescriptionprevnextTop
# Counts pending jobs by running "bjobs [-q queue] [-u user] | grep -c PEND".
#
#   Arg [-user  | -USER ] : (optional) string, restrict to this user
#   Arg [-queue | -QUEUE] : (optional) string, restrict to this queue
#   Arg [-debug]          : (optional) boolean, print the command and the
#                           result to STDERR
#   Returns : the last line printed by the pipeline (the grep count),
#             or 0 if nothing was read
#   Throws  : if the child process cannot be forked
sub get_pending_jobs {
  my ($self, %args) = @_;

  my $user  = $args{'-user'}  || $args{'-USER'}  || undef;
  my $queue = $args{'-queue'} || $args{'-QUEUE'} || undef;

  my $cmd = "bjobs";
  $cmd .= " -q $queue" if $queue;
  $cmd .= " -u $user"  if $user;
  $cmd .= " | grep -c PEND ";

  print STDERR "$cmd\n" if $args{'-debug'};

  my $pending_jobs = 0;

  # A forking open returns the child's pid in the parent, 0 in the child
  # and undef when the fork fails. The undef case has to be caught here,
  # otherwise the parent itself would fall through to the exec below.
  my $pid = open(my $fh, '-|');
  if (!defined $pid) {
    throw("Could not fork to run '$cmd' : $!\n");
  }

  if ($pid) {
    eval {
      # Give up on a hung bjobs after 60 seconds by killing the child,
      # which ends the read loop.
      local $SIG{ALRM} = sub { kill 9, $pid; };
      alarm(60);
      while (<$fh>) {
        chomp;
        $pending_jobs = $_;
      }
      close $fh;
      alarm 0;
    };
    # Make sure no timer is left armed if the eval block exited early.
    alarm 0;
  }
  else {
    exec($cmd);
    die q{Something went wrong here $!: } . $! . "\n";
  }

  print STDERR "FOUND $pending_jobs jobs pending\n" if $args{'-debug'};
  return $pending_jobs;
}
job_statsdescriptionprevnextTop
# Snapshots the output of bjobs as a hash of first column => third column.
#
#   Arg [1] : (optional) verbose flag; accepted but not used here
#   Returns : hashref keyed on the first whitespace-separated field of each
#             line read, valued with the third field. No lines are filtered
#             out; reading stops at "No unfinished job found".
#   Throws  : if the pipe to bjobs cannot be opened
sub job_stats {
  my ($self, $verbose) = @_;

  # Need to sleep to make sure lsf is displaying correct info
  sleep(20);

  local *BJOB;
  open(BJOB, "bjobs 2>&1 |") or throw("couldn't open pipe to bjobs");

  my %jobs;
  while (my $line = <BJOB>) {
    chomp $line;
    last if $line =~ /No unfinished job found/;

    my @columns = split ' ', $line;
    $jobs{$columns[0]} = $columns[2];
  }

  return \%jobs;
}
#sub check_existance{
# my ($self, $id, $verbose) = @_;
# if(!$id){
# die("Can't run without an LSF id");
# }
# my $command = "bjobs ".$id;
# #print STDERR "Running ".$command."\n";
# my $flag = 0;
# open(BJOB, "$command 2>&1 |") or throw("couldn't open pipe to bjobs");
# while(<BJOB>){
# print STDERR if($verbose);
# chomp;
# if ($_ =~ /No unfinished job found/) {
# #print "Set flag\n";
# $flag = 1;
# }
# my @values = split;
# if($values[0] =~ /\d+/){
# return $values[0];
# }
# }
# close(BJOB);
# print STDERR "Have lost ".$id."\n" if($verbose);
# return undef;
#
}
kill_jobdescriptionprevnextTop
# Runs "bkill <job_id>" for the given job.
#
#   Arg [1] : the id of the job to kill
#   Returns : the return value of system()
sub kill_job {
  my $self   = shift;
  my $job_id = shift;

  return system("bkill " . $job_id);
}
lsf_userdescriptionprevnextTop
# Returns the value of the LSFUSER environment variable.
#
# The value is re-read from %ENV and cached on the object on every call;
# there is no way to set it through this method.
sub lsf_user {
  my $self = shift;

  return $self->{'_lsf_user'} = $ENV{'LSFUSER'};
}
newdescriptionprevnextTop
# Constructor: delegates to the parent class constructor with all
# arguments and starts the object with no bsub command line.
#
#   Returns : the newly constructed object
sub new {
  my $class = shift;

  my $self = $class->SUPER::new(@_);
  #print "CREATING ".$self." with ".join(" ", @_)."\n";

  $self->{'bsub'} = undef;

  return $self;
}
##################
#accessor methods#
##################
}
open_command_linedescriptionprevnextTop
# Runs the stored bsub command line in a child process and records the
# LSF job id that bsub reports.
#
#   Arg [1] : (optional) verbose flag; accepted but not used here
#   Throws  : if the child cannot be forked, if bsub exits with a non-zero
#             status, or if closing the pipe fails for another reason
#
# On success the id taken from a "Job <N>" line is passed to $self->id.
# Any other output line from bsub is reported through warning().
sub open_command_line {
  my ($self, $verbose) = @_;

  my $lsf = '';

  # A forking open returns the child's pid in the parent, 0 in the child
  # and undef when the fork fails. Without this check a failed fork would
  # make the parent itself exec bsub.
  my $pid = open(my $pipe, '-|');
  if (!defined $pid) {
    throw("Could not fork to run bsub : $!\n");
  }

  if ($pid) {
    while (<$pipe>) {
      if (/Job <(\d+)>/) {
        $lsf = $1;
      } else {
        warning("DEBUG: unexpected from bsub: '$_'");
      }
    }

    if (close $pipe) {
      # close() on a pipe is true only when the child exited with status 0.
      if ($lsf) {
        $self->id($lsf);
      } else {
        warning("Bsub worked but returned no job ID. Weird");
      }
    } else {
      # Capture both immediately: when the only problem is a non-zero
      # exit status, close() leaves $! at 0 and the status is in $?.
      my ($errno, $errstr, $status) = ($! + 0, "$!", $?);
      if ($errno == 0) {
        # Parentheses are required: '.' binds tighter than '>>'.
        throw("Bsub failed : exit status " . ($status >> 8) . "\n");
      } else {
        throw("Could not close bsub pipe : $errstr\n");
      }
    }
  } else {
    # We want STDERR and STDOUT merged for the bsub process
    # open STDERR, '>&STDOUT';
    # probably better to do with shell redirection as above can fail
    exec($self->bsub .' 2>&1') || throw("Could not run bsub");
  }
}
stderr_filedescriptionprevnextTop
# Accessor for the file that receives the job's STDERR.
#
#   Arg [1] : (optional) string, file name to store (ignored if false)
#   Returns : the stored file name; '/dev/null' is stored and returned
#             when nothing has been set
sub stderr_file {
   my $self = shift;
   my $arg  = shift;

   $self->{'stderr'} = $arg if $arg;
   $self->{'stderr'} ||= '/dev/null';

   return $self->{'stderr'};
}
stdout_filedescriptionprevnextTop
# Accessor for the file that receives the job's STDOUT.
#
#   Arg [1] : (optional) string, file name to store (ignored if false)
#   Returns : the stored file name; '/dev/null' is stored and returned
#             when nothing has been set
sub stdout_file {
   my $self = shift;
   my $arg  = shift;

   $self->{'stdout'} = $arg if $arg;
   $self->{'stdout'} ||= '/dev/null';

   return $self->{'stdout'};
}
submission_hostdescriptionprevnextTop
# Returns the value of the LSB_SUB_HOST environment variable.
#
# The value is re-read from %ENV and cached on the object on every call.
sub submission_host {
  my $self = shift;

  return $self->{'_submission_host'} = $ENV{'LSB_SUB_HOST'};
}
temp_errfiledescriptionprevnextTop
# Returns the name of the job's temporary stderr file: the value of
# temp_filename with ".err" appended. Recomputed and cached on every call.
sub temp_errfile {
  my $self = shift;

  return $self->{'_temp_errfile'} = $self->temp_filename . ".err";
}
temp_filenamedescriptionprevnextTop
# Returns the value of the LSB_JOBFILENAME environment variable.
#
# The value is re-read from %ENV and cached on the object on every call.
sub temp_filename {
  my $self = shift;

  return $self->{'lsf_jobfilename'} = $ENV{'LSB_JOBFILENAME'};
}
temp_outfiledescriptionprevnextTop
# Returns the name of the job's temporary stdout file: the value of
# temp_filename with ".out" appended. Recomputed and cached on every call.
sub temp_outfile {
  my $self = shift;

  return $self->{'_temp_outfile'} = $self->temp_filename . ".out";
}
General documentation
CONTACTTop
Post general queries to ensembl-dev@ebi.ac.uk
APPENDIXTop
The rest of the documentation details each of the object methods. Internal
methods are usually preceded with a _