package Makefile::Parallel;

use Makefile::Parallel::Grammar;
use Log::Log4perl;
use Proc::Simple;
use Clone qw(clone);
use Time::HiRes qw(gettimeofday tv_interval);
use Time::Interval;
use Time::Piece::ISO;
use GraphViz;
use Digest::MD5;
use Data::Dumper;
use File::Path qw(remove_tree);

use warnings;
use strict;

our $VERSION = '0.09';

=encoding utf8

=head1 NAME

Makefile::Parallel - A distributed parallel makefile

=head1 SYNOPSIS

This module should not be called directly. Please see the perldoc of the
pmake program on the /examples directory of this distribution.

=cut

# Module Stuff
our @ISA         = qw(Exporter);
our %EXPORT_TAGS = ( 'all' => [ qw() ] );
our @EXPORT_OK   = ( @{ $EXPORT_TAGS{'all'} } );
our @EXPORT      = qw( process_makefile );

# Module-wide state shared by all the subs below.
my $logger;
my $queue;
my $running   = {};     # Holds the running ID's (ID -> info)
my $finnished = {};     # Holds the finnished ID's (ID -> info)
my $scheduler;          # Holds the scheduler engine
my $counter   = 0;      # Holds the order of the executed processes
my $filename;           # Holds the filename of the makefile
my $debug;              # TRUE if we got debug enabled

# This stuff deals with the interruption (Ctrl + C).
# NOTE(review): process_interrupt is defined elsewhere in this file.
$SIG{INT} = \&process_interrupt;
my $interrupted = 0;

=head1 process_makefile

Main function. Accepts a file to parse and a hash reference with options.

TODO: Document options

=cut

