=head1 NAME

Bio::EnsEMBL::Pipeline::Monitor - report on the state of a pipeline database

=head1 DESCRIPTION

Wraps a Bio::EnsEMBL::Pipeline::DBSQL::DBAdaptor and prints summaries of
the jobs, analyses and rules stored in that pipeline database.

=cut

package Bio::EnsEMBL::Pipeline::Monitor;

use strict;

use Bio::EnsEMBL::Utils::Exception qw(verbose throw warning info);
use Bio::EnsEMBL::Utils::Argument qw( rearrange );
use Scalar::Util qw(blessed);

our @ISA = ();

=pod

=head2 new

  Title    : new
  Usage    : my $monitor = Bio::EnsEMBL::Pipeline::Monitor->new(-DBOBJ => $db);
  Function : Creates a pipeline monitoring object for the given pipeline
             database; its methods query that database and print reports.
  Returns  : a Bio::EnsEMBL::Pipeline::Monitor
  Args     : -DBOBJ : a Bio::EnsEMBL::Pipeline::DBSQL::DBAdaptor (required)
  Throws   : if -DBOBJ is missing, or is not a
             Bio::EnsEMBL::Pipeline::DBSQL::DBAdaptor

=cut

sub new {
    my ($class, @args) = @_;

    my $self = bless {}, $class;
    my ($dbobj) = rearrange([qw(DBOBJ)], @args);
    throw("No database object input") unless $dbobj;
    $self->dbobj($dbobj);

    return $self;
}

# dbobj([$adaptor])
# Get/set the pipeline database adaptor. A new value must be a blessed
# Bio::EnsEMBL::Pipeline::DBSQL::DBAdaptor. Checking blessed() first means
# an unblessed reference gets this module's error instead of Perl's
# "Can't call method isa on unblessed reference", and a bare class-name
# string is rejected.
sub dbobj {
    my ($self, $arg) = @_;

    if (defined $arg) {
        unless (blessed($arg)
                && $arg->isa('Bio::EnsEMBL::Pipeline::DBSQL::DBAdaptor')) {
            throw("[$arg] is not a Bio::EnsEMBL::Pipeline::DBSQL::DBAdaptor");
        }
        $self->{_dbobj} = $arg;
    }
    return $self->{_dbobj};
}

# print_header($title)
# Prints "\n$title [host:port dbname]\n\n". The bracketed database label is
# built from dbobj->dbc on first use and cached in $self->{'_header'}.
sub print_header {
    my ($self, $str) = @_;

    unless ($self->{'_header'}) {
        my $dbc = $self->dbobj->dbc;
        $self->{'_header'} = sprintf("[%s:%s %s]\n\n",
                                     $dbc->host, $dbc->port, $dbc->dbname);
    }
    print "\n$str " . $self->{'_header'};
}

# _fetch_rows($sql, @bind)
# Prepares and executes $sql with @bind and returns every result row as a
# reference to an array of array references (empty if nothing was fetched).
sub _fetch_rows {
    my ($self, $sql, @bind) = @_;

    my $sth = $self->dbobj->prepare($sql);
    $sth->execute(@bind);
    return $sth->fetchall_arrayref || [];
}

# _column_widths(\@rows)
# Returns one width per column: the length of the longest value in that
# column over all @rows (undef counts as length 0), plus one.
sub _column_widths {
    my ($rows) = @_;

    my @widths;
    foreach my $row (@$rows) {
        foreach my $i (0 .. $#{$row}) {
            my $len = defined $row->[$i] ? length $row->[$i] : 0;
            $widths[$i] = $len if !defined $widths[$i] || $len > $widths[$i];
        }
    }
    return map { $_ + 1 } @widths;
}

# _row_format(\@widths, $tail)
# Builds a printf format of space-separated, left-justified %s columns of
# the given widths, followed by $tail (e.g. "\n").
sub _row_format {
    my ($widths, $tail) = @_;
    return join(' ', map { "%-${_}s" } @$widths) . $tail;
}

# _print_table($title, \@headers, \@rows)
# Prints a report: the print_header() banner, a header line, a dashed
# underline as long as each header, one line per row, then a blank line.
# Widths cover the headers as well as the data so every column lines up.
sub _print_table {
    my ($self, $title, $headers, $rows) = @_;

    my @widths = _column_widths([ $headers, @$rows ]);
    my $format = _row_format(\@widths, "\n");
    my @underline;
    push @underline, '-' x length($_) foreach @$headers;

    $self->print_header($title);
    printf($format, @$headers);
    printf($format, @underline);
    printf($format, @$_) foreach @$rows;
    print "\n";
}

