package Makefile::Parallel;

use Makefile::Parallel::Grammar;
use Log::Log4perl;
use Proc::Simple;
use Clone qw(clone);
use Time::HiRes qw(gettimeofday tv_interval);
use Time::Interval;
use Time::Piece::ISO;
use GraphViz;
use Digest::MD5;
use Data::Dumper;

use warnings;
use strict;

our $VERSION = '0.09';

=encoding utf8

=head1 NAME

Makefile::Parallel - A distributed parallel makefile

=head1 SYNOPSIS

This module should not be called directly. Please see the perldoc of the
pmake program on the /examples directory of this distribution.

=cut

# Module Stuff
our @ISA         = qw(Exporter);
our %EXPORT_TAGS = ( 'all' => [ qw() ] );
our @EXPORT_OK   = ( @{ $EXPORT_TAGS{'all'} } );
our @EXPORT      = qw( process_makefile );

# File-scope state shared by every sub in this module.  The whole engine is
# a singleton: one queue, one scheduler, one run per process.
my $logger;             # Log::Log4perl logger, initialised in process_makefile
my $queue;              # arrayref of pending job hashrefs (parsed rules)
my $running   = {};     # Holds the running ID's (ID -> info)
my $finnished = {};     # Holds the finnished ID's (ID -> info)
                        # (also carries the special key __var__ for expanded vars)
my $scheduler;          # Holds the scheduler engine (PBS or Local backend)
my $counter = 0;        # Holds the order of the executed processes
my $filename;           # Holds the filename of the makefile
my $debug;              # TRUE if we got debug enabled

# This stuff deals with the interruption (Ctrl + C)
$SIG{INT} = \&process_interrupt;
my $interrupted = 0;    # set once the user confirms an interrupt

=head1 process_makefile

Main function. Accepts a file to parse and a hash reference with options.
TODO: Document options

=cut

