Raw content of BioMart::Dataset::TableSet
# $Id: TableSet.pm,v 1.8.2.1 2009-05-04 22:32:16 syed Exp $
#
# BioMart module for BioMart::Dataset::TableSet
#
# You may distribute this module under the same terms as perl itself
# POD documentation - main docs before the code
=head1 NAME
BioMart::Dataset::TableSet
=head1 SYNOPSIS
Synopsis here
=head1 DESCRIPTION
Description here
=head1 AUTHOR - Arek Kasprzyk, Syed Haider, Darin London, Damian Smedley
=head1 CONTACT
This module is part of the BioMart project http://www.biomart.org
Questions can be posted to the mart-dev mailing list:
mart-dev@ebi.ac.uk
=head1 METHODS
=cut
package BioMart::Dataset::TableSet;
# implements Dataset interface
use strict;
use warnings;
use Digest::MD5;
use IO::Handle;
use Log::Log4perl;
# temporary imports until XML configuration system is working
use BioMart::Configuration::ConfigurationTree;
use BioMart::Configuration::FilterTree;
use BioMart::Configuration::AttributeTree;
use BioMart::Configuration::FilterCollection;
use BioMart::Configuration::AttributeCollection;
use BioMart::Configuration::FilterGroup;
use BioMart::Configuration::AttributeGroup;
use BioMart::Configuration::BooleanFilter;
use BioMart::Configuration::ValueFilter;
use BioMart::Configuration::Attribute;
use base qw(BioMart::DatasetI);
use constant REPLACEBFILTER => '@@BATCHFILTER@@';
use constant REPLACELIMIT => '@@LIMITCLAUSE@@';
sub _new {
# called by configurator which passes itself in as a reference along with
# the dataSet internal name and displayName
my ($self, @param) = @_;
$self->SUPER::_new(@param);
$self->attr('mains',[]);
$self->attr('keys',[]);
$self->attr('batch_size', 0);
$self->attr('batched_filterlist', undef);
$self->attr('sql', undef);
$self->attr('batch_filterSQL', undef);
# for batching_filterlist queries, TableSet must
# maintain its own batch_starts relative to
# each list of items from the batching_filterlist
$self->attr('batch_rows_processed', 0);
}
# private methods
sub __processNewQuery {
my ($self, $query) = @_;
#ignores query
$self->set('batched_filterlist', undef);
$self->set('sql', undef);
$self->set('batch_filterSQL', undef);
$self->set('batch_rows_processed', 0);
}
# Interface Implementations
sub _getConfigurationTree {
my ($self,$interface,$dsCounter)=@_;
return $self->getParam('configurator')->getConfigurationTree(
$self->virtualSchema, $self->name, $interface,$dsCounter);
}
sub _getResultTable {
my ($self, @param) = @_;
local($^W) = 0; # prevent "odd number of elements" warning with -w.
my(%param) = @param;
my $table = $param{'table'};
my $query = $param{'query'};
my $batch_start = $param{'batch_start'} || 0;
my $batch_size = $param{'batch_size'};
my $rows_added = $self->_fillAttributeTableWith($query, $table,
$batch_start,$batch_size);
# System will run entire query in one call if they do not contain an
# importable, and are not explicitly called to batch with a batch_size
# parameter to getResultTable.
$self->_setExhausted(1) unless ($self->get('explicit_batching') ||
$self->get('batched_filterlist'));
if ($rows_added < 1) {
if ($self->get('explicit_batching')) {
# if explicit_batching, this is a SQL batching query, which
# must set exhausted and return undef if a SQL query ever
# results in no rows being added to the ResultTable.
$self->_setExhausted(1);
return undef;
}
else {
#this will cause next call to get the next batch of ids
$self->set('batch_filterSQL', undef);
}
}
elsif ($self->get('batched_filterlist')) {
my $batch_rows_processed = $self->get('batch_rows_processed');
$batch_rows_processed += $rows_added;
$self->set('batch_rows_processed', $batch_rows_processed);
}
return $table;
}
sub _fillAttributeTableWith {
my ($self, $query, $table, $batch_start,$batch_size) = @_;
# if this is a batched_filterlist, set batch_start to batch_rows_processed
# after sql_gen (which may reset this to zero at the beginning of each new
# batch)
if ($self->get('batched_filterlist')) {
$batch_start = $self->get('batch_rows_processed');
}
my $counter = 0;
my $rows_added = 0;
if ($self->serverType eq "web"){
# below is essential for linked dataset batching
my $filters = $query->getAllFilters;
foreach my $filter (@$filters){
# recover the tables
if ($filter->isa("BioMart::Configuration::FilterList")){
if ($filter->batching) {
$self->set('batch_rows_processed', 0);
$batch_start = $self->get('batch_rows_processed');
# when exhausted is true, no more results remain
$self->_setExhausted($filter->exhausted);
# set batched_filterlist to keep up with changes to
# underlying table
$self->set('batched_filterlist', $filter);
}
else{
if (!$self->get('explicit_batching') &&
!$self->get('batched_filterlist')) {
$batch_start = 0;
$batch_size = 0;
}
}
}
}
if (!$self->get('explicit_batching') &&
!$self->get('batched_filterlist')) {
$batch_start = 0;
$batch_size = 0;
}
my $location = $self->getParam('configurator')->get('location');
my $xml = $query->toXML($batch_start,$batch_size,0);
my $logger=Log::Log4perl->get_logger(__PACKAGE__);
$logger->debug("QUERY XML: $xml");
foreach my $el($location->getResultSet("","POST",$xml)){
if ($el =~ /^\s/) {next;}
$rows_added++;
# add false end to stop loss of trailing tab-sep empty strings
# vital for attribute merging
$el .= "\tend";
my @clean=split(/\t/,$el);
$table->addRow([@clean[0..scalar(@clean)-2]]);
}
return $rows_added;
}
else {# "rdms" type rather than "web"
my $oracle = 0;
if ($self->getParam('configurator')->get('location')->
databaseType eq 'oracle'){
$oracle = 1;
}
my $sql= $self->_generateSQL($query, $batch_start, $batch_size,$oracle);
if ($oracle && $self->get('batched_filterlist')) {
$batch_start = $self->get('batch_rows_processed');
}
my $dbh = $self->_getDBH;
my $batch;
eval {
my $sth = $dbh->prepare($sql);
$sth->execute;
$batch = $sth->fetchall_arrayref;
$sth->finish;
};
BioMart::Exception::Database->throw("Error during query execution: ".
$dbh->errstr."\n") if $@;
$dbh->disconnect;
foreach my $row (@{$batch}){
$counter++;
if ($oracle){
if ( $counter > $batch_start ) {
$table->addRow($row);
$rows_added++;
}
} else{
$table->addRow($row);
$rows_added++;
}
}
}
return $rows_added;
}
sub _generateSQL {
my ($self, $query, $batch_start, $batch_size, $oracle) = @_;
my $sql = $self->get('sql');
unless ($sql) {
my ($select, $from, $where, $orderby, $comma, $and) = ('')x6;
my %tables;
my %joinTables;
my $schema;
# attributes = > SELECT clause generation
my $attributes = $query->getAllAttributes;
foreach my $attribute (@$attributes){
# postgres does not like mixing schemas and aliases
if($attribute->table ne "main"){
$schema=$self->schema.".";
}
else{
$schema="";
}
$select .= $comma.$schema.$attribute->toSQL;
my $table = $attribute->table;
$tables{$table} = 1;
if ($table eq 'main'){
my $keys = $self->get('keys');
foreach my $key (reverse @$keys){
last if (uc($joinTables{'main'}) eq uc($key));
if (uc($attribute->key) eq uc($key)){
$joinTables{'main'} = $key;
last;
}
}
}
else{ # dm table
$joinTables{$self->schema.".".$table} = $attribute->key;
}
$comma = ', ';
}
# filters (and filterlists) => WHERE clause generation
my $filters = $query->getAllFilters;
foreach my $filter (@$filters){
if ($filter->isa("BioMart::Configuration::FilterList")
|| $filter->isa("BioMart::Configuration::FilterList_List") ){
if ($filter->batching) {
$self->set('batched_filterlist', $filter);
#REPLACEBFILTER replaced later with actual SQL
$where .= $and.REPLACEBFILTER;
}
else {
$where .= $and.$filter->toSQL($oracle);
}
my $list_filters = $filter->getAllFilters;
foreach my $list_filter (@$list_filters){
my $table = $list_filter->table;
$tables{$table} = 1;
if ($table eq 'main'){
my $keys = $self->get('keys');
foreach my $key (reverse @$keys){
last if (uc($joinTables{'main'}) eq uc($key));
if (uc($list_filter->attribute->key) eq uc($key)){
$joinTables{'main'} = $key;
last;
}
}
}
else{ # dm table
$joinTables{$table} = $list_filter->attribute->key;
}
}
}
else{# non FilterList filter
$where .= $and.$filter->toSQL($oracle);
my $table = $filter->table;
$tables{$table} = 1;
if ($table eq 'main'){
my $keys = $self->get('keys');
foreach my $key (reverse @$keys){
last if (uc($joinTables{'main'}) eq uc($key));
if (uc($filter->attribute->key) eq uc($key)){
$joinTables{'main'} = $key;
last;
}
}
}
else{# dm table
$joinTables{$table} = $filter->attribute->key;
}
}
$and = ' AND ';
}# end of filters => WHERE clause generation
my ($main,$i);
# identify the lowest key and set main accordingly
if (%joinTables){
my $keys = $self->get('keys');
$i = scalar @$keys - 1;
OUTER:foreach my $key (reverse @$keys){
foreach my $join_table (keys %joinTables){
if (uc($joinTables{$join_table}) eq uc($key)){
last OUTER;
}
}
$i--;
}
}
else{
$i = 0;# for when no join tables
}
my $mains = $self->get('mains');
$main = $$mains[$i];
# AttributeList(s) = > SELECT clause additional generation
# has to be done last as the choice of correct attribute can depend on
# the key chosen above - "dynamic linking"
my $ct = $self->getConfigurationTree($query->
getInterfaceForDataset($self->name));
$comma = '';
my $subselect;
my $attribute_lists = $query->getAllAttributeLists;
foreach my $attribute_list (reverse @$attribute_lists){
# reverse makes sure the attlists used for attribute merging are
# put at the beginning
$subselect = '';
my $attributeString = $attribute_list->attributeString;
my @attributeNames = split(/,/,$attributeString);
foreach my $attributeName (@attributeNames){
# dynamically recover correct attribute to use based on the
# cardinality of the current query
my $keys = $self->get('keys');
my $attribute;
my $j = $i;#current key from above
while ($j > -1){
my $key = $$keys[$j];
$attribute = $ct->getAttributeByNameKey($attributeName,
$key);
last if ($attribute);
$j--;
}
if (!$attribute){
$j = $i + 1;
my $keys = $self->get('keys');
while ($j <= (scalar (@$keys - 1))){
my $key = $$keys[$j];
$attribute = $ct->getAttributeByNameKey($attributeName,
$key);
if ($attribute){
$main = $$mains[$j];
last;
}
$j++;
}
}
if (!$attribute){
$attribute = $ct->getAttributeByName($attributeName);
}
if (!$attribute){
# recover from the actual AttributeList
$attribute = $attribute_list->
getAttributeByName($attributeName);
}
$subselect .= $comma.$attribute->toSQL;
my $table = $attribute->table;
$tables{$table} = 1;
if ($table eq 'main'){
my $keys = $self->get('keys');
my $k = scalar @$keys - 1;
foreach my $key (reverse @$keys){
last if (uc($joinTables{'main'}) eq uc($key));
if (uc($attribute->key) eq uc($key)){
$joinTables{'main'} = $key;
# set main table to this lower one
$main = $$mains[$k];
last;
}
$k--;
}
}
else{ # dm table
$joinTables{$table} = $attribute->key;
}
$comma = ', ';
}# end of attribute name loop
# add the attribute list generated SQL to the query
if ($select){#some attributes already added
$select = $subselect.$comma.$select;
}
else{
$select = $subselect;
}
# order by code
$comma = '';
my $orderByString = $attribute_list->orderByString;
my @orderByAttNames;
@orderByAttNames = split( /\,/, $orderByString)
if ($orderByString);
foreach my $attributeName (@orderByAttNames){
# dynamically recover correct attribute to use based on the
# cardinality of the current query
my $keys = $self->get('keys');
my $attribute;
my $j = $i;#current key from above
while ($j > -1){
my $key = $$keys[$j];
$attribute = $ct->getAttributeByNameKey($attributeName,
$key);
last if ($attribute);
$j--;
}
if (!$attribute){
$j = $i + 1;
my $keys = $self->get('keys');
while ($j <= (scalar (@$keys - 1))){
my $key = $$keys[$j];
$attribute = $ct->getAttributeByNameKey($attributeName,
$key);
last if ($attribute);
$j++;
}
}
$orderby .= $comma.$attribute->toSQL;
$comma = ', ';
} # end of orderBy loop
$comma = '';
} # end of AttributeList(s) = > SELECT clause additional generation
if ($query->orderBy()) {
# Exportable orderBy instructions over ride Query->orderBy
unless ($orderby) {
my @orderByNames = map { $_->name } @{$query->orderBy()};
foreach my $attributeName (@orderByNames){
# dynamically recover correct attribute to use based on
# the cardinality of the current query
my $keys = $self->get('keys');
my $attribute;
my $j = $i;#current key from above
while ($j > -1){
my $key = $$keys[$j];
$attribute = $ct->getAttributeByNameKey($attributeName,
$key);
last if ($attribute);
$j--;
}
if (!$attribute){
$j = $i + 1;
my $keys = $self->get('keys');
while ($j <= (scalar (@$keys - 1))){
my $key = $$keys[$j];
$attribute = $ct->getAttributeByNameKey(
$attributeName,$key);
last if ($attribute);
$j++;
}
}
$orderby .= $comma.$attribute->toSQL;
$comma = ', ';
}
}
}
# redo main table choice incase attlist has changed it
if (%joinTables){
my $keys = $self->get('keys');
$i = scalar @$keys - 1;
OUTER:foreach my $key (reverse @$keys){
foreach my $join_table (keys %joinTables){
if (uc($joinTables{$join_table}) eq uc($key)){
last OUTER;
}
}
$i--;
}
}
else{
$i = 0;# for when no join tables
}
$mains = $self->get('mains');
$main = $$mains[$i];
# generate FROM clause
foreach my $table (keys %tables){
if ($table ne "main"){
$from .= $self->schema.".".$table.', ';
}
}
$from .= $self->schema.".".$main.' main';
# add table joins to WHERE clause
foreach my $join_table (keys %joinTables){
next if $join_table eq "main";
$where .= $and."main.".$joinTables{$join_table}."=".$join_table
.".".$joinTables{$join_table};
$and = ' AND ';
}
# generate the whole SQL statement
$sql = 'SELECT '.$select.' FROM '.$from;
if ($where){
$sql .= ' WHERE '.$where;
}
# restricted primary key access
my $restricted_pk = $self->getConfigurationTree($query->
getInterfaceForDataset($self->name))->primaryKeyRestriction;
if ($restricted_pk){
my $or = '';
my $key = ${$self->get('keys')}[0];
my $restrictedSQL = '(';
my @restrictions = split(/,/,$restricted_pk);
foreach(@restrictions){
my ($start_restriction,$end_restiction) = split(/\-/,$_);
$restrictedSQL .= $or.'main.'.$key.' BETWEEN '.
$start_restriction.' AND '.$end_restiction;
$or = ' OR ';
}
$restrictedSQL .= ')';
if ($where){
$sql .= ' AND '.$restrictedSQL;
}
else{
$sql .= ' WHERE '.$restrictedSQL;
$where = $restrictedSQL;
}
}# end of resticted pk
# add batching specific limits
if ($self->get('explicit_batching') ||
$self->get('batched_filterlist')) {
#SQL batches both batching filter and explicit batched queries
if ($oracle){
if ($orderby) {
$sql .= ' ORDER BY '.$orderby;
# Oracle rownum is calculated before the order by
# in order to get around this create a subselect from
# original ORDERED query and then get rownum based on that
my @origfields = split ", ",$select;
# remove duplicates in the select list
# duplicates in the select list cause problems with
# the outer select in the subselect construct below
my %saw;
# try without uniquifying
my @unique_qualified_fields =
grep (!$saw{$_}++, @origfields );
# try without uniquifying
my $uniqueselect = join ", ", @unique_qualified_fields;
$sql =~ s/$select/$uniqueselect/g;
# Outer select cannot have table alias names in it
# i.e. 'table_name.field_name' will become 'field_name'
my @unqualified_fields =
map { $_ =~ s/[^\.]+\.//g;$_; } @origfields;
my $unqualified_select = join ", ", @unqualified_fields;
# now create that new sql query
$sql = "SELECT $unqualified_select FROM ($sql) WHERE ".
REPLACELIMIT;
}
elsif ($where){
$sql .= ' AND '.REPLACELIMIT;
}
else{
$sql .= ' WHERE '.REPLACELIMIT;
}
}# end of ORACLE batching
else{ # non ORACLE batching
#order goes before limit in MYSQL
if ($orderby) {
$sql .= ' ORDER BY '.$orderby;
}
$sql .= REPLACELIMIT;
}
}# end of batching limit generation
$self->set('sql', $sql);
}# end of unless ($sql)
# put in the correct current limits into the batching
my $batched_filterlist = $self->get('batched_filterlist');
if ($batched_filterlist) {
my $sub = $self->get('batch_filterSQL');
unless ($sub) {
$sub = $batched_filterlist->toSQL($oracle);
$self->set('batch_filterSQL', $sub);
$self->set('batch_rows_processed', 0);
}
$batch_start = $self->get('batch_rows_processed');
my $replace = REPLACEBFILTER;
$sql =~ s/$replace/$sub/;
#when this is true, no more results remain
$self->_setExhausted($batched_filterlist->exhausted);
#keep up with changes to underlying table
$self->set('batched_filterlist', $batched_filterlist);
}
# set limits from batch_start and batch_size
# if explicit_batching or batch_filter
if ($self->get('explicit_batching') || $batched_filterlist) {
my $limit;
if ($oracle) {
my $rownum_limit = $batch_size + $batch_start + 1;
$limit = ' rowNum < '.$rownum_limit;
}
elsif ($self->getParam('configurator')->get('location')->databaseType
eq 'postgres'){
$limit = ' LIMIT ';
if ($batch_start){
$limit .= $batch_size.' OFFSET ';
$limit .= $batch_start;
}
else {
$limit .= $batch_size;
}
}
elsif ($self->getParam('configurator')->get('location')->databaseType
eq 'mysql'){
$limit = ' LIMIT ';
if ($batch_start){
$limit .= $batch_start.',';
}
$limit .= $batch_size;
}
else {
BioMart::Exception::Query->throw("Unsupported RDBMS type: ".$self->getParam('configurator')->get('location')->databaseType ."Currently supported: mysql, oracle and postgres");
}
my $replace = REPLACELIMIT;
$sql =~ s/$replace/$limit/;
}
my $logger=Log::Log4perl->get_logger(__PACKAGE__);
$logger->info("QUERY SQL: $sql");
return $sql;
}
sub _getCount {
my ($self, @param) = @_;
my $ret;
my $batching;
local($^W) = 0; # prevent "odd number of elements" warning with -w.
my(%param) = @param;
my $query = $param{'query'};
$self->_processNewQuery($query); #always act as if a new query for count.
if ($self->serverType eq "web"){
my $location = $self->getParam('configurator')->get('location');
my $xml = $query->toXML(0,0,1);
my $logger=Log::Log4perl->get_logger(__PACKAGE__);
$logger->info("COUNT XML: $xml");
my @results = $location->getResultSet("","POST",$xml);
return $results[0];
}
# rbdms
my ($sql, $select, $from, $where, $limit, $comma, $and) = ('')x7;
my %tables;
my %joinTables;
my $oracle = 0;
if ($self->getParam('configurator')->get('location')->databaseType eq
'oracle'){
$oracle = 1;
}
$select = 'COUNT(*)';
my $filtList_List_flag = 0;
# recover where clause from filters (and filterlists)
my $filters = $query->getAllFilters;
FILTERS: foreach my $filter (@$filters){
# call with 'ORACLE' flag if appropiate to allow IN list switching
$where .= $and.$filter->toSQL($oracle);
# recover the tables
if ($filter->isa("BioMart::Configuration::FilterList")
|| $filter->isa("BioMart::Configuration::FilterList_List")){
if ($filter->isa("BioMart::Configuration::FilterList_List")){
$filtList_List_flag = 1;
}
if ($filter->batching) {
$ret = 1;
$batching = 1;
last FILTERS;
}
my $list_filters = $filter->getAllFilters;
foreach my $list_filter (@$list_filters){
my $table = $list_filter->table;
$tables{$table} = 1;
if (!(($table =~ /main$/) && ($list_filter->attribute->key eq
($self->get('keys')->[0])))){
$joinTables{$table} = $list_filter->attribute->key;
}
}
}
else{
my $table = $filter->table;
$tables{$table} = 1;
if (!(($table =~ /main$/) && ($filter->attribute->key eq
($self->get('keys')->[0])))){
$joinTables{$table} = $filter->attribute->key;
}
}
$and = ' AND ';
}
#this will be true if there is a batching filter
if ($batching) {
return [ $ret ];
}
my ($main,$i);
# identify the lowest key and set main accordingly
my $keys = $self->get('keys');
if (%joinTables){
$i = scalar @$keys - 1;
OUTER:foreach my $key (reverse @$keys){
foreach my $join_table (keys %joinTables){
if (uc($joinTables{$join_table}) eq uc($key)){
last OUTER;
}
}
$i--;
}
}
else{
$i = 0;# for when no join tables
}
my $mains = $self->get('mains');
$main = $$mains[$i];
if ($i != 0 || $filtList_List_flag){
$select = 'COUNT(DISTINCT main.'.$$keys[0].')';
}
foreach my $table (keys %tables){
if ($table !~ /main$/){
$from .= $self->schema.".".$table.', ';
}
}
$from .= $self->schema.".".$main.' main';
foreach my $join_table (keys %joinTables){
next if $join_table eq "main";
$where .= $and."main.".$joinTables{$join_table}."=".$join_table.".".
$joinTables{$join_table};
$and = ' AND ';
}
$sql = 'SELECT '.$select.' FROM '.$from;
if ($where){
$sql .= ' WHERE '.$where;
}
if ($limit){
$sql .= ' LIMIT '.$limit;
}
# restricted primary key access
my $restricted_pk = $self->getConfigurationTree($query->
getInterfaceForDataset($self->name))->primaryKeyRestriction;
if ($restricted_pk){
my $or = '';
my $key = ${$self->get('keys')}[0];
my $restrictedSQL = '(';
my @restrictions = split(/,/,$restricted_pk);
foreach(@restrictions){
my ($start_restriction,$end_restiction) = split(/\-/,$_);
$restrictedSQL .= $or.'main.'.$key.' BETWEEN '.$start_restriction.
' AND '.$end_restiction;
$or = ' OR ';
}
$restrictedSQL .= ')';
if ($where){
$sql .= ' AND '.$restrictedSQL;
}
else{
$sql .= ' WHERE '.$restrictedSQL;
$where = $restrictedSQL;
}
}
my $logger=Log::Log4perl->get_logger(__PACKAGE__);
$logger->info("COUNT SQL: $sql");
my $dbh = $self->_getDBH;
my $sth = $dbh->prepare($sql);
unless ($sth) {
BioMart::Exception::Database->throw("Couldnt connect to Database: ".
$dbh->errstr."\n");
}
$sth->{RaiseError} = 0;
$sth->execute || warn($sth->errstr);
$ret = ${$sth->fetchrow_arrayref}[0];
$sth->finish;
$dbh->disconnect;
return $ret;
}
1;