=head1 NAME

Bio::Tools::Run::Search::SGE_WuBlast - Base class for Ensembl BLAST searches

=head1 SYNOPSIS

  see Bio::Tools::Run::Search::WuBlast

=head1 DESCRIPTION

An extension of Bio::Tools::Run::Search::WuBlast to cope with a blast
farm using SGE. E.g. uses the qsub job submission system to dispatch
jobs. The jobs themselves are wrapped in the utils/runblast.pl perl
script.

=cut

# Let the code begin...
package Bio::Tools::Run::Search::SGE_WuBlast;

use strict;
use warnings;

#use File::Copy qw(mv cp);
use File::Copy;
use Data::Dumper qw(Dumper);

use vars qw( @ISA
             $QSUB_RESOURCE
             $MAX_BLAST_CPUS
             $SPECIES_DEFS );

use Bio::Tools::Run::Search::WuBlast;
use EnsEMBL::Web::RegObj;
use Sys::Hostname qw(hostname);

@ISA = qw( Bio::Tools::Run::Search::WuBlast );

BEGIN{
  # $ENSEMBL_WEB_REGISTRY is not declared in this file; presumably it is
  # exported by EnsEMBL::Web::RegObj - verify.
  $SPECIES_DEFS = $ENSEMBL_WEB_REGISTRY->species_defs;

  # Resource request passed verbatim to qsub (hard/soft run-time limits).
  $QSUB_RESOURCE = '-l h_rt=10:00:00,s_rt=10:00:00';

  # Set default blast cpus flag for SMP boxes
  $MAX_BLAST_CPUS = 1;
}

#----------------------------------------------------------------------

=head2 run

  Arg [1]   : none
  Function  : Dispatches the blast job using the dispatch_qsub method.
              Environment variables configured on the object are applied
              to %ENV for the duration of the call only.
  Returntype: Boolean: 1 on success
  Exceptions: Re-throws anything raised by command() or dispatch_qsub()
  Caller    :
  Example   : $search->run();

=cut

sub run {
  my $self = shift;

  my $status = $self->status;
  $status = '' unless defined $status;
  if( $status ne 'PENDING' and $status ne 'DISPATCHED' ){
    $self->warn( "Wrong status for run: ". $status );
  }

  # Apply environment variables, keeping a backup copy.  All work that
  # can throw happens inside the eval so that %ENV is restored on every
  # exit path, not only on success.
  my %env_backup = %ENV;
  my $ok = eval {
    foreach my $name( $self->environment_variable() ){
      my $val = $self->environment_variable( $name );
      if( defined $val ){ $ENV{$name} = $val }
      else{ delete( $ENV{$name} ) }
    }

    # Do the deed
    my $command = $self->command;
    $self->dispatch_qsub( $command );
    $self->debug( "BLAST COMMAND: " .$command."\n" );
    1;
  };
  my $error = $@;

  # Restore environment
  %ENV = %env_backup;

  unless( $ok ){
    if( ref( $error ) or ( defined $error and length $error ) ){
      die( $error );
    }
    die( "BLAST dispatch failed for an unknown reason\n" );
  }
  return 1;
}

#----------------------------------------------------------------------

=head2 run_blast

  Arg [1]   : None
  Function  : Fires off the blast command by delegating to the parent
              class' run method (SUPER::run). The pre-repeatmask step
              that used to live here is disabled; repeat masking is
              performed inside the SGE job built by dispatch_qsub.
  Returntype: Whatever SUPER::run returns
  Exceptions:
  Caller    :
  Example   :

=cut

sub run_blast{
  my $self = shift;

  # Disabled pre-repeatmask step, kept for reference:
  # if( $self->option("repeatmask") ||
  #     defined( $self->option("-RepeatMasker") ) ){
  #   uc($self->seq->alphabet) eq 'DNA' ||
  #     ( $self->warn( "Can't repeatmask peptide sequences!" ) && return );
  #   $self->_repeatmask;
  # }

  return $self->SUPER::run();
}

#----------------------------------------------------------------------