# show_current_status()
# Prints the number of jobs in each current status, broken down by
# analysis: logic_name, status, count and analysis_id.
sub show_current_status {
    my ($self) = @_;

    my $rows = $self->_fetch_rows(
        "select a.logic_name, js.status, count(*), a.analysis_id "
      . "from analysis a, job_status js, job j "
      . "where j.job_id = js.job_id and a.analysis_id = j.analysis_id "
      . "and js.is_current = 'y' "
      . "group by a.logic_name, js.status");

    $self->_print_table("Pipeline current status",
                        [ "Name", "Status", "Count", "Analysis-id" ],
                        $rows);
}

# show_current_status_summary()
# Prints the number of jobs in each current status, over all analyses.
sub show_current_status_summary {
    my ($self) = @_;

    my $rows = $self->_fetch_rows(
        "select status, count(*) from job_status "
      . "where is_current = 'y' group by status");

    $self->_print_table("Pipeline status summary",
                        [ "Status", "Count" ],
                        $rows);
}

# show_finished_summary([$no_submit, $show_percent])
# Prints, per analysis, the number of input_id_analysis rows recorded for
# it (count, logic_name, analysis_id).
#   $no_submit    - if true, skip analyses whose logic_name contains "Submit".
#   $show_percent - if true, add a %Done column: the count as a percentage of
#                   the per-analysis maximum returned by percent_finished().
sub show_finished_summary {
    my ($self, $no_submit, $show_percent) = @_;

    my $rows = $self->_fetch_rows(
        "select count(*), a.logic_name, a.analysis_id "
      . "from input_id_analysis i, analysis a "
      . "where a.analysis_id = i.analysis_id "
      . "group by a.analysis_id");
    $rows = [ grep { $_->[1] !~ /Submit/ } @$rows ] if $no_submit;

    my @headers = ("Count", "Name", "Id");

    if ($show_percent) {
        my $max_nums = percent_finished($self, 0);
        my @widths   = _column_widths([ \@headers, @$rows ]);

        $self->print_header("Finished job summary");
        printf(_row_format(\@widths, "%s\n"), @headers, "%Done");
        printf(_row_format(\@widths, "%s\n"), "-----", "----", "--", "-----");
        foreach my $row (@$rows) {
            my ($count, $name, $id) = @$row;
            my ($done, $unit) = (0, " ");
            # Only compute a percentage against a non-zero maximum.
            if ($count && $max_nums->{$id}) {
                $done = 100 * $count / $max_nums->{$id};
                $unit = '%';
            }
            printf(_row_format(\@widths, " %3d%s\n"),
                   $count, $name, $id, $done, $unit);
        }
        print "\n";
    }
    else {
        $self->_print_table("Finished job summary", \@headers, $rows);
    }
}

# show_analysisprocess()
# Prints every row of the analysis table: logic_name, analysis_id, db,
# program, parameters and module.
sub show_analysisprocess {
    my ($self) = @_;

    my $rows = $self->_fetch_rows(
        "select logic_name, analysis_id, db, program, parameters, module "
      . "from analysis");

    $self->_print_table("Analysis",
                        [ "Name", "Id", "db", "Program", "Params", "Module" ],
                        $rows);
}

# show_Rules()
# Prints each rule's goal analysis logic_name and rule_id.
sub show_Rules {
    my ($self) = @_;

    my $rows = $self->_fetch_rows(
        "select a.logic_name, rg.rule_id "
      . "from rule_goal rg, analysis a "
      . "where a.analysis_id = rg.goal");

    $self->_print_table("Pipeline rules", [ "Name", "Id" ], $rows);
}

# show_Rules_and_Conditions()
# Prints each rule's goal analysis logic_name, rule_id and every one of its
# rule conditions (one line per condition).
sub show_Rules_and_Conditions {
    my ($self) = @_;

    my $rows = $self->_fetch_rows(
        "select a.logic_name, rg.rule_id, rc.rule_condition "
      . "from rule_conditions rc, rule_goal rg, analysis a "
      . "where a.analysis_id = rg.goal and rg.rule_id = rc.rule_id");

    $self->_print_table("Rules and Conditions",
                        [ "Name", "Id", "Condition" ],
                        $rows);
}

