Bio::Tools::Run::Search
SGE_WuBlast
Toolbar
Summary
Bio::Tools::Run::Search::SGE_WuBlast - Base class for Ensembl BLAST searches
Package variables
No package variables defined.
Included modules
Bio::Tools::Run::Search::WuBlast
Data::Dumper qw ( Dumper )
EnsEMBL::Web::RegObj
File::Copy
Sys::Hostname qw ( hostname )
Inherit
Bio::Tools::Run::Search::WuBlast
Synopsis
see Bio::Tools::Run::Search::WuBlast
Description
An extension of Bio::Tools::Run::Search::WuBlast to cope with a
blast farm usine SGE. E.g. uses the qsub job submission system to
dispatch jobs. The jobs themselves are wrapped in the
utils/runblast.pl perl script.
Methods
Methods description
Arg [1] : Function : Returntype: Exceptions: Caller : Example : |
Arg [1] : None Function : Generate the blast command itself Returntype: String: $command Exceptions: Caller : Example : |
Arg [1] : None Function : Internal method to generate the shell qsub command. This command calls the utils/runblast.pm wrapper script rather that the blast command itself Returntype: String: $command Exceptions: Caller : Example : |
Arg [1] : Function : Fires off the qsub command Returntype: Exceptions: Caller : run method Example : |
Arg [1] : none Function : Dispatches the blast job using the dispatch_qsub method Returntype: Exceptions: Caller : Example : |
Arg [1] : None Function : Fires off the blast command (SUPER::run), with a pre-repeatmask step Returntype: Boolean Exceptions: Caller : Example : |
Methods code
BEGIN { $SPECIES_DEFS = $ENSEMBL_WEB_REGISTRY->species_defs;
$QSUB_RESOURCE = '-l h_rt=10:00:00,s_rt=10:00:00';
$MAX_BLAST_CPUS = 1; } |
sub _add_command
{ my $self = shift;
warn join "\n",@_,"";
$self->{'command_string'} .= join "\n", @_, ''; } |
sub _command_string
{ my $self = shift;
return $self->{'command_string'};
}
} |
sub _init_command_string
{ my $self = shift;
$self->{'command_string'} = ''; } |
sub _repeatmask
{ my $self = shift;
warn ".... repeat_masker_called";
$ENV{BLASTREPEATMASKER} = $SPECIES_DEFS->ENSEMBL_REPEATMASKER;
return $self->SUPER::_repeatmask(@_);
}
} |
sub command
{ my $self = shift;
if( ! -f $self->fastafile ){ $self->throw("Need a query sequence!") }
my $res_file = $self->reportfile;
if( -f $res_file ){
$self->warn("A result already exists for $res_file" );
unlink( $self->reportfile );
}
my $res_file_local = '/tmp/blast_$$.out';
$ENV{'BLASTMAT'} || $self->warn( "BLASTMAT variable not set" );
$ENV{'BLASTFILTER'} || $self->warn( "BLASTFILTER variable not set" );
$ENV{'BLASTDB'} || $self->warn( "BLASTBD variable not set" );
my $database = $self->database ||
$self->throw("No database");
my $param_str = '';
foreach my $param( $self->option ){
my $val = $self->option($param) || '';
next if $param eq "repeatmask";
next if $param eq "-RepeatMasker";
if( $param =~ /=$/ ){ $param_str .= " $param$val" }
elsif( $val ){ $param_str .= " $param $val" }
else{ $param_str .= " $param" }
}
return join( ' ', $SPECIES_DEFS->ENSEMBL_BLAST_BIN_PATH."/".$self->program_path,
$SPECIES_DEFS->ENSEMBL_BLAST_DATA_PATH."/$database", '[[]]', $param_str);
}
} |
sub command_qsub
{ my $self = shift;
my $blastscript = $SiteDefs::ENSEMBL_BLASTSCRIPT;
my $args = $self->token;
my $command = "$blastscript $args";
return $command;
}
} |
sub dispatch_qsub
{ my $self = shift;
my $command = shift || die( "Need a command to dispatch!" );
my( $ticket ) = $self->statefile =~ m#/([^/]+$)#;
my $shared_tmp_dir = $SiteDefs::ENSEMBL_SGE_SHARED_DIR;
my $server_out_file = "$shared_tmp_dir/$ticket.out";
my $server_fail_file = "$shared_tmp_dir/$ticket.fail";
my $server_flag_file = "$shared_tmp_dir/$ticket.flag";
my $server_fasta_file = "$shared_tmp_dir/$ticket.fa";
my $server_job_id = "$shared_tmp_dir/$ticket.job";
my $client_out_file = $self->reportfile;
my $state_file = $self->statefile;
my @PARTS = split /\//, $state_file;
my $TICKET_NAME = "$PARTS[-3]$PARTS[-2]-$PARTS[-1]";
my $client_flag_file = $SPECIES_DEFS->ENSEMBL_TMP_DIR_BLAST."/pending/$TICKET_NAME";
my $client_sent_file = $SPECIES_DEFS->ENSEMBL_TMP_DIR_BLAST."/sent/$TICKET_NAME";
my $client_fail_file = "$state_file.fail";
my $client_fasta_file = $self->fastafile;
$command =~ s/\[\[\]\]/$server_fasta_file/;
my $queue = $self->priority || 'offline';
my $jobid;
my $host = hostname();
my $pid;
local *QSUB;
warn "#### dispatch_qsub_called";
my $repeatmask_command = '/usr/local/bioinf/bin/RepeatMasker';
copy($client_fasta_file, $server_fasta_file); $ENV{'SGE_ROOT'} = $SiteDefs::ENSEMBL_SGE_ROOT;
my $SGE_JobName = "EnsBlast_" . $ticket;
if( open(QSUB, qq(|qsub $QSUB_RESOURCE -N $SGE_JobName -S /bin/bash -o /dev/null -e /dev/null) )) {
if( open(FH,">$client_sent_file" ) ) {
print FH "$state_file";
close FH;
}
$self->_init_command_string();
if(
( $self->option("repeatmask") || defined( $self->option("-RepeatMasker") ) ) &&
( uc($self->seq->alphabet) eq 'DNA' )
) {
$self->_add_command( qq( echo\$ JOB_ID > $server_job_id)); $self->_add_command( qq( $repeatmask_command $server_fasta_file ), qq( rm $server_fasta_file.out ), qq( rm $server_fasta_file.stderr ),
qq( rm $server_fasta_file.cat ),
qq( rm $server_fasta_file.RepMask ),
qq( rm $server_fasta_file.RepMask.cat ),
qq( rm $server_fasta_file.masked.log ),
qq( mv $server_fasta_file.masked $server_fasta_file ) ); }
$self->_add_command( "$command >$server_out_file 2>$server_fail_file" ); $self->_add_command( 'status=$?' );
$self->_add_command( "echo '$state_file' > $server_flag_file" ); $self->_add_command( qq($SiteDefs::ENSEMBL_SGE_RCP_CMD "$server_out_file" "$host:$client_out_file"),
qq($SiteDefs::ENSEMBL_SGE_RCP_CMD "$server_fail_file" "$host:$client_fail_file"),
qq($SiteDefs::ENSEMBL_SGE_RCP_CMD "$server_flag_file" "$host:$client_flag_file") ); $self->_add_command( qq(rm -f "$shared_tmp_dir"/$ticket.*) ); $self->_add_command( 'exit $status' );
print QSUB $self->_command_string();
close QSUB;
if ($? != 0) {
die("qsub exited with non-zero status - job not submitted\n");
}
} else {
die("Could not exec qsub : $!\n");
}
return 1; } |
sub remove
{ my $self = shift;
my( $ticket ) = $self->statefile =~ m#/([^/]+$)#;
my $shared_tmp_dir = $SiteDefs::ENSEMBL_SGE_SHARED_DIR;
my $server_job_id = "$shared_tmp_dir/$ticket.job";
if( -e $server_job_id) {
open(JF, "<$server_job_id");
my @jf_content = <JF>;
close(JF);
my $job_id = $jf_content[0];
chomp($job_id);
my $sec = 10;
local $SIG{ALRM} = sub{ die( "qdel timeout ($sec secs)\n" ) };
my $out;
eval{
alarm( $sec );
$out = `qdel $job_id 2>&1`;
alarm( 0 );
};
if( $@ ){ die( $@ ) }
warn ( "QSUB REMOVING $ticket: ",$out );
$self->SUPER::remove();
}
}
1; } |
sub run
{ my $self = shift;
if( $self->status ne 'PENDING' and
$self->status ne 'DISPATCHED' ){
$self->warn( "Wrong status for run: ". $self->status );
}
my %ENV_TMP = %ENV;
foreach my $env( $self->environment_variable() ){
my $val = $self->environment_variable( $env );
if( defined $val ){ $ENV{$env} = $val }
else{ delete( $ENV{$env} ) }
}
my $command = $self->command;
$self->dispatch_qsub( $command );
$self->debug( "BLAST COMMAND: " .$command."\n" );
%ENV = %ENV_TMP;
return 1;
}
} |
sub run_blast
{ my $self = shift;
return $self->SUPER::run();
}
} |
General documentation
No general documentation available.