Raw content of XrefMapper::SubmitMapper
package XrefMapper::SubmitMapper;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use strict;
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
##################################################################
# JOB 1 Do exonerate jobs and get core info
##################################################################
# Also load database with connection details of the core database for use later in JOBS
# One get all info from core that is needed. :- stable_id's etc.
# Process the direct xrefs by putting them in the object_xref table
# dump the fasta files
# submit exonerate jobs (fill tables mapping, mapping_jobs) if not already done
# submit coordinate xrefs if not processed
sub new {
my($class, $mapper) = @_;
my $self ={};
bless $self,$class;
$self->core($mapper->core);
$self->xref($mapper->xref);
$self->mapper($mapper);
$self->verbose($mapper->verbose);
return $self;
}
sub mapper{
my ($self, $arg) = @_;
(defined $arg) &&
($self->{_mapper} = $arg );
return $self->{_mapper};
}
sub store_core_database_details{
my ($self, $port, $user, $pass, $dbname, $dir);
}
##############################################################################
# dump fasta files code
##############################################################################
=head2 dump_seqs
Arg[1]: xref object which holds info needed for the dump of xref
Description: Dumps out the files for the mapping. Xref object should hold
the value of the databases and source to be used.
Returntype : none
Exceptions : will die if species not known or an error occurs while
: trying to write to files.
Caller : general
=cut
sub dump_seqs{
my ($self, $location) = @_;
$self->core->dbc->disconnect_if_idle(1);
$self->core->dbc->disconnect_when_inactive(1);
$self->dump_xref();
$self->core->dbc->disconnect_if_idle(0);
$self->core->dbc->disconnect_when_inactive(0);
$self->xref->dbc->disconnect_when_inactive(1);
$self->xref->dbc->disconnect_if_idle(1);
$self->dump_ensembl($location);
$self->xref->dbc->disconnect_if_idle(0);
$self->xref->dbc->disconnect_when_inactive(0);
}
sub no_dump_xref {
my ($self) = @_;
my @method=();
my @lists =@{$self->get_set_lists()};
my $i=0;
my $k = 0;
foreach my $list (@lists){
$method[$k++] = shift @$list;
}
$self->method(\@method);
$self->core->dna_file($self->core->dir."/".$self->core->species."_dna.fasta");
$self->core->protein_file($self->core->dir."/".$self->core->species."_protein.fasta");
}
=head2 dump_xref
Arg[1]: xref object which holds info on method and files.
Description: Dumps the Xref data as fasta file(s)
Returntype : none
Exceptions : none
Caller : dump_seqs
=cut
sub dump_xref{
my ($self) = @_;
my $xref =$self->xref();
if(!defined($xref->dir())){
if(defined($self->dir)){
$xref->species($self->dir);
$self->species_id($self->get_id_from_species_name($self->species));
}
else{
$xref->dir(".");
}
}
my @method=();
my @lists =@{$self->get_set_lists()};
my $k = 0;
foreach my $list (@lists){
$method[$k++] = shift @$list;
}
$self->method(\@method);
my $i=0;
if(defined($self->mapper->dumpcheck())){
my $skip = 1;
foreach my $list (@lists){
if(!-e $xref->dir()."/xref_".$i."_dna.fasta"){
$skip = 0;
}
if(!-e $xref->dir()."/xref_".$i."_peptide.fasta"){
$skip = 0;
}
$i++;
}
if($skip){
print "Xref fasta files found and will be used (No new dumping)\n" if($self->verbose);
return;
}
}
print "Dumping Xref fasta files\n" if($self->verbose());
for my $sequence_type ('dna', 'peptide') {
my $filename = $xref->dir() . "/xref_0_" . $sequence_type . ".fasta";
open(XREF_DUMP,">$filename") || die "Could not open $filename";
my $sql = "SELECT p.xref_id, p.sequence, x.species_id , x.source_id ";
$sql .= " FROM primary_xref p, xref x ";
$sql .= " WHERE p.xref_id = x.xref_id AND ";
$sql .= " p.sequence_type ='$sequence_type' ";
my $sth = $xref->dbc->prepare($sql);
$sth->execute();
while(my @row = $sth->fetchrow_array()){
$row[1] =~ s/(.{60})/$1\n/g;
print XREF_DUMP ">".$row[0]."\n".$row[1]."\n";
}
close(XREF_DUMP);
$sth->finish();
}
my $sth = $xref->dbc->prepare("insert into process_status (status, date) values('xref_fasta_dumped',now())");
$sth->execute();
$sth->finish;
return;
}
=head2 dump_ensembl
Description: Dumps the ensembl data to a file in fasta format.
Returntype : none
Exceptions : none
Caller : dump_seqs
=cut
sub dump_ensembl{
my ($self, $location) = @_;
$self->fetch_and_dump_seq($location);
}
=head2 fetch_and_dump_seq
Description: Dumps the ensembl data to a file in fasta format.
Returntype : none
Exceptions : wil die if the are errors in db connection or file creation.
Caller : dump_ensembl
=cut
sub fetch_and_dump_seq{
my ($self) = @_;
my $ensembl = $self->core;
my $db = new Bio::EnsEMBL::DBSQL::DBAdaptor(-dbconn => $ensembl->dbc);
#
# store ensembl dna file name and open it
#
if(!defined($ensembl->dir())){
$ensembl->dir(".");
}
$ensembl->dna_file($ensembl->dir."/".$ensembl->species."_dna.fasta");
#
# store ensembl protein file name and open it
#
$ensembl->protein_file($ensembl->dir."/".$ensembl->species."_protein.fasta");
if(defined($self->mapper->dumpcheck()) and -e $ensembl->protein_file() and -e $ensembl->dna_file()){
my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
$sth->execute();
print "Ensembl Fasta files found (no new dumping)\n" if($self->verbose());
return;
}
print "Dumping Ensembl Fasta files\n" if($self->verbose());
open(DNA,">".$ensembl->dna_file())
|| die("Could not open dna file for writing: ".$ensembl->dna_file."\n");
open(PEP,">".$ensembl->protein_file())
|| die("Could not open protein file for writing: ".$ensembl->protein_file."\n");
my $gene_adaptor = $db->get_GeneAdaptor();
# fetch by location, or everything if not defined
my @genes;
my $constraint;
# TEST PURPOSES ONLY#################################################
#####################################################################
@genes = @{$gene_adaptor->fetch_all()};
# push @genes, $gene_adaptor->fetch_by_stable_id("ENSG00000139618");
#####################################################################
my $max = undef;
my $i =0;
my $rna = 0;
foreach my $gene (@genes){
next if $gene->biotype eq 'J_segment';
next if $gene->biotype eq 'D_segment';
foreach my $transcript (@{$gene->get_all_Transcripts()}) {
$i++;
my $seq = $transcript->spliced_seq();
$seq =~ s/(.{60})/$1\n/g;
print DNA ">" . $transcript->dbID() . "\n" .$seq."\n";
my $trans = $transcript->translation();
my $translation = $transcript->translate();
if(defined($translation)){
my $pep_seq = $translation->seq();
$pep_seq =~ s/(.{60})/$1\n/g;
print PEP ">".$trans->dbID()."\n".$pep_seq."\n";
}
}
last if(defined($max) and $i > $max);
}
close DNA;
close PEP;
my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
$sth->execute();
$sth->finish;
}
=head2 get_set_lists
Description: specifies the list of databases and source to be used in the
: generation of one or more data sets.
Returntype : list of lists
Example : my @lists =@{$self->get_set_lists()};
Exceptions : none
Caller : dump_xref
=cut
sub get_set_lists{
my ($self) = @_;
return [["ExonerateGappedBest1", ["*","*"]]];
}
###################################################################################################
# exonerate subs
###################################################################################################
=head2 jobcount
Arg [1] : (optional)
Example : $mapper->jobcount(1004);
Description: Getter / Setter for number of jobs submitted.
Returntype : scalar
Exceptions : none
=cut
sub jobcount {
my ($self, $arg) = @_;
(defined $arg) &&
($self->{_jobcount} = $arg );
return $self->{_jobcount};
}
sub method{
my ($self, $arg) = @_;
(defined $arg) &&
($self->{_method} = $arg );
return $self->{_method};
}
=head2 build_list_and_map
Arg[1]: xref object which holds info on method and files.
Description: runs the mapping of the list of files with species methods
Returntype : none
Exceptions : none
Caller : general
=cut
sub build_list_and_map {
my ($self) = @_;
my @list=();
my $i = 0;
foreach my $method (@{$self->method()}){
my @dna=();
my $q_dna_file = $self->xref->dir."/xref_".$i."_dna.fasta";
if (-e $q_dna_file and -s $q_dna_file) {
push @dna, $method;
push @dna, $q_dna_file;
push @dna, $self->core->dna_file();
push @list, \@dna;
}
my @pep=();
my $q_pep_file = $self->xref->dir."/xref_".$i."_peptide.fasta";
if (-e $q_pep_file and -s $q_pep_file) {
push @pep, $method;
push @pep, $self->xref->dir."/xref_".$i."_peptide.fasta";
push @pep, $self->core->protein_file();
push @list, \@pep;
}
$i++;
}
$self->run_mapping(\@list);
}
=head2 run_mapping
Arg[1] : List of lists of (method, query, target)
Arg[2] :
Example : none
Description: Create and submit mapping jobs to LSF, and wait for them to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
sub run_mapping {
my ($self, $lists) = @_;
# delete old output files in target directory if we're going to produce new ones
my $dir = $self->core->dir();
print "Deleting out, err and map files from output dir: $dir\n" if($self->verbose());
unlink (<$dir/*.map $dir/*.out $dir/*.err>);
$self->remove_all_old_output_files();
#disconnect so that we can then reconnect after the long mapping bit.
$self->core->dbc->disconnect_if_idle(1);
$self->xref->dbc->disconnect_if_idle(1);
$self->core->dbc->disconnect_when_inactive(1);
$self->xref->dbc->disconnect_when_inactive(1);
# foreach method, submit the appropriate job & keep track of the job name
# note we check if use_existing_mappings is set here, not earlier, as we
# still need to instantiate the method object in order to fill
# method_query_threshold and method_target_threshold
my @job_names;
my @running_methods;
foreach my $list (@$lists){
my ($method, $queryfile ,$targetfile) = @$list;
my $obj_name = "XrefMapper::Methods::$method";
# check that the appropriate object exists
eval "require $obj_name";
if($@) {
warn("Could not find object $obj_name corresponding to mapping method $method, skipping\n$@");
} else {
my $obj = $obj_name->new();
my $job_name = $obj->run($queryfile, $targetfile, $self);
push @job_names, $job_name;
push @running_methods, $obj;
sleep 1; # make sure unique names really are unique
$self->jobcount(($self->jobcount||0)+$obj->jobcount);
}
} # foreach method
# submit depend job to wait for all mapping jobs
foreach my $method( @running_methods ){
# Submit all method-specific depend jobs
if( $method->can('submit_depend_job') ){
$method->submit_depend_job;
}
}
# Submit generic depend job. Defaults to LSF
$self->submit_depend_job($self->core->dir, @job_names);
$self->core->dbc->disconnect_if_idle(0);
$self->xref->dbc->disconnect_if_idle(0);
$self->core->dbc->disconnect_when_inactive(0);
$self->xref->dbc->disconnect_when_inactive(0);
$self->check_err($self->core->dir);
} # run_mapping
sub nofarm{
my ($self, $arg) = @_;
(defined $arg) &&
($self->{_nofarm} = $arg );
return $self->{_nofarm};
}
sub check_err {
my ($self, $dir) = @_;
foreach my $err (glob("$dir/*.err")) {
print STDERR "\n\n*** Warning: $err has non-zero size; may indicate".
" problems with exonerate run\n\n\n" if (-s $err);
}
}
=head2 submit_depend_job
Arg[1] : List of job names.
Arg[2] :
Example : none
Description: Submit an LSF job that waits for other jobs to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
sub submit_depend_job {
my ($self, $root_dir, @job_names) = @_;
if(defined($self->nofarm)){
return;
}
# Submit a job that does nothing but wait on the main jobs to
# finish. This job is submitted interactively so the exec does not
# return until everything is finished.
# build up the bsub command; first part
my @depend_bsub = ('bsub', '-K');
# build -w 'ended(job1) && ended(job2)' clause
my $ended_str = "-w ";
my $i = 0;
foreach my $job (@job_names) {
$ended_str .= "ended($job)";
$ended_str .= " && " if ($i < $#job_names);
$i++;
}
push @depend_bsub, $ended_str;
# rest of command
push @depend_bsub, ('-q', 'small', '-o', "$root_dir/depend.out", '-e', "$root_dir/depend.err");
my $jobid = 0;
eval {
my $pid;
my $reader;
local *BSUB;
local *BSUB_READER;
if ( ( $reader = open( BSUB_READER, '-|' ) ) ) {
while () {
if (/^Job <(\d+)> is submitted/) {
$jobid = $1;
print "LSF job ID for depend job: $jobid\n" if($self->verbose);
}
}
close(BSUB_READER);
} else {
die("Could not fork : $!\n") unless ( defined($reader) );
open( STDERR, ">&STDOUT" );
if ( ( $pid = open( BSUB, '|-' ) ) ) {
print BSUB "/bin/true\n";
close BSUB;
if ( $? != 0 ) {
die( "bsub exited with non-zero status ($?) "
. "- job not submitted\n" );
}
} else {
if ( defined($pid) ) {
exec(@depend_bsub);
die("Could not exec bsub : $!\n");
} else {
die("Could not fork : $!\n");
}
}
exit(0);
}
};
if ($@) {
# Something went wrong
warn("Job submission failed:\n$@\n");
}
else{
my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('mapping_finished',now())");
$sth->execute();
$sth->finish;
}
}
sub remove_all_old_output_files{
my ($self) =@_;
my $dir = $self->core->dir();
print "Deleting txt and sql files from output dir: $dir\n" if($self->verbose);
unlink(<$dir/*.txt $dir/*.sql>);
# $self->cleanup_projections_file(); # now to be done when we load core.
}
1;