# show_jobs_by_status_and_analysis($status, $analysis)
# Prints the jobs whose current status is $status and whose analysis has
# logic_name $analysis. Throws if either argument is undefined.
sub show_jobs_by_status_and_analysis {
    my ($self, $status, $analysis) = @_;

    if (!defined($status) || !defined($analysis)) {
        throw("No status and/or analysis input\n");
    }

    # Bind $status and $analysis as placeholders instead of interpolating
    # them, so quotes in either value cannot break or alter the query.
    my $sth = $self->dbobj->prepare(
        "SELECT job.* "
      . "FROM job_status js, job, analysis a "
      . "WHERE a.analysis_id = job.analysis_id "
      . "AND job.job_id = js.job_id "
      . "AND js.status = ? "
      . "AND a.logic_name = ? "
      . "AND js.is_current = 'y'");
    $sth->execute($status, $analysis);

    # job.* comes back as hashes; pick the printed columns in display order.
    my @columns = qw(input_id job_id submission_id retry_count
                     stdout_file stderr_file exec_host);
    my @rows;
    foreach my $job (@{ $sth->fetchall_arrayref({}) || [] }) {
        push @rows, [ @{$job}{@columns} ];
    }

    $self->_print_table("Jobs by status $status and analysis $analysis",
                        [ "Input id", "Job", "submission id", "retry",
                          "output file", "error file", "exhost" ],
                        \@rows);
}

# ==================================================== #

# rules_cache()
# Returns a hash reference mapping goal analysis dbID => logic_name for
# every rule that has at least one condition. It is built from the
# RuleAdaptor on first call and cached in $self->{'_rules_cache'}, even
# when empty.
sub rules_cache {
    my ($self) = @_;

    unless ($self->{'_rules_cache'}) {
        my %goal_name_of;
        foreach my $rule ($self->dbobj->get_RuleAdaptor()->fetch_all) {
            next unless $rule->list_conditions()->[0];
            my $goal = $rule->goalAnalysis;
            $goal_name_of{ $goal->dbID } = $goal->logic_name;
        }
        $self->{'_rules_cache'} = \%goal_name_of;
    }
    return $self->{'_rules_cache'};
}

=pod

=head2 get_unfinished_analyses*

The following methods return a reference to a list of arrays:

    get_unfinished_analyses_for_input_id( $input_id )
    get_unfinished_analyses_for_assembly_type( $assembly_type )
    get_unfinished_analyses

with the following data spec:

    [
      [ input_id, logic_name, analysis_id ],
      [ input_id, logic_name, analysis_id ],
      ...
    ]

Only analyses that are the goal of a rule with at least one condition
(see rules_cache) are reported. They all take an optional $print (Boolean)
which prints the arrays for you.

You can grow your own lookup hash to test whether an $input_id,
$logic_name combination is still unfinished:

    my $unfin = $monitor_obj->get_unfinished_analyses_for_assembly_type($assembly_type);
    my $hash;
    $hash->{ $_->[0] }->{ $_->[1] } = $_->[2] for @$unfin;

Now B<$hash-E<gt>{$input_id}-E<gt>{$logic_name} = $analysis_id>.

=cut
=head2 get_no_hit_contigs_for_analysis

  Usage    : my $no_hits = $monitor->get_no_hit_contigs_for_analysis(
                 $feature_table, $analysis_id, $print, $no_hits);
  Function : Finds all input_ids (contig names) that have a row for
             $analysis_id in input_id_analysis but no row in $feature_table
             for that analysis and contig, i.e. where the analysis did NOT
             find any hits.
  Returns  : a reference to a list of [ input_id, undef, analysis_id ]
             arrays (the logic_name slot is left undef).
  Args     : $feature_table - name of the feature table to check
                              (word characters only)
             $analysis_id   - analysis_id to check
             $print         - optional; if true, print each result row
             $no_hits       - optional array reference to append results to
  Throws   : if $feature_table is not a plain table name

=cut

# _print_result_rows(\@rows)
# Prints each row of a get_unfinished_analyses* / get_no_hit_* result on its
# own line, with the fields joined by "\t: ".
sub _print_result_rows {
    my ($rows) = @_;
    foreach my $row (@$rows) {
        print join("\t: ", @$row) . "\n";
    }
}