sub process_makefile {
    my ($file, $options) = @_;

    # Set sensible defaults
    $options ||= {};
    $options->{scheduler} ||= 'LOCAL';
    $options->{local}     ||= '1';      # Default CPU's on local mode
    $options->{dump}      ||= 0;
    $options->{clean}     ||= 0;
    $options->{clock}     ||= 10;       # Seconds between scheduler polls
    $options->{debug}     ||= 0;
    $options->{continue}  ||= 0;

    # TODO: Give more flexibility
    # 'require' instead of 'use': a conditional 'use' runs at compile time,
    # so both scheduler backends would always be loaded regardless of the
    # selected branch.  Neither backend needs import() — only ->new is used.
    if ($options->{scheduler} eq 'PBS') {
        require Makefile::Parallel::Scheduler::PBS;
        $scheduler = Makefile::Parallel::Scheduler::PBS->new();
        $scheduler->{mail} = $options->{mail} if $options->{mail};
    }
    else {
        require Makefile::Parallel::Scheduler::Local;
        $scheduler = Makefile::Parallel::Scheduler::Local->new({ max => $options->{local} });
    }

    # Debug settings: with debug we log to both a file and the screen,
    # otherwise only INFO and above goes to the screen.
    if ($options->{debug}) {
        # Clean logs from a previous run (portable, no shell-out).
        remove_tree('log');
        mkdir "log";
        my $conf = q(
            log4perl.category.PMake = DEBUG, Logfile, Screen

            log4perl.appender.Logfile = Log::Log4perl::Appender::File
            log4perl.appender.Logfile.filename = log/makefile.log
            log4perl.appender.Logfile.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Logfile.layout.ConversionPattern = [%d] [%p] %F(%L) %m%n

            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 1;
    }
    else {
        my $conf = q(
            log4perl.category.PMake = INFO, Screen

            log4perl.appender.Screen = Log::Log4perl::Appender::Screen
            log4perl.appender.Screen.stderr = 0
            log4perl.appender.Screen.layout = Log::Log4perl::Layout::PatternLayout
            log4perl.appender.Screen.layout.ConversionPattern = [%d] %m%n
        );
        Log::Log4perl::init(\$conf);
        $debug = 0;
    }
    $logger = Log::Log4perl::get_logger("PMake");

    # Parse the file
    $logger->info("Trying to parse \"$file\"");
    $queue = Makefile::Parallel::Grammar->parseFile($file);
    if ($queue) {
        $logger->info("Parse ok.. proceeding to plan the scheduling");
    }
    else {
        $logger->error("Parse failed, aborting...");
        return;
    }
    $filename = $file;

    # Copy perl routines to perl actions: the grammar appends a pseudo-job
    # holding the shared perl preamble as the queue's last element; attach
    # it to every job that has a perl action, then drop the pseudo-job.
    if (defined $queue->[-1]{perl}) {
        for my $job (@{$queue}) {
            if (defined $job->{action}[0]{perl}) {
                $job->{perl} = $queue->[-1]{perl};
            }
        }
        # 'delete' on an array element is deprecated; pop removes the
        # trailing pseudo-job properly.
        pop @{$queue};
    }

    # Dump if the user want it
    die Dumper $queue if ($options->{dump});

    # Clean the temporary files if we are PBS
    clean() if ($options->{clean});

    # Recover the journal if the user wants to continue
    journal_recover() if ($options->{continue});

    # Enter the loop: poll/dispatch forever (terminated via SIGINT or a
    # fatal job, handled inside loop()/process_interrupt()).
    while (1) {
        # $logger->debug("New loop starting");
        loop();
        # $logger->debug("Loop processed, sleeping");
        sleep $options->{clock};
    }
}

=head1 journal_recover

Tries to recover the journal of the last makefile run.

=cut

sub journal_recover {
    my $journal = do "$filename.journal"
        or die "Can't open $filename.journal: $!";

    # Warn (but continue) when the makefile changed since the journal
    # was written.
    my $md5 = calc_makefile_md5();
    if ($journal->{md5} ne $md5) {
        $logger->warn("MD5 Check Failed... The original Makefile was changed!! CONTINUE AT YOUR OWN RISK!");
    }

    # Restore the finnished list
    $finnished = $journal->{finnished};
    $counter   = $journal->{counter};

    # Ignore jobs already concluded.
    # First pass: re-evaluate the variables produced by finished jobs so
    # that the queue gets re-expanded exactly as in the previous run.
    for my $job (@{$queue}) {
        next unless $job;
        if (is_finnished($job->{rule}{id})) {
            # If we got asShell to run, run it!
            find_and_run_asShell($job->{rule}{id});
            # If we got asPerl to run, run it!
            find_and_run_asPerl($job->{rule}{id});
        }
    }

    # Second pass: drop the jobs that already ran.
    my $new_queue = [];
    for my $job (@{$queue}) {
        next unless $job;
        push @{$new_queue}, $job unless is_finnished($job->{rule}{id});
    }
    $queue = $new_queue;
    $logger->warn("Journal recovered.. Cross your fingers now...");
}

=head1 clean

This function is responsible to clean all the temporary files created by
the PBS system. It should be used only on the PBS scheduler method.
=cut

sub clean {
    $scheduler->clean($queue);
    $logger->info("Temporary files cleaned");
    exit(0);
}

=head1 loop

Loop it baby :D

=cut

sub loop {
    reap_dead_bodies();
    dispatch();
    write_journal();
}

=head1 reap_dead_bodies

This function is responsible of reaping the jobs that are finnished.
If the job needs to run something at the end (example, find i <- grep | awk...)
it is executed and the job queue is expanded.

=cut

sub reap_dead_bodies {
    # Search all running procs for someone who died
    for my $runid (keys %{$running}) {
        # Still running: nothing to reap for this one.
        next if $scheduler->poll($running->{$runid}, $logger);

        # No running anymore, remove from running and save.
        # Save time stats
        my $t1      = [gettimeofday];
        my $elapsed = tv_interval($running->{$runid}->{starttime}, $t1);
        $running->{$runid}->{stoptime} = $t1;
        $elapsed = parseInterval(seconds => int($elapsed), Small => 1);
        $running->{$runid}->{elapsed} = $elapsed;

        # Give user some feedback
        $logger->info("Process " . $scheduler->get_id($running->{$runid})
                      . " (" . $running->{$runid}->{rule}->{id}
                      . ") has terminated [$elapsed]");

        $finnished->{$runid} = $running->{$runid};
        delete $running->{$runid};

        # Don't do anything more if it was interrupted
        next if ($finnished->{$runid}{interrupted});

        # Verify the exit status
        $scheduler->get_dead_job_info($finnished->{$runid});
        if ($finnished->{$runid}{exitstatus} && !$finnished->{$runid}{interrupted}) {
            # Boom!! Cancel everything!
            $logger->fatal("Process " . $scheduler->get_id($finnished->{$runid})
                           . " exited with exit status "
                           . $finnished->{$runid}{exitstatus}
                           . "! Aborting all queue...");
            process_interrupt(1);                  # Forced;
            $finnished->{$runid}{fatal} = 1;       # To graphviz later...
        }

        # If we got asShell to run, run it!
        find_and_run_asShell($runid);
        # If we got asPerl to run, run it!
        find_and_run_asPerl($runid);
    }
}

=head1 find_and_run_asShell

This function goes through the finnished job and tries to find asShell
commands to run, doing all the expands necessary

=cut

sub find_and_run_asShell {
    my ($runid) = @_;
    for my $action (@{$finnished->{$runid}->{action}}) {
        # Run each shell action exactly once: the __var__ slot doubles as
        # the "already evaluated" marker.
        if ($action->{asShell} && !(defined $finnished->{__var__}->{$action->{def}})) {
            $logger->info("Running shell action $action->{asShell}");
            $finnished->{__var__}->{$action->{def}} = [];

            # Three-arg piped open with a lexical handle (the old
            # two-arg bareword form was unsafe and got mangled).
            open my $ph, '-|', $action->{asShell}
                or die "Can't run $action->{asShell}: $!";
            while (defined(my $line = <$ph>)) {
                chomp $line;
                $logger->warn("Return value from the shell action is not an integer")
                    unless $line =~ /^\d+$/;
                push @{$finnished->{__var__}->{$action->{def}}}, $line;
            }
            close $ph;

            # Now expand the queue
            expand_forks($action->{def});
        }
    }
}

=head1 find_and_run_asPerl

This function goes through the finnished job and tries to find asPerl
commands to run, doing all the expands necessary

=cut

sub find_and_run_asPerl {
    my ($runid) = @_;
    for my $action (@{$finnished->{$runid}->{action}}) {
        if ($action->{asPerl} && !(defined $finnished->{__var__}->{$action->{def}})) {
            $logger->info("Running perl action $action->{asPerl}");
            # (The previous dead-store initialisation to [] was removed:
            # paction_list's result overwrote it immediately.)
            $finnished->{__var__}{$action->{def}} = paction_list($action->{asPerl});

            # Now expand the queue
            expand_forks($action->{def});
        }
    }
}

=head1 paction_list

This function evaluates a perl action and returns a list of strings.
The action can:
  - return an ARRAY reference,
  - print a list of lines to STDOUT (to be split and chomped),
  - or return a string (to be split and chomped).

=cut

sub paction_list {
    my $act   = shift;
    my $var   = "";
    my $final = [];

    # Capture anything the action prints to STDOUT into $var.
    open(my $capture, '>', \$var);
    my $old = select $capture;

    # String eval is deliberate here: the action is user-supplied code
    # from the makefile.
    my $res = eval("package main; no strict; " . $act);
    my $err = $@;

    close $capture;
    select $old;

    # BUG FIX: restore the selected filehandle *before* dying, otherwise
    # a failing action would leave STDOUT redirected to the in-memory
    # buffer for the rest of the run.
    die $err if $err;

    if (ref($res) eq "ARRAY") {
        $final = $res;
    }
    elsif ($var =~ /\S/) {
        # The action printed its result: split into non-blank lines.
        for (split("\n", $var)) {
            push(@$final, $_) if /\S/;
        }
    }
    else {
        # The action returned a plain string: same splitting.
        for (split("\n", $res)) {
            push(@$final, $_) if /\S/;
        }
    }
    $final;
}

=head1 expand_forks

This function is responsible of expanding all the jobs when a variable
is evaluated. It expands both forks and joins.

=cut

sub expand_forks {
    my ($var) = @_;
    my $values = $finnished->{__var__}->{$var};

    # For all queue items that has a $var, expand.
    # FIXME(review): this loop pushes to and deletes from @$queue while
    # iterating it with foreach, which perlsyn documents as unpredictable.
    # Kept as-is because the scheduler relies on the current ordering;
    # the 'next unless $job' guards skip the undef holes left by delete.
    my $index = -1;
    for my $job (@{$queue}) {
        $index++;
        next unless $job;
        if ($job->{rule}{vars} && (grep { $_ eq "\$$var" } @{$job->{rule}{vars}})) {
            $logger->info("Found a fork on $job->{rule}->{id}. Expanding...");

            # Expand, expand, expand
            $job->{rule}{vars} = [ grep { $_ ne "\$$var" } @{$job->{rule}{vars}} ];
            delete $job->{rule}{vars} unless scalar @{$job->{rule}{vars}};
            delete $queue->[$index];

            my $count      = 0;
            my @added_jobs = ();
            for my $index (@{$values}) {
                my $newjob = clone($job);
                $count++;

                # Update the id
                $newjob->{rule}{id} .= $index;

                # Update the line to execute
                for my $act (@{$newjob->{action}}) {
                    if ($act->{shell}) {
                        $act->{shell} =~ s/\$$var\b/$index/g;
                    }
                    elsif ($act->{perl}) {
                        $act->{perl} =~ s/\$$var\b/$index/g;
                    }
                }

                # Expand pipelines
                for my $dep (@{$newjob->{depend_on}}) {
                    if ($dep->{vars} && (grep { $_ eq "\$$var" } @{$dep->{vars}})) {
                        # Expand the dependency
                        $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}} ];
                        delete $dep->{vars} unless scalar @{$dep->{vars}};
                        $dep->{id} .= $index;
                    }
                }

                push @{$queue}, $newjob;
                push @added_jobs, $newjob->{rule}{id};
            }
            $logger->info("Expanded.. Created new $count jobs: @added_jobs");
        }

        # Find joiners: a job depending on "id $var" now depends on every
        # expanded "id<value>" instead.
        my $pos = 0;
        for my $dep (@{$job->{depend_on}}) {
            if ($dep->{vars} && (grep { $_ eq "\$$var" } @{$dep->{vars}})) {
                $dep->{vars} = [ grep { $_ ne "\$$var" } @{$dep->{vars}} ];
                # Expand the dependencies
                delete $job->{depend_on}->[$pos];
                for my $index (@{$values}) {
                    my @vars = (scalar @{$dep->{vars}}) ? (vars => $dep->{vars}) : ();
                    push @{$job->{depend_on}}, { @vars, id => $dep->{id} . $index };
                }
            }
            $pos++;
        }
    }

    # Now find constructors like @var: splice the whole value list into
    # actions textually.
    for my $job (@{$queue}) {
        next unless $job;

        # Search on actions
        for my $action (@{$job->{action}}) {
            if ($action->{shell} && $action->{shell} =~ /\@$var\b/) {
                my $string = '';
                map { $string .= "$_ " } @{$values};
                $action->{shell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with $string");
            }
            elsif ($action->{perl} && $action->{perl} =~ /\@$var\b/) {
                my $string = join(",", map { "q{$_}" } @{$values});
                $action->{perl} =~ s/\@$var\b/($string)/g;
                $logger->info("The job $job->{rule}->{id} has been action expanded with ($string)");
            }
        }

        # Search on asShell
        for my $action (@{$job->{action}}) {
            if ($action->{asShell} && $action->{asShell} =~ /\@$var\b/) {
                my $string = '';
                map { $string .= "$_ " } @{$values};
                $action->{asShell} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been shell expanded with $string");
            }
        }

        # Search on asPerl
        for my $action (@{$job->{action}}) {
            if ($action->{asPerl} && $action->{asPerl} =~ /\@$var\b/) {
                my $string = 'qw/';
                map { $string .= "$_ " } @{$values};
                $string .= "/";
                $action->{asPerl} =~ s/\@$var\b/$string/g;
                $logger->info("The job $job->{rule}->{id} has been Perl expanded with $string");
            }
        }
    }
}

=head1 report

Print a pretty report bla bla bla

=cut

sub report {
    $logger->info("Creating HTML report");
    open REPORT, '>', "$filename.html" or die "Can't create $filename.html";
    print REPORT "
ID | Start Time | End Time | Elapsed |
$id | $start | $stop | $interval |