=head2 command_qsub

  Arg [1]   : None
  Function  : Internal method to generate the shell qsub command.
              This command calls the wrapper script configured in
              $SiteDefs::ENSEMBL_BLASTSCRIPT rather than the blast
              command itself, passing the search token as its argument.
  Returntype: String: $command
  Exceptions:
  Caller    :
  Example   :

=cut

sub command_qsub{
  my $self = shift;

  # my $program_name = "runblast.pl";
  # my $program_dir  = $SiteDefs::ENSEMBL_SERVERROOT."/utils";
  my $blastscript = $SiteDefs::ENSEMBL_BLASTSCRIPT;
  my $args        = $self->token;

  return "$blastscript $args";
}

#----------------------------------------------------------------------

=head2 _repeatmask

  Arg [1]   : Any arguments are passed through to SUPER::_repeatmask
  Function  : Sets $ENV{BLASTREPEATMASKER} from the species definitions
              and then delegates to the parent class' _repeatmask.
  Returntype: Whatever SUPER::_repeatmask returns
  Exceptions:
  Caller    :
  Example   :

=cut

sub _repeatmask{
  my $self = shift;

  #TODO: expunge SpDefs
  warn ".... repeat_masker_called";
  $ENV{BLASTREPEATMASKER} = $SPECIES_DEFS->ENSEMBL_REPEATMASKER;

  return $self->SUPER::_repeatmask(@_);
}

#----------------------------------------------------------------------

=head2 command

  Arg [1]   : None
  Function  : Generate the blast command itself. The query file is
              represented by the literal placeholder '[[]]', which
              dispatch_qsub replaces with the path of the query on the
              SGE shared directory. Any existing report file is removed.
  Returntype: String: $command
  Exceptions: Throws if the query fasta file does not exist or if no
              database has been set
  Caller    :
  Example   :

=cut

sub command{
  my $self = shift;

  if( ! -f $self->fastafile ){ $self->throw("Need a query sequence!") }

  my $res_file = $self->reportfile;
  if( -f $res_file ){
    $self->warn("A result already exists for $res_file" );
    unlink( $res_file )
      or $self->warn("Could not remove $res_file: $!" );
  }

  $ENV{'BLASTMAT'}    || $self->warn( "BLASTMAT variable not set" );
  $ENV{'BLASTFILTER'} || $self->warn( "BLASTFILTER variable not set" );
  $ENV{'BLASTDB'}     || $self->warn( "BLASTDB variable not set" );

  my $database = $self->database || $self->throw("No database");

  my $param_str = '';
  foreach my $param( $self->option ){
    # Repeat masking is handled by the job script, not by blast itself.
    next if $param eq "repeatmask";
    next if $param eq "-RepeatMasker";

    # NOTE(review): a value of 0 is treated like "no value" here, so such
    # an option is emitted as a bare flag - confirm that this is intended.
    my $val = $self->option($param) || '';

    if( $param =~ /=$/ ){ $param_str .= " $param$val" }   # e.g. "E=" . "10"
    elsif( $val ){ $param_str .= " $param $val" }
    else{ $param_str .= " $param" }
  }

  return join( ' ',
               $SPECIES_DEFS->ENSEMBL_BLAST_BIN_PATH."/".$self->program_path,
               $SPECIES_DEFS->ENSEMBL_BLAST_DATA_PATH."/$database",
               '[[]]',
               $param_str );
}

#----------------------------------------------------------------------

=head2 dispatch_qsub

  Arg [1]   : String: the blast command, as generated by the command
              method (containing the '[[]]' query placeholder)
  Function  : Fires off the qsub command. Copies the query to the SGE
              shared directory and pipes a shell script into qsub that:
                - records the SGE job id in <shared dir>/<ticket>.job
                  (read by the remove method),
                - optionally repeat-masks a DNA query,
                - runs blast,
                - copies the report, error and flag files back to this
                  host using $SiteDefs::ENSEMBL_SGE_RCP_CMD,
                - removes its temporary files and exits with the blast
                  exit status.
  Returntype: Boolean: 1 on success
  Exceptions: Dies if no command is given, if the ticket cannot be
              derived from the state file, if the query cannot be copied,
              or if qsub cannot be started / exits non-zero
  Caller    : run method
  Example   :