# get_unfinished_analyses_for_input_id($contig_name, [$print, $unfinished])
# Appends to @$unfinished (created if not supplied) one
# [ contig name, logic_name, analysis_id ] entry for each analysis that has
# no input_id_analysis row for contig $contig_name and is in rules_cache().
# Always returns an array reference, even when nothing is unfinished.
sub get_unfinished_analyses_for_input_id {
    my ($self, $contig_name, $print, $unfinished) = @_;

    $unfinished ||= [];
    my $rules_cache = $self->rules_cache();

    my $sth = $self->dbobj->prepare(q{
        SELECT c.name, a.analysis_id AS a_id, a.logic_name
          FROM contig c
               STRAIGHT_JOIN analysis a
               LEFT JOIN input_id_analysis i
                 ON c.name = i.input_id AND a.analysis_id = i.analysis_id
         WHERE i.input_id IS NULL AND c.name = ?
    });
    $sth->execute($contig_name);

    while (my $row = $sth->fetchrow_hashref) {
        next unless $rules_cache->{ $row->{'a_id'} };
        push @$unfinished,
            [ $row->{'name'}, $row->{'logic_name'}, $row->{'a_id'} ];
    }

    _print_result_rows($unfinished) if $print;
    return $unfinished;
}

# get_unfinished_analyses_for_assembly_type($assembly_type, [$print, $unfinished])
# As get_unfinished_analyses_for_input_id, but for every contig listed in
# the assembly table with type $assembly_type.
sub get_unfinished_analyses_for_assembly_type {
    my ($self, $assembly_type, $print, $unfinished) = @_;

    $unfinished ||= [];
    my $rules_cache = $self->rules_cache();

    my $sth = $self->dbobj->prepare(q{
        SELECT c.name, a.analysis_id AS a_id, a.logic_name, b.type
          FROM assembly b,
               contig c
               STRAIGHT_JOIN analysis a
               LEFT JOIN input_id_analysis i
                 ON c.name = i.input_id AND a.analysis_id = i.analysis_id
         WHERE i.input_id IS NULL
           AND b.type = ?
           AND b.contig_id = c.contig_id
    });
    $sth->execute($assembly_type);

    while (my $row = $sth->fetchrow_hashref) {
        next unless $rules_cache->{ $row->{'a_id'} };
        push @$unfinished,
            [ $row->{'name'}, $row->{'logic_name'}, $row->{'a_id'} ];
    }

    _print_result_rows($unfinished) if $print;
    return $unfinished;
}

# get_unfinished_analyses([$print, $unfinished])
# As get_unfinished_analyses_for_input_id, but over every contig, ordered
# by contig name. With $print, also prints how many entries are waiting.
sub get_unfinished_analyses {
    my ($self, $print, $unfinished) = @_;

    # Initialise up front: the print path dereferences @$unfinished, which
    # used to die when no unfinished analyses were found.
    $unfinished ||= [];
    my $rules_cache = $self->rules_cache();

    my $sth = $self->dbobj->prepare(q{
        SELECT c.name, a.analysis_id AS a_id, a.logic_name
          FROM contig c
               STRAIGHT_JOIN analysis a
               LEFT JOIN input_id_analysis i
                 ON c.name = i.input_id AND a.analysis_id = i.analysis_id
         WHERE i.input_id IS NULL
         ORDER BY c.name
    });
    $sth->execute();

    while (my $row = $sth->fetchrow_hashref) {
        next unless $rules_cache->{ $row->{'a_id'} };
        push @$unfinished,
            [ $row->{'name'}, $row->{'logic_name'}, $row->{'a_id'} ];
    }

    if ($print) {
        _print_result_rows($unfinished);
        print "Waiting for " . scalar(@$unfinished) . " to complete.\n";
    }
    return $unfinished;
}

sub get_no_hit_contigs_for_analysis {
    my ($self, $feature_table, $analysis_id, $print, $no_hits) = @_;

    # A table name cannot be bound as a placeholder, so it is interpolated;
    # accept only a plain identifier to keep arbitrary SQL out of the query.
    my $table = defined $feature_table ? $feature_table : '';
    throw("[$table] is not a valid feature table name")
        unless $table =~ /\A\w+\z/;

    # Append to a caller-supplied list, like the get_unfinished_* methods.
    $no_hits ||= [];

    my $sth = $self->dbobj->prepare(qq{
        SELECT c.contig_id, c.name
          FROM contig c
               STRAIGHT_JOIN input_id_analysis i
               LEFT JOIN $table f
                 ON f.analysis_id = i.analysis_id AND f.contig_id = c.contig_id
         WHERE f.contig_id IS NULL
           AND f.analysis_id IS NULL
           AND i.analysis_id = ?
           AND c.name = i.input_id
    });
    $sth->execute($analysis_id);

    while (my $row = $sth->fetchrow_hashref) {
        push @$no_hits, [ $row->{'name'}, undef, $analysis_id ];
    }

    _print_result_rows($no_hits) if $print;
    return $no_hits;
}

