# Raw content of BioMart::QueryRunner
#
# BioMart module for BioMart::QueryRunner
#
# You may distribute this module under the same terms as perl itself
# POD documentation - main docs before the code
=head1 NAME
BioMart::QueryRunner
=head1 SYNOPSIS
The BioMart::QueryRunner contains a simple Query Planner to
run a BioMart::Query query against one or more BioMart::DatasetI
objects.
=head1 DESCRIPTION
The BioMart::QueryRunner object contains a simple Query Planner
to run a BioMart::Query query (or count) against one or more
BioMart::DatasetI implementing objects. It uses the following Recursive
algorithm:
Given a BioMart::Query object involving one or more BioMart::DatasetI
implementing objects, and the directional Links information for multi
Dataset queries:
if datasets.length < 2 :
process the query and return the ResultTable
else :
dataset = shift dataset;
if dataset can not import from another dataset by design
or all datasets exporting to this dataset have been
processed:
create a subquery involving this Dataset's filters and the
Attributes in the Exportable; process the subquery and add the
ResultTable as an Importable to the main Query
update this dataset as 'processed'
else :
push dataset back onto the end of the list
recurse
=head1 AUTHOR - Arek Kasprzyk, Syed Haider, Darin London, Damian Smedley
=head1 CONTACT
This module is part of the BioMart project http://www.biomart.org
Questions can be posted to the mart-dev mailing list:
mart-dev@ebi.ac.uk
=head1 METHODS
=cut
package BioMart::QueryRunner;
use strict;
use warnings;
use Digest::MD5;
use Digest::SHA qw (sha256_base64);
use Log::Log4perl;
my $logger=Log::Log4perl->get_logger(__PACKAGE__);
use BioMart::Configuration::AttributeList;
use base qw(BioMart::Root);
=head2 new
Usage : my $query_runner = BioMart::QueryRunner->new();
$query_runner->execute($query);
$query_runner->printResults();
Description: Creates a BioMart::QueryRunner object.
Can then be used to execute a BioMart::Query object
and print the results in the format specified on
the Query object.
Can be reused to execute multiple queries
Return type: A BioMart::QueryRunner object.
Exceptions :
Caller : Any BioMart clients building up a mart query
=cut
# Registers every per-runner state slot with an initial value of undef.
# Presumably invoked by the BioMart::Root constructor - confirm against Root.
sub _new {
    my ($self) = @_;

    # State slots, registered in this fixed order.
    my @state_slots = qw(
        query
        registry
        formatter
        count
        final_dataset_order
        batched_already
        processed_datasets
        datasets_encountered
        last_visible_exportable
        union_tables
        visibleDSCount
        uniqueResultsFlag
    );

    # The last slot is registered as the sub's final expression so that the
    # value handed back to the caller is still that of the last attr() call.
    my $last_slot = pop @state_slots;
    foreach my $slot (@state_slots) {
        $self->attr($slot, undef);
    }
    return $self->attr($last_slot, undef);
}
=head2 execute
Usage :
$query_runner->execute($query);
$query_runner->printResults();
Description: Executes a BioMart::Query object leaving the QueryRunner ready
to print the results in the format specified on
the Query object.
Return type: none
Exceptions : BioMart::Exception::Query if there is not a possible path
through the datasets in the query
Caller : Any BioMart clients executing a mart query
=cut
# Runs $query and leaves this runner ready for getCount() or printResults().
#
# Arguments:
#   $query - the BioMart::Query to execute.
#
# Side effects visible in this sub:
#   * stores the number of visible datasets under 'visibleDSCount';
#   * switches every dataset to GenomicMAlignHack(1) when any dataset in the
#     query is a BioMart::Dataset::GenomicMAlign;
#   * stores either 'count' (count queries) or 'formatter' (everything else).
#
# Throws BioMart::Exception::Usage when the formatter name on the query is
# not a plain package-name fragment; anything raised by executionPlan,
# loadModule, the formatter or _getResultTable propagates unchanged.
sub execute {
    my ($self, $query) = @_;

    my $registry       = $query->getRegistry;
    my $GMA_present    = 0;
    my $visibleDSCount = 0;
    my @datasets;    # dataset objects, resolved once and reused below

    if (scalar(@{$query->getDatasetNames}) > 0) {
        foreach my $ds (@{$query->getDatasetNames}) {
            my $dataset =
                $registry->getDatasetByName($query->virtualSchema, $ds);
            push @datasets, $dataset;

            # GenomicMAlign check, so existing GenomicMAlign clients keep
            # working.  Sequence requests via GenomicMAlign cannot use the
            # newer hashing logic, which needs a unique key when results are
            # hashed.  For Structure/GenomicSeq the first attribute in the
            # filter list of importables and exportables should be unique;
            # Compara marts have neither a unique column nor a matching
            # filter list, so for them merging/hashing follows the old
            # principle of concatenating all coordinates into one key, which
            # may result in collisions.
            $GMA_present = 1
                if $dataset->isa("BioMart::Dataset::GenomicMAlign");

            # Count the visible datasets; the number is used to avoid merging
            # sequences as semi-colon separated values in two-dataset queries
            # that both involve Genomic Sequence.
            $visibleDSCount++ if $dataset->visible;
        }
        $self->set('visibleDSCount', $visibleDSCount);
    }

    if ($GMA_present) {    # flag to follow the old hashing/merging
        foreach my $dataset (@datasets) {
            $dataset->GenomicMAlignHack(1);
        }
    }

    # Trace what the query asks for before planning.
    my $attributes = $query->getAllAttributes();
    if (defined $attributes) {
        foreach my $att (@{$attributes}) {
            $logger->warn("ATTRIBUTE: ", $att->dataSetName,"\t",$att->name,"\t",$att->table);
        }
    }
    else {
        $logger->warn("NO ATTRIBUTES");
    }

    my $filters = $query->getAllFilters();
    if (defined $filters) {
        foreach my $filt (@{$filters}) {
            $logger->warn("FILTER TABLE: ", $filt->dataSetName,"\t",$filt->name,"\t",$filt->table);
        }
    }
    else {
        $logger->warn("NO FILTERS");
    }

    $self->executionPlan($query);

    my $formatterName = $query->formatter() || 'TSV';

    # The name is spliced into a package name and handed to loadModule, so
    # accept only something that looks like a package-name fragment.
    # NOTE(review): how loadModule loads the module is not visible here;
    # the check is defensive - confirm against BioMart::Root.
    unless ($formatterName =~ m{\A \w+ (?: :: \w+ )* \z}xms) {
        BioMart::Exception::Usage->throw(
            "Query Runner - invalid formatter name '$formatterName'\n");
    }

    my $module = "BioMart::Formatter::$formatterName";
    $self->loadModule($module);
    my $formatter = $module->new();
    $query = $formatter->processQuery($query);

    # Plan again: the formatter's processQuery can change the plan.
    $self->executionPlan($query);

    my $rtable = $self->_getResultTable($query);
    if ($query->count) {
        $self->set('count', $rtable);
    }
    else {
        $formatter->resultTable($rtable);
        $self->set('formatter', $formatter);
    }
}
=head2 getCount
Usage : $query_runner->getCount;
Description: Returns the counts for BioMart::Query previously
executed by the QueryRunner (see execute method above)
Return type: scalar $count
Exceptions :
Caller : Any BioMart clients executing a mart query
=cut
# Returns whatever is stored in the 'count' slot, which execute() fills in
# for count queries.
sub getCount {
    my ($self) = @_;
    return $self->get('count');
}
=head2 printResults
Usage : $query_runner->printResults;
Description: prints formatted results for BioMart::Query previously
executed by the QueryRunner (see execute method above).
Results will be printed in a batched manner
Return type:
Exceptions :
Caller : Any BioMart clients executing a mart query
=cut
# Writes the formatted rows of the previously executed query.
#
# Arguments:
#   $filehandle - destination handle; STDOUT when omitted or false.
#   $lines      - optional maximum number of rows to print; a false value
#                 means no limit.
#
# XLS formatters print for themselves.  For every other formatter, rows are
# pulled one at a time with nextRow, bare newline rows are skipped, and
# duplicate rows are dropped when uniqueRowsOnly() is set on this runner.
sub printResults {
    my ($self, $filehandle, $lines) = @_;
    $filehandle = \*STDOUT unless $filehandle;    # no handle supplied

    my $formatter = $self->get('formatter');

    # The XLS formatter takes over the whole job.
    if ($formatter->isa("BioMart::Formatter::XLS")) {
        return $formatter->printResults($filehandle, $lines,
            $self->uniqueRowsOnly());
    }

    my $emitted = 0;
    my %seen_digest;    # digests of the rows already accepted
    no warnings 'uninitialized';
    ROW:
    while (my $row = $formatter->nextRow) {
        next ROW if $row eq "\n";

        # Pass on unique rows only when that is set on this QueryRunner.
        if ($self->uniqueRowsOnly()) {
            next ROW if $seen_digest{ sha256_base64($row) }++;
        }

        $emitted++;
        last ROW if $lines && $emitted > $lines;
        print {$filehandle} $row;
    }
}
=head2 printHeader
Usage : $query_runner->printHeader;
Description: Prints a correctly formatted header (typically column
display names) using a call to the getDisplayNames method
of the Formatter defined in the original Query object.
Return type:
Exceptions :
Caller : Any BioMart clients executing a mart query
=cut
# Prints the header text supplied by the stored formatter's getDisplayNames.
#
# Arguments:
#   $filehandle - destination handle; STDOUT when omitted or false.
#
# Nothing is printed when the formatter returns a false value.
sub printHeader {
    my $self       = shift;
    my $filehandle = shift(@_) || \*STDOUT;    # no handle supplied

    my $header = $self->get('formatter')->getDisplayNames;
    print {$filehandle} $header if $header;
}
=head2 printFooter
Usage : $query_runner->printFooter;
Description: Prints a correctly formatted footer using a call to the
getFooterText method of the Formatter defined in the
original Query object.
Return type:
Exceptions :
Caller : Any BioMart clients executing a mart query
=cut
# Prints the footer text supplied by the stored formatter's getFooterText.
#
# Arguments:
#   $filehandle - destination handle; STDOUT when omitted or false.
#
# Nothing is printed when the formatter returns a false value.
sub printFooter {
    my $self       = shift;
    my $filehandle = shift(@_) || \*STDOUT;    # no handle supplied

    my $footer = $self->get('formatter')->getFooterText;
    print {$filehandle} $footer if $footer;
}
=head2 printCompletionStamp
Usage : $query_runner->printCompletionStamp;
Description: Prints a CompletionStamp [success]
Return type: [success]
Exceptions :
Caller : Any BioMart clients executing a mart query
=cut
# Writes the literal completion marker "[success]" followed by a newline.
#
# Arguments:
#   $filehandle - destination handle; STDOUT when omitted or false.
sub printCompletionStamp {
    my $self       = shift;
    my $filehandle = shift(@_) || \*STDOUT;    # no handle supplied

    print {$filehandle} "[success]\n";
}
=head2 _getResultTable
Usage : my $rtable = $self->_getResultTable($query);
Description: Uses a recursive algorithm to
process all primary and intermediate queries
into exportables with the correct batching
logic, and returns the ResultTable from
the terminal dataset. If query->count is
set, then a count of the objects
in the focus_dataset given all other filters
is returned instead of the BioMart::ResultTable.
Note, for queries involving visible, upstream
datasets, the count is not run for efficiency
reasons, and an exception is thrown.
Returntype : BioMart::ResultTable object, or scalar $count
Exceptions : BioMart::Exception::Usage if a count is requested on a
query with more than one visible dataset
Caller : caller
=cut
# Resets the per-query recursion state and starts _processPath on a copy of
# the planned dataset order.
#
# Arguments:
#   $query - the BioMart::Query to run.
#
# Returns whatever _processPath produces (a result table, or a count when
# $query->count is set).
#
# Throws BioMart::Exception::Usage when a count is requested and a second
# visible dataset is found in the query.
sub _getResultTable {
    my ($self, $query) = @_;

    # Queries carrying a limitSize are web-service generated subqueries: they
    # are not user written, will not contain usage errors, and are likely to
    # mix link_attribute page and normal page attributes, which would make
    # validation throw a usage exception.  Only validate the others.
    $query->validate() unless defined $query->limitSize;

    my $registry = $query->getRegistry;
    $self->set('query',    $query);
    $self->set('registry', $registry);

    # reset recursion state
    $self->set('union_tables',    {});
    $self->set('batched_already', undef);

    if ($query->count) {
        # No count is available once two visible datasets are involved.
        my $visible_total = 0;
        foreach my $name (@{ $query->getDatasetNames }) {
            my $dataset =
                $registry->getDatasetByName($query->virtualSchema, $name);
            next unless $dataset->visible;
            $visible_total++;
            BioMart::Exception::Usage->throw("count unavailable for this query\n")
                if $visible_total == 2;
        }
    }

    # Hand the recursion its own copy so the stored order stays intact.
    my @pending = @{ $self->get('final_dataset_order') };
    my $results = $self->_processPath(\@pending);
    return $results;
}
# Recursive worker started by _getResultTable.
#
# Arguments:
#   $datasetsToProcess - array ref of dataset names in processing order.
#                        The array is shifted in place on every call.
#
# With at most one dataset left (base case) a single-dataset subquery is
# built from that dataset's attributes, filters, orderBy and interface, and
# the dataset's getCount (count queries) or getResultTable is returned.
#
# Otherwise the first dataset is taken off the list, a subquery for it is
# run, and its result table is either attached to an exportable that is
# added to the main query as a filter ('join' links) or stored in
# 'union_tables' under the target dataset's name, before recursing on the
# remaining datasets.
#
# Returns: the base-case result, nothing when the base-case dataset cannot
# be resolved, or undef when the recursive call yields a false value.
sub _processPath {
my ($self, $datasetsToProcess) = @_;
my $query = $self->get('query');
# Shallow copy of the pending union tables; it is written back with
# set('union_tables', ...) wherever it is changed below.
my %union_tables = %{$self->get('union_tables')};
my $last_visible_exportable = $self->get('last_visible_exportable');
# The block below handles the case where $datasetsToProcess does NOT hold
# more than one dataset name.
unless (scalar(@{$datasetsToProcess}) > 1) {
# base case, process the query and return the ResultTable or count
my $dset = shift @{$datasetsToProcess};
my $datasetToProcess = $self->get('registry')->
getDatasetByName($query->virtualSchema, $dset);
# Nothing to do when the registry cannot resolve the dataset.
return if (!$datasetToProcess);
# Web-served datasets are queried under the virtual schema name taken from
# their location object rather than the query's own.
my $virtualSchemaNameForQuery = $query->virtualSchema;
if ($datasetToProcess->serverType eq "web"){
my $location = $datasetToProcess->getParam('configurator')
->get('location');
$virtualSchemaNameForQuery = $location->serverVirtualSchema;
}
my $subquery =
BioMart::Query->new('registry' => $self->get('registry'),
'virtualSchemaName'=>$virtualSchemaNameForQuery);
# don't use addAttributes and _addAttribute method as
# these automatically link placeholder atts and this will
# produce a 2 dataset subquery which messes up batching and
# web services mode - same problem for filters below
#$subquery->addAttributes($query->getAllAttributes($dset))
# if ($query->getAllAttributes($dset));
my $subquery_atts = $query->getAllAttributes($dset);
if ($subquery_atts){
foreach my $subquery_att(@{$subquery_atts}){
$subquery->addAttributeWithoutLinking($subquery_att);
$logger->debug("Added attribute $subquery_att to bottom dataset ".$datasetToProcess->name);
}
}
#$subquery->addFilters($query->getAllFilters($dset))
# if ($query->getAllFilters($dset));
my $subquery_filts = $query->getAllFilters($dset);
if ($subquery_filts){
foreach my $subquery_filter(@{$subquery_filts}){
$subquery->addFilterWithoutLinking($subquery_filter);
$logger->debug("Added filter $subquery_filter to bottom dataset ".$datasetToProcess->name);
}
}
# Carry the main query's ordering over to the subquery when one is set.
$subquery->orderBy($query->orderBy())
if ($query->orderBy());
$logger->debug("Added orderBy ".$query->orderBy." to bottom dataset ".$datasetToProcess->name)
if ($query->orderBy());
# add dataset name in case there are no atts/filts, eg start page count
$subquery->addDatasetName($dset,
$query->getInterfaceForDataset($dset));
$logger->debug("Added dataset $dset interface ".$query->getInterfaceForDataset($dset)." to bottom dataset ".$datasetToProcess->name);
my %params = ('query' => $subquery);
if ($query->count) {
return $datasetToProcess->getCount(%params);
}
else {
if (defined($query->limitSize)){
# martservices originating query. Therefore want to just
# get a single batch of results back from the
# dataset->getResultTable call corresponding to the passed
# in values set on the Query object for limitStart and
# limitSize
$params{'batch_size'} = $query->limitSize;
$params{'batch_start'} = $query->limitStart;
$params{'web_origin'} = 1;
$self->set('batched_already', 1);
}
else{
# Only the first dataset to get here receives batch parameters; the
# 'batched_already' flag stops later ones from being batched as well.
unless ($self->get('batched_already')) {
$self->set('batched_already', 1);
$params{'batch_size'} =
$datasetToProcess->initialBatchSize;
$params{'batch_start'} = 0;
}
}
# call for a single dataset query
# NOTE(review): keys() is evaluated in scalar context by the concatenation,
# so this logs a number of keys rather than the key names - confirm intent.
$logger->debug("Bottom dataset ".$datasetToProcess->name." query params are: ".keys(%{$params{'query'}}));
## to see if its GS and its the last one, so the expected results would be
## in FASTA format, and should be comma separated
# lastDS is 1 with exactly one visible dataset in the query, 2 with more.
$datasetToProcess->lastDS(1) if ($self->get('visibleDSCount') == 1);
$datasetToProcess->lastDS(2) if ($self->get('visibleDSCount') > 1);
my $rtable = $datasetToProcess->getResultTable(%params);
$logger->debug("Bottom dataset ".$datasetToProcess->name." gave ".scalar(@{$rtable->get('columns')}));
# perform union if appropriate entry exists in union_tables hash
# The stored rows are appended only when both tables have the same number
# of fields; the entry is then cleared and the hash written back.
if ($union_tables{$datasetToProcess->name}){
my $tableToAdd = $union_tables{$datasetToProcess->name};
if ($tableToAdd->getNumFields == $rtable->getNumFields){
$rtable->addRows($tableToAdd->getRows);
$union_tables{$datasetToProcess->name} = undef;
$self->set('union_tables',\%union_tables);
}
}
return $rtable;
}
}
# more than one dataset - this one is an exportable
my ($links, $targetDataset, $exportable, $invisible_exportable);
my $dset = shift @{$datasetsToProcess};
my $datasetToProcess = $self->get('registry')->
getDatasetByName($query->virtualSchema, $dset);
my @current_visible_links;
my @current_invisible_links;
# Look for links from this dataset to each dataset still waiting.  $links
# and $targetDataset keep the values of the last linked dataset found.
foreach my $odset (@{$datasetsToProcess}) {
my $li = $query->getLinks($dset, $odset);
if ($li) {
$links = $li;
$targetDataset = $self->get('registry')->
getDatasetByName($query->virtualSchema, $odset);
if ($links->operation eq 'join'){
# A fresh object made from the target's importable for this link becomes
# the exportable that will carry this dataset's results.
$exportable = $targetDataset->
getImportables($links->defaultLink,
$query->getInterfaceForDataset($odset))->new;
if ($targetDataset->visible){
push @current_visible_links,$links;
$self->set('last_visible_exportable',$exportable);
}
else{
$invisible_exportable = $targetDataset->
getImportables($links->defaultLink,
$query->getInterfaceForDataset($odset))->new;
push @current_invisible_links,$links;
}
}
}
}
# An exportable for an invisible target takes precedence when one was found.
$exportable = $invisible_exportable if ($invisible_exportable);
if (!$exportable){
# in the situation where first dataset placeholder dataset needs to
# export the data to the second v dataset need to store the original
# visible exportable from mouse to human and use that
# NOTE(review): $last_visible_exportable is undef unless an earlier call
# stored one, in which case the batching() call below dies - confirm this
# branch is unreachable for union-only links.
$exportable = $last_visible_exportable;
$exportable->batching(1);# propogate batching through
my $exportable_size = @{$exportable->getAllFilters};
$datasetToProcess->forceHash($exportable_size);# going to have to export data from 1st dataset placeholder to second visible dataset
}
#create subquery
my $virtualSchemaNameForQuery = $query->virtualSchema;
if ($datasetToProcess->serverType eq "web"){
my $location = $datasetToProcess->
getParam('configurator')->get('location');
$virtualSchemaNameForQuery = $location->serverVirtualSchema;
}
my $subquery = BioMart::Query->new('registry' => $self->get('registry'),
'virtualSchemaName'=>$virtualSchemaNameForQuery);
# don't use addAttributes and _addAttribute method as
# these automatically link placeholder atts and this will
# produce a 2 dataset subquery which messes up batching and
# web services mode - same problem for filters below
#$subquery->addAttributes($query->getAllAttributes($dset))
# if ($query->getAllAttributes($dset));
my $subquery_atts = $query->getAllAttributes($dset);
if ($subquery_atts){
foreach my $subquery_att(@{$subquery_atts}){
$subquery->addAttributeWithoutLinking($subquery_att);
}
}
#$subquery->addFilters($query->getAllFilters($dset))
# if ($query->getAllFilters($dset));
my $subquery_filts = $query->getAllFilters($dset);
if ($subquery_filts){
foreach my $subquery_filter(@{$subquery_filts}){
$subquery->addFilterWithoutLinking($subquery_filter);
}
}
# in case of a start page count
$subquery->addDatasetName($dset,$query->getInterfaceForDataset($dset));
# Links to invisible targets: add this dataset's exportable attribute list
# for the link to the subquery.
foreach my $links(@current_invisible_links){
if ($links && $links->operation eq 'join'){
$subquery->addAttributeList($datasetToProcess->
getExportables($links->defaultLink,
$query->getInterfaceForDataset($dset)))
}
#determine batching logic
# NOTE(review): only 'join' links are pushed onto these lists above, so the
# 'union' test here looks unreachable - confirm.
if ($links->operation eq 'union'){
$self->set('batched_already',1);# turn off batching for unions
}
}
# Links to visible targets: the exportable's attributes are added one by one
# when invisible links are also present, otherwise as a single list.
foreach my $links(@current_visible_links){
if ($links && $links->operation eq 'join'){
my $att_list = $datasetToProcess->
getExportables($links->defaultLink,
$query->getInterfaceForDataset($dset));
if (@current_invisible_links){
$subquery->addAttributes($att_list->getAllAttributes);# works for placeholder 1st dset atts
}
else{
$subquery->addAttributeList($att_list);# works for non-placeholder scenario
}
}
#determine batching logic
if ($links->operation eq 'union'){
$self->set('batched_already',1);# turn off batching for unions
}
}
my %params = ('query' => $subquery);
if ($datasetToProcess->visible) {
#propagate batching through all visible datasets
$exportable->batching(1) if ($exportable);
if ($query->limitSize && $query->limitSize > 0){
# martservices originating query. Therefore want to just get a
# single batch of results back from the dataset->getResultTable
# call corresponding to the passed in values set on the Query
# object for limitStart and limitSize
$params{'batch_size'} = $query->limitSize;
$params{'batch_start'} = $query->limitStart;
$params{'web_origin'} = 1;
$self->set('batched_already', 1);
}
else{
unless ($self->get('batched_already')) {
$self->set('batched_already', 1);
$params{'batch_size'} = $datasetToProcess->initialBatchSize;
$params{'batch_start'} = 0;
}
}
}
else {
# intermediate invisible datasets must also propagate batches
$exportable->batching(1) if ($exportable
&& $self->get('batched_already')
&& $datasetToProcess->exportableFrom
&& $datasetToProcess->importableTo);
}
# execute and add exportable to Query
$datasetToProcess->lastDS(0); ## thats not the lastDS as the last one gets called from previous block
my $tempTable = $datasetToProcess->getResultTable(%params);
$logger->debug("Non-bottom dataset ".$datasetToProcess->name." gave ".scalar(@{$tempTable->get('columns')}));
# perform union if appropriate entry exists in union_tables hash
if ($union_tables{$datasetToProcess->name}){
my $tableToAdd = $union_tables{$datasetToProcess->name};
if ($tableToAdd->getNumFields == $tempTable->getNumFields){
$tempTable->addRows($tableToAdd->getRows);
$union_tables{$datasetToProcess->name} = undef;
$self->set('union_tables',\%union_tables);
}
}
if ( $exportable){# JOIN
# NOTE(review): presumably getResultTable can hand back -1 as a sentinel;
# the comparison is numeric - confirm against the dataset implementations.
if ($tempTable != -1){
$exportable->setTable($tempTable);
$query->_addFilter($exportable);
}
}
else{# UNION
# Park the table under the target dataset's name; it is merged in when that
# dataset is processed.
$union_tables{$targetDataset->name} = $tempTable;
$self->set('union_tables',\%union_tables);
}
$self->set('query', $query);
# Recurse on the remaining datasets; a false result is returned as undef.
return $self->_processPath($datasetsToProcess) || undef;
}
# Works out the order in which the query's datasets have to be processed and
# stores it under 'final_dataset_order'.
#
# Arguments:
#   $query - the BioMart::Query to plan for.
#
# Dies (via Carp::confess, with the underlying error in the message) when the
# registry cannot be obtained from the query.
# Throws BioMart::Exception::Usage when the query names no datasets.
sub executionPlan {
    my ($self, $query) = @_;

    # Needed before every executionPlan as it creates links if not there.
    $query->finalProcess();

    # ? why is the eval needed rather than just $query->getRegistry;
    # if not there, error checking should be in Query as all Query objects
    # should have a registry?
    my $registry;
    use Carp;
    my $fetched = eval { $registry = $query->getRegistry; 1 };
    if (!$fetched) {
        # Keep the original failure in the message instead of discarding it.
        my $reason = $@ || 'unknown error';
        confess("Query Runner - unable to obtain registry from query: $reason");
    }

    $self->set('query',    $query);
    $self->set('registry', $registry);

    # reset recursion state
    $self->set('processed_datasets',   {});
    $self->set('datasets_encountered', {});
    $self->set('final_dataset_order',  []);

    if (scalar(@{$query->getDatasetNames}) > 0) {
        # NOTE(review): _executionPlan shifts the list it is given - this
        # assumes getDatasetNames hands out a copy; confirm in Query.
        $self->_executionPlan($query->getDatasetNames) || undef;
    }
    else {
        BioMart::Exception::Usage->throw('Query Runner - Problematic Query: No dataset names in the Query');
    }
}
# Recursive planner behind executionPlan.
#
# Arguments:
#   $datasetsToProcess - array ref of dataset names still to be ordered; it
#                        is shifted, appended to and rewritten in place.
#
# A dataset is appended to 'final_dataset_order' once no unprocessed dataset
# in the remaining list links to it; otherwise it is moved to the end of the
# list and tried again later.  When at most one dataset is left the order is
# complete and is handed to $query->finalDatasetOrder.
#
# Throws BioMart::Exception::Query when a dataset comes round again with the
# same number of datasets still pending, i.e. no progress was made.
sub _executionPlan {
    my ($self, $datasetsToProcess) = @_;

    my $query                = $self->get('query');
    my $processed_datasets   = $self->get('processed_datasets');
    my $datasets_encountered = $self->get('datasets_encountered');
    my $final_dataset_order  = $self->get('final_dataset_order');

    # Terminal case: one dataset left, or none at all.  The list can become
    # empty when the invisible-target handling below moves every remaining
    # dataset into the final order; recursing on an empty list would never
    # terminate, so it is treated as "plan complete".
    if (scalar(@{$datasetsToProcess}) <= 1) {
        if (@{$datasetsToProcess}) {
            my $dset = shift @{$datasetsToProcess};
            push @{$final_dataset_order}, $dset;
        }
        return $query->finalDatasetOrder($final_dataset_order);
    }

    # more than one dataset - this one is an exportable
    my @targetDatasets;
    my $dset = shift @{$datasetsToProcess};

    if ( $datasets_encountered->{$dset}
        && ( scalar(@{$datasetsToProcess}) ==
            $datasets_encountered->{$dset} ) ) {
        # degenerate base case, this is designed to prevent infinite recursion
        BioMart::Exception::Query->throw("Problematic Query, unable to determine finite path through Datasets\n");
    }
    $datasets_encountered->{$dset} = scalar(@{$datasetsToProcess});

    my $datasetToProcess = $self->get('registry')->
        getDatasetByName($query->virtualSchema, $dset);

    ODSET: foreach my $odset (@{$datasetsToProcess}) {
        if ($query->getLinks($odset, $dset) &&
            !( $processed_datasets->{$odset} )) {
            # An unprocessed dataset still exports to this one: requeue it
            # at the end and recurse with the new dataset order.
            push @{$datasetsToProcess}, $dset;
            $datasetToProcess = 0;
            last ODSET;
        }

        # if here, determine if this is the targetDataset. The query
        # guarantees that for any dataset there will only be one target
        # dataset, thus only one combination of (dset, odset) where
        # $query->getLinks returns a defined value
        if ($query->getLinks($dset, $odset)) {
            my $targetDataset = $self->get('registry')->
                getDatasetByName($query->virtualSchema, $odset);
            push @targetDatasets, $targetDataset;
        }
    }

    if ($datasetToProcess) {
        push @{$final_dataset_order}, $dset;
        $processed_datasets->{$dset} = 1;

        if (@targetDatasets > 1) {
            foreach my $targetDataset (@targetDatasets) {
                next if $targetDataset->visible;

                my $target_name = $targetDataset->name;
                # this deals with mouse seq when mouse is the first
                # visible dataset in a query for example
                push @{$final_dataset_order}, $target_name;
                $processed_datasets->{$target_name} = 1;

                # Datasets the invisible target links to go straight into
                # the final order; the rest stay pending.
                my @new_datasetsToProcess;
                foreach my $odset (@{$datasetsToProcess}) {
                    next if ($odset eq $target_name);
                    if ($query->getLinks($target_name, $odset)) {
                        push @{$final_dataset_order}, $odset;
                    }
                    else {
                        push @new_datasetsToProcess, $odset;
                    }
                }
                @{$datasetsToProcess} = @new_datasetsToProcess;
            }
        }
    }

    # at this point, either datasetsToProcess is one dataset shorter,
    # after processing a dataset, or the order of datasets has changed;
    # set state and recurse
    $self->set('processed_datasets',   $processed_datasets);
    $self->set('datasets_encountered', $datasets_encountered);
    return $self->_executionPlan($datasetsToProcess) || undef;
}
# Combined getter/setter for the unique-rows flag read by printResults.
#
# Arguments:
#   $val - optional.  When defined, the flag is set to 1 for a true value and
#          to 0 for a false one, so a runner reused for several queries can
#          switch de-duplication off again.  When omitted (or undef) the flag
#          is left untouched.
#
# Returns the current value of the flag.
sub uniqueRowsOnly
{
    my ($self, $val) = @_;
    if (defined $val) {
        $self->set('uniqueResultsFlag', $val ? 1 : 0);
    }
    return $self->get('uniqueResultsFlag');
}
1;