=cut

sub dispatch_qsub {
  my $self    = shift;
  my $command = shift || die( "Need a command to dispatch!" );

  my $state_file = $self->statefile;
  my( $ticket )  = $state_file =~ m#/([^/]+$)#;
  defined $ticket
    or die( "Cannot derive a ticket name from state file '$state_file'\n" );

  ## Files on BLAST SERVER
  my $shared_tmp_dir    = $SiteDefs::ENSEMBL_SGE_SHARED_DIR;
  my $server_out_file   = "$shared_tmp_dir/$ticket.out";
  my $server_fail_file  = "$shared_tmp_dir/$ticket.fail";
  my $server_flag_file  = "$shared_tmp_dir/$ticket.flag";
  my $server_fasta_file = "$shared_tmp_dir/$ticket.fa";
  my $server_job_id     = "$shared_tmp_dir/$ticket.job";

  ## Files on web-blade
  my $client_out_file   = $self->reportfile;
  my @PARTS             = split /\//, $state_file;
  my $TICKET_NAME       = "$PARTS[-3]$PARTS[-2]-$PARTS[-1]";
  my $blast_tmp_dir     = $SPECIES_DEFS->ENSEMBL_TMP_DIR_BLAST;
  my $client_flag_file  = "$blast_tmp_dir/pending/$TICKET_NAME";
  my $client_sent_file  = "$blast_tmp_dir/sent/$TICKET_NAME";
  my $client_fail_file  = "$state_file.fail";
  my $client_fasta_file = $self->fastafile;

  # Point the blast command at the copy of the query on the shared dir.
  $command =~ s/\[\[\]\]/$server_fasta_file/;

  my $host = hostname();

  warn "#### dispatch_qsub_called";

  my $repeatmask_command = '/usr/local/bioinf/bin/RepeatMasker';

  # Copy the input fasta file to the SGE shared dir. Without it the job
  # is guaranteed to fail, so do not submit anything if the copy fails.
  copy( $client_fasta_file, $server_fasta_file )
    or die( "Could not copy $client_fasta_file to $server_fasta_file: $!\n" );

  $ENV{'SGE_ROOT'} = $SiteDefs::ENSEMBL_SGE_ROOT;

  # Build the job script before starting qsub, so that an exception while
  # building it cannot leave a half-fed qsub process behind.
  $self->_init_command_string();

  # Write the SGE job id to a file. This is done for every job (not only
  # repeat-masked ones) because the remove method needs it to qdel the job.
  $self->_add_command( qq( echo \$JOB_ID > $server_job_id) );

  if( ( $self->option("repeatmask") ||
        defined( $self->option("-RepeatMasker") ) ) &&
      ( uc($self->seq->alphabet) eq 'DNA' ) ) {
    $self->_add_command(
      qq( $repeatmask_command $server_fasta_file ), ## Run repeat masker
      qq( rm $server_fasta_file.out ),     ## Remove all of the temporary files
      qq( rm $server_fasta_file.stderr ),
      qq( rm $server_fasta_file.cat ),
      qq( rm $server_fasta_file.RepMask ),
      qq( rm $server_fasta_file.RepMask.cat ),
      qq( rm $server_fasta_file.masked.log ),
      qq( mv $server_fasta_file.masked $server_fasta_file ) ## Copy back the repeat masked file!
    );
  }

  # Run the blast, sending output to files on the shared dir
  $self->_add_command( "$command >$server_out_file 2>$server_fail_file" );
  $self->_add_command( 'status=$?' );

  # Create the flag file that indicates blast has finished
  $self->_add_command( "echo '$state_file' > $server_flag_file" );

  # Copy all files back...
  $self->_add_command(
    qq($SiteDefs::ENSEMBL_SGE_RCP_CMD "$server_out_file"  "$host:$client_out_file"),
    qq($SiteDefs::ENSEMBL_SGE_RCP_CMD "$server_fail_file" "$host:$client_fail_file"),
    qq($SiteDefs::ENSEMBL_SGE_RCP_CMD "$server_flag_file" "$host:$client_flag_file")
  );

  # Now tidy up the temporary files
  $self->_add_command( qq(rm -f "$shared_tmp_dir"/$ticket.*) );
  $self->_add_command( 'exit $status' );

  # NOTE(review): $self->priority is not passed on to qsub (the original
  # code computed a queue name from it but never used it) - confirm.
  my $SGE_JobName = "EnsBlast_" . $ticket;
  my @qsub_command = ( 'qsub',
                       split( ' ', $QSUB_RESOURCE ),
                       '-N', $SGE_JobName,
                       '-S', '/bin/bash',
                       '-o', '/dev/null',
                       '-e', '/dev/null' );

  # The job script is fed to qsub on its standard input.
  open( my $qsub_fh, '|-', @qsub_command )
    or die( "Could not exec qsub : $!\n" );

  # Record on the web-blade that this ticket has been sent.
  if( open( my $sent_fh, '>', $client_sent_file ) ) {
    print {$sent_fh} "$state_file";
    close( $sent_fh );
  }
  else {
    warn( "Could not write $client_sent_file: $!\n" );
  }

  print {$qsub_fh} $self->_command_string();

  # For a piped handle, close waits for qsub and sets $? to its status.
  my $closed = close( $qsub_fh );
  if( ! $closed or $? != 0 ) {
    die( "qsub exited with non-zero status - job not submitted\n" );
  }

  return 1;
}