# lock_status([$print])
# Reports whether dbobj->pipeline_lock returns a lock string. When locked,
# returns (user, host, pid, localtime(started), started) parsed from that
# string; when free, returns an empty list. With $print, also prints a
# LOCKED/FREE banner and, when locked, the lock details.
sub lock_status {
    my ($self, $print) = @_;

    my $str = "Locked By USER: %s, HOST: %s, PID: %s, STARTED: %s (%s) \n";
    my @a   = ();

    if (my $lock_str = $self->dbobj->pipeline_lock) {
        # Parsed as user@host:pid:started (started is epoch seconds).
        my ($user, $host, $pid, $started) =
            $lock_str =~ /(\w+)@(\S+):(\d+):(\d+)/;
        $self->print_header("This pipeline is LOCKED") if $print;
        @a = ($user, $host, $pid, scalar(localtime($started)), $started);
        printf($str, @a) if $print;
    }
    else {
        $self->print_header("This pipeline is FREE") if $print;
    }
    return @a;
}

# percent_finished([$print])
# Works out the maximum number of jobs per analysis, used as the
# denominator of show_finished_summary's %Done column. For each analysis
# it takes the input_id_analysis row count of that analysis and of every
# analysis reached by following rule conditions back from it
# (goal -> condition -> ...), and keeps the largest. Analyses whose
# logic_name matches /Wait/i get a fixed maximum of 1.
# Returns a hash reference: analysis_id => maximum. With $print, also dumps
# the table to STDOUT.
sub percent_finished {
    my ($self, $print) = @_;

    my $sql0 = "select a.analysis_id, a.logic_name from analysis a";
    my $sth0 = $self->dbobj->prepare($sql0) or die "cant prepare: $sql0";
    my $sql1 = "select rg.goal, rc.rule_condition "
             . "from rule_conditions rc, rule_goal rg "
             . "where rg.rule_id = rc.rule_id";
    my $sth1 = $self->dbobj->prepare($sql1) or die "cant prepare: $sql1";
    my $sql2 = "select count(*) as count from input_id_analysis "
             . "where analysis_id = ?";
    my $sth2 = $self->dbobj->prepare($sql2) or die "cant prepare: $sql2";

    my (%name_of, %id_of, %maxnum_of, %condition_of, %count_of);

    $sth0->execute();
    while (my $row = $sth0->fetchrow_hashref) {
        my ($id, $name) = ($row->{'analysis_id'}, $row->{'logic_name'});
        $name_of{$id} = $name;
        $id_of{$name} = $id;
        # /Wait/i analyses keep this fixed value; they are skipped below.
        $maxnum_of{$id} = ($name =~ /Wait/i) ? 1 : 0;
    }

    $sth1->execute();
    while (my $row = $sth1->fetchrow_hashref) {
        next if $row->{'rule_condition'} =~ m/.+wait/i;
        # NOTE(review): only the last condition fetched for a goal is kept,
        # so a goal with several conditions follows just one of them.
        $condition_of{ $row->{'goal'} } = $row->{'rule_condition'};
    }

    # Row count in input_id_analysis for one analysis_id, memoised so that
    # analyses shared by several condition chains are queried only once.
    my $count_for = sub {
        my ($id) = @_;
        unless (exists $count_of{$id}) {
            $sth2->execute($id);
            my $row = $sth2->fetchrow_hashref;
            $count_of{$id} = $row->{'count'};
        }
        return $count_of{$id};
    };

    foreach my $initial_id (keys %name_of) {
        next if $name_of{$initial_id} =~ /Wait/i;

        my $max     = $count_for->($initial_id);
        my $current = $initial_id;
        my %seen    = ($initial_id => 1);
        while (defined $condition_of{$current}) {
            $current = $id_of{ $condition_of{$current} };
            # Stop at a condition naming an unknown analysis, or at one
            # already visited; a cycle in the rules would otherwise loop
            # forever.
            last if !defined $current || $seen{$current}++;
            my $count = $count_for->($current);
            $max = $count if $count > $max;
        }
        $maxnum_of{$initial_id} = $max;
    }

    # Debug dump; the header matches the printed column order.
    if ($print) {
        print "\n\n\nlogic_name\t\tanalysis_id\tmax_num\n";
        foreach my $id (sort { $a <=> $b } keys %name_of) {
            print "$name_of{$id}\t\t$id\t$maxnum_of{$id}\n";
        }
    }
    return \%maxnum_of;
}

1;