# Entry point: parse the makefile, build a scheduler backend, then run the
# dispatch loop until the queue drains or the user interrupts.
sub process_makefile {
    my ($file, $options) = @_;

    # Set sensible defaults
    $options ||= {};
    $options->{scheduler} ||= 'LOCAL';
    $options->{local}     ||= '1';      # Default CPU's on local mode
    $options->{dump}      ||= 0;
    $options->{clean}     ||= 0;
    $options->{clock}     ||= 10;       # seconds between dispatch loop iterations
    $options->{debug}     ||= 0;
    $options->{continue}  ||= 0;

    # TODO: Give more flexibility
    if ($options->{scheduler} eq 'PBS') {
        # `require`, not `use`: `use` runs at compile time, so both scheduler
        # backends would be loaded unconditionally regardless of the branch.
        require Makefile::Parallel::Scheduler::PBS;
        $scheduler = Makefile::Parallel::Scheduler::PBS->new();
        $scheduler->{mail} = $options->{mail} if $options->{mail};
    }
    else {
        require Makefile::Parallel::Scheduler::Local;
        $scheduler = Makefile::Parallel::Scheduler::Local->new({ max => $options->{local} });
    }

    # Debug settings: log to both a file and the screen when debugging,
    # screen only otherwise.
    if ($options->{debug}) {
        # Clean logs...
        ## FIXME - do not rely on OS.
        `rm -rf log/`;
        mkdir "log";

        my $conf = q(
            log4perl.category.PMake = DEBUG, Logfile, Screen

            log4perl.appender.Logfile = Log::Log4perl::Appender::File
            log4perl.appender.Logfile.filename = log/makefile.log
            log4perl.appender.Logfile.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Logfile.layout.ConversionPattern = [%d] [%p] %F(%L) %m%n

            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 1;
    }
    else {
        my $conf = q(
            log4perl.category.PMake = INFO, Screen

            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 0;
    }
    $logger = Log::Log4perl::get_logger("PMake");

    # Parse the file
    $logger->info("Trying to parse \"$file\"");
    $queue = Makefile::Parallel::Grammar->parseFile($file);
    if ($queue) {
        $logger->info("Parse ok.. proceeding to plan the scheduling");
    }
    else {
        $logger->error("Parse failed, aborting...");
        return;
    }
    $filename = $file;

    # Copy perl routines to perl actions: the grammar leaves the shared perl
    # preamble in a sentinel last element of the queue.
    if (defined $queue->[-1]{perl}) {
        for my $job (@{$queue}) {
            if (defined $job->{action}[0]{perl}) {
                $job->{perl} = $queue->[-1]{perl};
            }
        }
        # was `delete $queue->[-1]` -- delete on array elements is deprecated;
        # pop removes the sentinel the same way.
        pop @{$queue};
    }

    # Dump if the user want it
    die Dumper $queue if ($options->{dump});

    # Clean the temporary files if we are PBS
    clean() if ($options->{clean});

    # Recover the journal if the user wants to continue
    journal_recover() if ($options->{continue});

    # Enter the loop: loop() exits the process via at_exit() when done.
    while (1) {
        # $logger->debug("New loop starting");
        loop();
        # $logger->debug("Loop processed, sleeping");
        sleep $options->{clock};
    }
}

=head1 journal_recover

Tries to recover the journal of the last makefile run.

=cut

# Reload "$filename.journal" (a Data::Dumper image written by write_journal)
# and drop from the queue every job that already finished on the last run.
sub journal_recover {
    my $journal = do "$filename.journal"
        or die "Can't open $filename.journal: $!";

    # Warn (but continue) when the makefile changed since the journal was cut.
    my $md5 = calc_makefile_md5();
    if ($journal->{md5} ne $md5) {
        $logger->warn("MD5 Check Failed... The original Makefile was changed!! CONTINUE AT YOUR OWN RISK!");
    }

    # Restore the finnished list
    $finnished = $journal->{finnished};
    $counter   = $journal->{counter};

    # Ignore jobs already concluded
    # First pass: re-evaluate the variables of finished jobs so that the
    # fork/join expansions are reproduced on the restored queue.
    for my $job (@{$queue}) {
        next unless $job;
        if (is_finnished($job->{rule}{id})) {
            # If we got asShell to run, run it!
            find_and_run_asShell($job->{rule}{id});
            # If we got asPerl to run, run it!
            find_and_run_asPerl($job->{rule}{id});
        }
    }

    # Second pass: remove the jobs that have already been executed.
    my $new_queue = [];
    for my $job (@{$queue}) {
        next unless $job;
        push @{$new_queue}, $job unless is_finnished($job->{rule}{id});
    }
    $queue = $new_queue;

    $logger->warn("Journal recovered.. Cross your fingers now...");
}

=head1 clean

This function is responsible to clean all the temporary files created
by the PBS system. It should be used only on the PBS scheduler method.
=cut

# Ask the scheduler backend to remove its temporary files, then quit.
sub clean {
    $scheduler->clean($queue);
    $logger->info("Temporary files cleaned");
    exit(0);
}

=head1 loop

Loop it baby :D

=cut

# One iteration of the engine: collect finished jobs, start runnable ones,
# persist the state.
sub loop {
    reap_dead_bodies();
    dispatch();
    write_journal();
}

=head1 reap_dead_bodies

This function is responsible of reaping the jobs that are finnished. If
the job needs to run something at the end (example, find i <- grep | awk...)
it is executed and the job queue is expanded.

=cut

sub reap_dead_bodies {
    # Search all running procs for someone who died
    for my $runid (keys %{$running}) {
        if ($scheduler->poll($running->{$runid}, $logger)) {
            # Still running
        }
        else {
            # Not running anymore: remove from running and save.
            # Save time stats
            my $t1      = [gettimeofday];
            my $elapsed = tv_interval($running->{$runid}->{starttime}, $t1);
            $running->{$runid}->{stoptime} = $t1;
            $elapsed = parseInterval(seconds => int($elapsed), Small => 1);
            $running->{$runid}->{elapsed} = $elapsed;

            # Give user some feedback
            $logger->info("Process " . $scheduler->get_id($running->{$runid})
                . " (" . $running->{$runid}->{rule}->{id}
                . ") has terminated [$elapsed]");

            $finnished->{$runid} = $running->{$runid};
            delete $running->{$runid};

            # Don't do nothing more if it was interrupted
            next if ($finnished->{$runid}{interrupted});

            # Verify the exit status
            $scheduler->get_dead_job_info($finnished->{$runid});
            if ($finnished->{$runid}{exitstatus} && !$finnished->{$runid}{interrupted}) {
                # Boom!! Cancel everything!
                $logger->fatal("Process " . $scheduler->get_id($finnished->{$runid})
                    . " exited with exit status "
                    . $finnished->{$runid}{exitstatus}
                    . "! Aborting all queue...");
                process_interrupt(1);                # Forced
                $finnished->{$runid}{fatal} = 1;     # To graphviz later...
            }

            # If we got asShell to run, run it!
            find_and_run_asShell($runid);
            # If we got asPerl to run, run it!
            find_and_run_asPerl($runid);
        }
    }
}

=head1 find_and_run_asShell

This function goes through the finnished job and tries to find asShell
commands to run, doing all the expands necessary

=cut

sub find_and_run_asShell {
    my ($runid) = @_;

    for my $action (@{$finnished->{$runid}->{action}}) {
        # Run each asShell action exactly once per variable definition.
        if ($action->{asShell} && !(defined $finnished->{__var__}->{$action->{def}})) {
            $logger->info("Running shell action $action->{asShell}");
            $finnished->{__var__}->{$action->{def}} = [];

            # NOTE(review): the readline target was lost in this copy of the
            # file ("while( ) {"); restored to read the piped command output.
            open P, "$action->{asShell} |";
            while (<P>) {
                chomp;
                $logger->warn("Return value from the shell action is not a integer")
                    unless /^\d+$/;
                push @{$finnished->{__var__}->{$action->{def}}}, $_;
            }
            close P;

            # Now expand the queue
            expand_forks($action->{def});
        }
    }
}

=head1 find_and_run_asPerl

This function goes through the finnished job and tries to find asPerl
commands to run, doing all the expands necessary

=cut

sub find_and_run_asPerl {
    my ($runid) = @_;

    for my $action (@{$finnished->{$runid}->{action}}) {
        # Run each asPerl action exactly once per variable definition.
        if ($action->{asPerl} && !(defined $finnished->{__var__}->{$action->{def}})) {
            $logger->info("Running perl action $action->{asPerl}");
            $finnished->{__var__}->{$action->{def}} = [];
            $finnished->{__var__}{$action->{def}} = paction_list($action->{asPerl});

            # Now expand the queue
            expand_forks($action->{def});
        }
    }
}

=head1 paction_list

this function evaluates a perl action and retruns a list of strings.
the action can:

 .return a ARRAY reference,
 .print a list of lines to STDOUT (to be splited end chomped)
 .or return a string (to be splited and chomped)

=cut

sub paction_list {
    my $act   = shift;
    my $var   = "";
    my $final = [];

    # Capture anything the action prints by selecting an in-memory handle.
    open(A, '>', \$var);
    my $old = select A;
    # String eval is intentional here: the action is user-supplied perl code
    # from the makefile itself.
    my $res = eval("package main; no strict; " . $act);
    die $@ if $@;
    close A;
    select $old;

    if (ref($res) eq "ARRAY") {
        $final = $res;
    }
    elsif ($var =~ /\S/) {
        # The action printed its result: one value per non-blank line.
        for (split("\n", $var)) {
            push(@$final, $_) if /\S/;
        }
    }
    else {
        # Fallback: treat the return value as a newline-separated string.
        for (split("\n", $res)) {
            push(@$final, $_) if /\S/;
        }
    }
    $final;
}

=head1 expand_forks

This function is responsible of expanding all the jobs when a variable
is evaluated. It expands both forks and joins.

=cut

sub expand_forks {
    my ($var) = @_;
    my $values = $finnished->{__var__}->{$var};

    # For all queue items that has a $var, expand
    my $index = -1;
    for my $job (@{$queue}) {
        $index++;
        next unless $job;

        if ($job->{rule}{vars} && (grep { $_ eq "\$$var" } @{$job->{rule}{vars}})) {
            $logger->info("Found a fork on $job->{rule}->{id}. Expanding...");

            # Expand, expand, expand: drop the variable from the rule and
            # replace the job by one clone per value.
            $job->{rule}{vars} = [ grep { $_ ne "\$$var" } @{$job->{rule}{vars}} ];
            delete $job->{rule}{vars} unless scalar @{$job->{rule}{vars}};
            delete $queue->[$index];

            my $count      = 0;
            my @added_jobs = ();
            for my $index (@{$values}) {
                my $newjob = clone($job);
                $count++;

                # Update the id
                $newjob->{rule}{id} .= $index;

                # Update the command line to execute
                for my $act (@{$newjob->{action}}) {
                    if ($act->{shell}) {
                        $act->{shell} =~ s/\$$var\b/$index/g;
                    }
                    elsif ($act->{perl}) {
                        $act->{perl} =~ s/\$$var\b/$index/g;
                    }
                }

                # Expand pipelines
                for my $dep (@{$newjob->{depend_on}}) {
                    if ($dep->{vars} && (grep { $_ eq "\$$var" } @{$dep->{vars}})) {
                        # Expand the dependencie
                        $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}} ];
                        delete $dep->{vars} unless scalar @{$dep->{vars}};
                        $dep->{id} .= $index;
                    }
                }

                push @{$queue}, $newjob;
                push @added_jobs, $newjob->{rule}{id};
            }
            $logger->info("Expanded.. Created new $count jobs: @added_jobs");
        }

        # Find joiners: a job that depends on "rule $var" now depends on
        # every expanded instance.
        my $pos = 0;
        for my $dep (@{$job->{depend_on}}) {
            if ($dep->{vars} && (grep { $_ eq "\$$var" } @{$dep->{vars}})) {
                $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}} ];

                # Expand the dependencies
                delete $job->{depend_on}->[$pos];
                for my $index (@{$values}) {
                    my @vars = (scalar @{$dep->{vars}}) ? (vars => $dep->{vars}) : ();
                    push @{$job->{depend_on}}, { @vars, id => $dep->{id} . $index };
                }
            }
            $pos++;
        }
    }

    # Now find constructors like @var
    for my $job (@{$queue}) {
        next unless $job;

        # Search on actions
        for my $action (@{$job->{action}}) {
            if ($action->{shell} && $action->{shell} =~ /\@$var\b/) {
                my $string = '';
                map { $string .= "$_ " } @{$values};
                $action->{shell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with $string");
            }
            elsif ($action->{perl} && $action->{perl} =~ /\@$var\b/) {
                my $string = join(",", map { "q{$_}" } @{$values});
                $action->{perl} =~ s/\@$var\b/($string)/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with ($string)");
            }
        }

        # Search on asShell
        for my $action (@{$job->{action}}) {
            if ($action->{asShell} && $action->{asShell} =~ /\@$var\b/) {
                my $string = '';
                map { $string .= "$_ " } @{$values};
                $action->{asShell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been shell expanded with $string");
            }
        }

        # Search on asPerl
        for my $action (@{$job->{action}}) {
            if ($action->{asPerl} && $action->{asPerl} =~ /\@$var\b/) {
                my $string = 'qw/';
                map { $string .= "$_ " } @{$values};
                $string .= "/";
                $action->{asPerl} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been Perl expanded with $string");
            }
        }
    }
}

=head1 report

Print a pretty report bla bla bla

=cut

# Write an HTML table of every finished job (id, start, stop, elapsed)
# to "$filename.html".
sub report {
    $logger->info("Creating HTML report");
    open REPORT, ">$filename.html" or die "Can't create $filename.html";

    # NOTE(review): the HTML markup of this report was stripped in this copy
    # of the file (only the cell text "ID / Start Time / End Time / Elapsed"
    # and "$id$start$stop$interval" survived).  The table below is a
    # reconstruction -- confirm against the upstream distribution.
    print REPORT "<html><body>\n";
    print REPORT "<table border=\"1\">\n";
    print REPORT "<tr><th>ID</th><th>Start Time</th><th>End Time</th><th>Elapsed</th></tr>\n";

    my ($id, $start, $stop, $interval);
    for my $job (sort sortcallback keys %{$finnished}) {
        next unless $finnished->{$job}{rule};
        $id       = $finnished->{$job}{rule}{id};
        $start    = (localtime($finnished->{$job}{starttime}[0]))->iso;
        $stop     = (localtime($finnished->{$job}{stoptime}[0]))->iso;
        $interval = $finnished->{$job}{realtime} || $finnished->{$job}{elapsed};
        print REPORT "<tr><td>$id</td><td>$start</td><td>$stop</td><td>$interval</td></tr>\n";
    }

    print REPORT "</table></body></html>\n";
    close REPORT;
}

# sort callback: order finished jobs by launch order; jobs without an
# order (e.g. the __var__ pseudo-entry) sort first.
sub sortcallback {
    my $foo = $finnished->{$a};
    my $bar = $finnished->{$b};
    return 0 if (!$foo->{order} && !$bar->{order});
    return -1 unless $foo->{order};
    return 1 unless $bar->{order};
    return $foo->{order} <=> $bar->{order};
}

=head1 dispatch

This function is responsible for dispatching the jobs that can run.

=cut

sub dispatch {
    my $new_queue = [];

    # If we aren't running nothing and ($interrupted || $queue empty) exit
    if ((scalar keys %{$running}) == 0 && ($interrupted || (scalar @{$queue} == 0))) {
        $logger->info("Terminating the pipeline");
        at_exit();
    }

    # We don't wanna dispatch NOTHING if we have interrupted
    return if $interrupted;

    for my $job (@{$queue}) {
        next unless $job;

        # Find if the job dependencies are finnished
        if (can_run_job($job->{rule}->{id}, $job->{depend_on})) {
            $logger->info(Dumper($job)) unless $job->{rule}{id};
            $logger->info("The job \"" . $job->{rule}->{id} . "\" is ready to run. Launching");
            launch($job);
            $job->{starttime} = [gettimeofday];

            # Jump to the next job in queue
            next;
        }

        # This job can't run yet.. add it to the new queue
        push @{$new_queue}, $job;
    }

    # Return the new queue, the jobs that can't be dispatched yet
    $queue = $new_queue;
}

=head1 is_finnished

This function checks if the specified job is already done in the
finnished list.

=cut

sub is_finnished {
    my ($jobid) = @_;

    for my $job (keys %{$finnished}) {
        next unless $finnished->{$job}{rule};
        return 1 if ($finnished->{$job}{rule}{id} eq $jobid);
    }
    return 0;
}

=head1 at_exit

This sub is called at the program exit

=cut

sub at_exit {
    graphviz();
    report();
    write_journal();
    exit(0);
}

=head1 write_journal

Saves the scheduler state to disk.
=cut

# Dump the current state (finished jobs + counter + makefile MD5) to
# "$filename.journal" so a later run with --continue can pick it up.
sub write_journal {
    my $journal = {};
    $journal->{md5} = calc_makefile_md5();

    # Pass all interrupted and failed processes back to queue: only cleanly
    # finished jobs are journalled.
    my $acabados = clone($finnished);    # "acabados" = finished (pt)
    for my $job (keys %{$acabados}) {
        next unless $acabados->{$job}{rule};
        if ($acabados->{$job}{fatal} || $acabados->{$job}{interrupted}) {
            delete $acabados->{$job};
        }
    }
    delete $acabados->{__var__};

    $journal->{finnished} = $acabados;
    $journal->{counter}   = $counter;

    # Best effort: a journal we can't write shouldn't kill the pipeline.
    open my $jfh, '>', "$filename.journal"
        or do { $logger->warn("Can't write $filename.journal: $!"); return };
    print {$jfh} (Dumper $journal);
    close $jfh;
}

=head1 calc_makefile_md5

Calculates the MD5 of the current makefile

=cut

# Returns the base64 MD5 digest of the makefile's contents.
sub calc_makefile_md5 {
    open my $fh, '<', $filename or die "Can't read $filename: $!";
    my $ctx = Digest::MD5->new;
    $ctx->addfile($fh);
    close $fh;
    return $ctx->b64digest;
}

=head1 can_run_job

This one finds out if a job can run (all the dependencies are met).

=cut

sub can_run_job {
    my ($id, $deps) = @_;

    # The scheduler backend may refuse (e.g. all local CPU slots busy).
    return 0 unless $scheduler->can_run();

    for my $dep (@{$deps}) {
        next unless $dep;
        next unless $dep->{id};
        return 0 unless defined $finnished->{$dep->{id}};
    }
    return 1;
}

=head1 launch

Launch a process (really??)

=cut

# Hand the job to the scheduler backend and record it as running.
sub launch {
    my ($job) = @_;

    # Launch the process
    $scheduler->launch($job, $debug);
    $job->{order} = $counter++;

    # Save in the running list
    $running->{$job->{rule}->{id}} = $job;

    $logger->info("Launched \"" . $job->{rule}->{id} . "\" ("
        . $scheduler->get_id($job) . ")");
}

=head1 graphviz

Builds a preety graphviz file after the execution of the makefile

=cut

# Render the executed pipeline as a graph: one box per job (red = fatal,
# yellow = interrupted), one edge per dependency.  Writes both .ps and .dot.
sub graphviz {
    my $time_for = {};    # Holds the walltime for the job id
    my $g = GraphViz->new(rankdir => 1);

    $logger->info("Creating GraphViz nodes");

    # Create all nodes
    for my $job (keys %{$finnished}) {
        next unless $finnished->{$job}{rule};
        my $id = $finnished->{$job}{rule}{id};
        $time_for->{$id} = $finnished->{$job}{realtime} || $finnished->{$job}{elapsed};

        my $color = 'black';
        $color = 'red'    if $finnished->{$job}{fatal};
        $color = 'yellow' if $finnished->{$job}{interrupted};

        $g->add_node($id,
            label => "$id\n$time_for->{$id}",
            shape => 'box',
            color => $color);
    }

    $logger->info("Creating GraphViz edges");

    # Create edges
    for my $job (keys %{$finnished}) {
        next unless $finnished->{$job}{rule};
        for my $dep (@{$finnished->{$job}{depend_on}}) {
            next unless $dep;
            $g->add_edge($dep->{id}, $finnished->{$job}{rule}{id});
        }
    }

    open my $ps, '>', "$filename.ps" or die "Can't create $filename.ps: $!";
    print {$ps} $g->as_ps;
    close $ps;

    open my $dot, '>', "$filename.dot" or die "Can't create $filename.dot: $!";
    print {$dot} $g->as_text;
    close $dot;

    $logger->info("GraphViz file created on $filename.ps");
}

=head1 process_interrupt

This function is called everytime the user send a SIGINT to this process.
The objective is to kill all the running processes and wait for them
to die.

=cut

sub process_interrupt {
    my $forced = shift;
    # As a signal handler we get the signal name; as an internal call we get
    # a truthy flag (or nothing -- guard against undef).  Hack O:-)
    $forced = 0 if !defined $forced || $forced eq "INT";

    if (!$interrupted || $forced) {
        if (!$forced) {
            # Ask for confirmation before tearing everything down.
            # NOTE(review): the readline target was lost in this copy of the
            # file ("my $linha = ;"); restored to read from STDIN.
            $logger->warn("Interrupt pressed, enter QUIT to quit, other thing to continue");
            my $linha = <STDIN>;
            chomp($linha);
            if ($linha ne 'QUIT') {
                $logger->info("Interrupt canceled... Keeping the loop");
                return;
            }
        }

        $interrupted = 1;
        $logger->info("Interrupt pressed, cleaning all the running processes");
        for my $runid (keys %{$running}) {
            $logger->info("Terminating job " . $scheduler->get_id($running->{$runid}));
            $running->{$runid}{interrupted} = 1;
            $scheduler->interrupt($running->{$runid});
        }
    }
    else {
        $logger->warn("Interrupt already called, please wait while cleaning");
    }
}

=head1 AUTHOR

Ruben Fonseca, C<< <root at cpan.org> >>

Alberto Simões, C<< <ambs at cpan.org> >>

José João Almeida, C<< <jj at di.uminho.pt> >>

=head1 BUGS

Please report any bugs or feature requests to
C<bug-makefile-parallel at rt.cpan.org>, or through the web interface at
L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Makefile-Parallel>.
I will be notified, and then you'll automatically be notified of progress
on your bug as I make changes.

=head1 COPYRIGHT & LICENSE

Copyright 2006-2011 Ruben Fonseca, et al, all rights reserved.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut

1; # End of Makefile::Parallel