# Run this mapping method for one query/target pair.
#
# Thin wrapper around submit_exonerate(): it forwards its arguments and
# appends whatever $self->options() returns as the extra exonerate
# command-line options.
#
# Arguments:
#   $query  - query file, passed through unchanged
#   $target - target file, passed through unchanged
#   $mapper - mapper object, passed through unchanged
#
# Returns whatever submit_exonerate() returns.
#
# Note: no prototype is declared.  An empty "()" prototype is ignored for
# method calls and would reject any function-style call with arguments.
sub run
{
    my ($self, $query, $target, $mapper) = @_;

    my $name = $self->submit_exonerate($query, $target, $mapper, $self->options());

    return $name;
}
# Run exonerate for $query against $target, either directly on this host
# ("nofarm" mode) or as an LSF job array submitted through bsub.
#
# Arguments:
#   $query   - path of the query file; its basename (minus extension) names
#              the output files and must end in _dna or _peptide for the
#              mapping type to be recognised
#   $target  - path of the target file
#   $mapper  - mapper object; this method uses its verbose, nofarm,
#              core->dir and xref->dbc accessors
#   @options - extra exonerate command-line options, joined with spaces
#
# Returns:
#   "nofarm"     when $mapper->nofarm is defined and exonerate was run locally
#   0            when the bsub submission failed (a warning is issued)
#   $unique_name the LSF job name (class name + "_" + epoch seconds) otherwise
#
# Side effects: updates $self->jobcount, and after a successful submission
# inserts rows into process_status, mapping and mapping_jobs.
sub submit_exonerate
{
    my ($self, $query, $target, $mapper, @options) = @_;

    my $root_dir = $mapper->core->dir;

    print "query $query\n"   if ($mapper->verbose);
    print "target $target\n" if ($mapper->verbose);

    # Output/log files are named after the query file without its extension.
    my $prefix = $root_dir . "/" . basename($query);
    $prefix =~ s/\.\w+$//;

    # The mapping type is taken from the query file name suffix.  Fall back to
    # an empty string (what an undefined value would interpolate to) so the
    # file names and DB values below are built without undef.
    my ($ensembl_type) = $prefix =~ /.*_(dna|peptide)$/;
    $ensembl_type = '' unless defined($ensembl_type);

    my $options_str = join(" ", @options);
    my $class_name  = $self->get_class_name();
    my $unique_name = $class_name . "_" . time();

    # Temporary disk space to reserve: combined size of both input files.
    # NOTE(review): the divisor is 1024000 rather than 1024*1024; kept as is.
    my $disk_space_needed = (stat($query))[7] + (stat($target))[7];
    $disk_space_needed /= 1024000;    # convert to MB
    $disk_space_needed = int($disk_space_needed);
    $disk_space_needed += 1;          # always reserve at least 1

    my $num_jobs = calculate_num_jobs($query);

    if (defined($mapper->nofarm)) {
        # Local mode: one exonerate run through the shell, no chunking.
        my $output = $class_name . "_" . $ensembl_type . "_1.map";

        my $cmd = <<EON;
$exonerate_path $query $target --showvulgar false --showalignment FALSE --ryo "xref:%qi:%ti:%ei:%ql:%tl:%qab:%qae:%tab:%tae:%C:%s\n" $options_str | grep '^xref' > $root_dir/$output
EON

        print "none farm command is $cmd\n" if ($mapper->verbose);

        system($cmd);
        # Only a failure to launch is reported: the pipeline ends in grep,
        # which exits non-zero whenever there are simply no xref lines.
        warn("Could not run exonerate command: $!\n") if ($? == -1);

        $self->jobcount(1);
        return "nofarm";
    }

    # NOTE(review): a single job is bumped to two, presumably so that the
    # submission is always a real job array - confirm.
    if ($num_jobs == 1) {
        $num_jobs++;
    }
    $self->jobcount($self->jobcount() + $num_jobs);

    # \$LSB_JOBINDEX is left for the job's shell to expand.
    my $output = $class_name . "_" . $ensembl_type . "_" . "\$LSB_JOBINDEX.map";

    # NOTE(review): both resource requirements are passed to bsub as ONE
    # argument ("-Rselect[linux] -Rrusage[tmp=N]"); kept exactly as before.
    my @main_bsub = (
        'bsub',
        '-R' . 'select[linux] -Rrusage[tmp=' . $disk_space_needed . ']',
        '-J' . $unique_name . "[1-$num_jobs]%200",
        '-o', "$prefix.%J-%I.out",
        '-e', "$prefix.%J-%I.err"
    );

    # Script fed to bsub on its standard input.
    my $main_job = <<EOF;
. /usr/local/lsf/conf/profile.lsf
$exonerate_path $query $target --querychunkid \$LSB_JOBINDEX --querychunktotal $num_jobs --showvulgar false --showalignment FALSE --ryo "xref:%qi:%ti:%ei:%ql:%tl:%qab:%qae:%tab:%tae:%C:%s\n" $options_str | grep '^xref' > $root_dir/$output
EOF

    my $jobid = 0;

    eval {
        # Fork-open: the parent reads everything the child (and bsub) prints.
        my $reader_pid = open(my $bsub_reader, '-|');
        die("Could not fork : $!\n") unless (defined($reader_pid));

        if ($reader_pid) {
            # Parent: pick the job id out of bsub's output and keep the rest
            # for the error message.
            my @other_output;
            while (my $line = <$bsub_reader>) {
                if ($line =~ /^Job <(\d+)> is submitted/) {
                    $jobid = $1;
                    print "LSF job ID for main mapping job: $jobid (job array with $num_jobs jobs)\n" if ($mapper->verbose);
                }
                else {
                    push @other_output, $line;
                }
            }

            # close() on a fork-opened handle waits for the child and sets $?.
            close($bsub_reader);
            if ($? != 0) {
                die("bsub exited with non-zero status - job not submitted\n" . join('', @other_output));
            }
            if (!$jobid) {
                die("Could not find a job ID in the bsub output\n" . join('', @other_output));
            }
        }
        else {
            # Child.  It must never die() here: the enclosing eval would catch
            # it and this process would carry on running the caller's code as
            # a duplicate.  Failures are reported via output plus exit status.
            if (!open(STDERR, '>&', \*STDOUT)) {
                print "Could not redirect STDERR : $!\n";
                exit(1);
            }

            my $bsub_pid = open(my $bsub_stdin, '|-');
            if (!defined($bsub_pid)) {
                print STDERR "Could not fork : $!\n";
                exit(1);
            }

            if ($bsub_pid == 0) {
                # Grandchild: becomes bsub, reading the job script on STDIN.
                exec(@main_bsub) or print STDERR "Could not exec bsub : $!\n";
                exit(1);
            }

            print $bsub_stdin $main_job;
            close($bsub_stdin);    # waits for bsub and sets $?
            exit($? == 0 ? 0 : 1);
        }
    };

    if ($@) {
        warn("Job submission failed:\n$@\n");
        return 0;
    }

    # Command line recorded in the mapping table; mirrors the submitted job.
    my $command =
        "$exonerate_path $query $target --querychunkid \$LSB_JOBINDEX --querychunktotal $num_jobs --showvulgar false --showalignment FALSE --ryo "
      . '"xref:%qi:%ti:%ei:%ql:%tl:%qab:%qae:%tab:%tae:%C:%s\n"'
      . " $options_str | grep \"^xref\" > $root_dir/$output";

    my $dbc = $mapper->xref->dbc;

    my $sth = $dbc->prepare("insert into process_status (status, date) values('mapping_submitted',now())");
    $sth->execute();
    $sth->finish;

    # Placeholders keep quotes or backslashes in paths/options from breaking
    # the statement.
    my $query_cutoff  = $self->query_identity_threshold;
    my $target_cutoff = $self->target_identity_threshold;

    $sth = $dbc->prepare("insert into mapping (job_id, type, command_line, percent_query_cutoff, percent_target_cutoff, method, array_size) values (?,?,?,?,?,?,?)");
    $sth->execute($jobid, $ensembl_type, $command, $query_cutoff, $target_cutoff, $class_name, $num_jobs);
    $sth->finish;

    # One mapping_jobs row per element of the job array.
    # NOTE(review): the out/err names assume the query file is called
    # xref_0_<type>.<ext>, which is what "$prefix.%J-%I" produces in that case.
    $sth = $dbc->prepare("insert into mapping_jobs (root_dir, map_file, status, out_file, err_file, array_number, job_id) values (?,?,?,?,?,?,?)");
    for my $i (1 .. $num_jobs) {
        my $map_file = $class_name . "_" . $ensembl_type . "_" . $i . ".map";
        my $out_file = "xref_0_" . $ensembl_type . "." . $jobid . "-" . $i . ".out";
        my $err_file = "xref_0_" . $ensembl_type . "." . $jobid . "-" . $i . ".err";
        $sth->execute($root_dir, $map_file, 'SUBMITTED', $out_file, $err_file, $i, $jobid);
    }
    $sth->finish;

    return $unique_name;
}