#----------------------------------------------------------------------
# Helpers used by dispatch_qsub to accumulate the job script in
# $self->{'command_string'}, one shell command per line.

# Resets the accumulated job script to the empty string.
sub _init_command_string {
  my $self = shift;
  $self->{'command_string'} = '';
}

# Appends each argument to the job script as a newline-terminated line.
# The same text is also written to STDERR for logging.
sub _add_command {
  my $self = shift;
  my $lines = join( "\n", @_, '' );
  warn $lines;
  $self->{'command_string'} .= $lines;
}

# Returns the job script accumulated so far.
sub _command_string {
  my $self = shift;
  return $self->{'command_string'};
}

#----------------------------------------------------------------------

=head2 remove

  Arg [1]   : None
  Function  : Cancels the SGE job belonging to this search, if its job id
              file (<shared dir>/<ticket>.job, written by the job script)
              exists, by running qdel with a 10 second timeout; then
              delegates to the parent class' remove method.
  Returntype: Whatever SUPER::remove returns
  Exceptions: Dies if qdel does not return within the timeout
  Caller    :
  Example   :

=cut

sub remove{
  my $self = shift;

  my( $ticket )      = $self->statefile =~ m#/([^/]+$)#;
  my $shared_tmp_dir = $SiteDefs::ENSEMBL_SGE_SHARED_DIR;

  if( defined $ticket ){
    # The job id file only exists while the job is running: the job
    # script writes it when it starts and deletes it when it finishes.
    my $server_job_id = "$shared_tmp_dir/$ticket.job";

    if( -e $server_job_id ){
      my $first_line;
      if( open( my $job_fh, '<', $server_job_id ) ){
        $first_line = <$job_fh>;
        close( $job_fh );
      }
      else {
        warn( "Could not read $server_job_id: $!\n" );
      }

      # Only a purely numeric job id is ever interpolated into the shell
      # command below.
      if( defined $first_line and $first_line =~ /\A\s*(\d+)\s*\z/ ){
        my $job_id = $1;
        my $sec    = 10;
        my $out;

        my $ok = eval{
          local $SIG{ALRM} = sub{ die( "qdel timeout ($sec secs)\n" ) };
          alarm( $sec );
          $out = `qdel $job_id 2>&1`;
          alarm( 0 );
          1;
        };
        my $error = $@;
        alarm( 0 );   # make sure no alarm is left pending on any path
        unless( $ok ){ die( $error ) }

        warn( "QSUB REMOVING $ticket: ", defined $out ? $out : '' );
      }
      elsif( defined $first_line ){
        warn( "No usable SGE job id in $server_job_id; qdel skipped\n" );
      }
    }
  }

  # Always clean up the local state, whether or not a job was still
  # running on the farm.
  return $self->SUPER::remove();
}

#----------------------------------------------------------------------
1;