#!/usr/bin/perl package Verby::Dispatcher; use Moose; our $VERSION = "0.05"; use Set::Object; use Verby::Context; use Carp qw/croak/; use Tie::RefHash; use POE; require overload; has step_set => ( isa => "Set::Object", is => "ro", default => sub { Set::Object->new }, ); has satisfied_set => ( isa => "Set::Object", is => "ro", default => sub { Set::Object->new }, ); has cxt_of_step => ( isa => "HashRef", is => "ro", default => sub { tie my %cxt_of_step, "Tie::RefHash"; return \%cxt_of_step; }, ); has derivable_cxts => ( isa => "HashRef", is => "ro", default => sub { tie my %derivable_cxts, "Tie::RefHash"; return \%derivable_cxts; }, ); has config_hub => ( isa => "Object", is => "rw", default => sub { require Verby::Config::Data; Verby::Config::Data->new; }, ); has global_context => ( isa => "Object", is => "ro", lazy => 1, default => sub { $_[0]->config_hub->derive("Verby::Context") }, ); has resource_pool => ( isa => "POE::Component::ResourcePool", is => "ro", predicate => "has_resource_pool", ); sub add_step { my $self = shift; my $steps = $self->step_set; foreach my $step (@_) { next if $steps->includes($step); $self->add_step($step->depends); (my $logger = $self->global_context->logger)->debug("adding step $step"); $steps->insert($step); } } sub add_steps { my $self = shift; $self->add_step(@_); } sub get_cxt { my $self = shift; my $step = shift; $self->cxt_of_step->{$step} ||= Verby::Context->new($self->get_derivable_cxts($step)); } sub get_derivable_cxts { my $self = shift; my $step = shift; @{ $self->derivable_cxts->{$step} ||= ( $step->provides_cxt ? [ Verby::Context->new($self->get_parent_cxts($step)) ] : [ $self->get_parent_cxts($step) ] )}; } sub get_parent_cxts { my $self = shift; my $step = shift; if ( my @cxts = map { $self->get_derivable_cxts($_) } $step->depends ) { return @cxts; } else { return $self->global_context; } } sub create_poe_sessions { my ( $self ) = @_; my $g_cxt = $self->global_context; $g_cxt->logger->debug("Creating parent POE session"); POE::Session->create( inline_states => { _start => sub { my ( $kernel, $heap ) = @_[KERNEL, HEAP]; my $self = $heap->{verby_dispatcher}; # FIXME # handle sigint my $g_cxt = $self->global_context; my $all_steps = $self->step_set; my $satisfied = $self->satisfied_set; my $pending = $all_steps->difference( $satisfied ); foreach my $step ( $pending->members ) { $g_cxt->logger->debug("Creating POE session for step $step"); POE::Session->create( inline_states => { _start => sub { my ( $kernel, $session) = @_[KERNEL, SESSION]; $kernel->sig("VERBY_STEP_FINISHED" => "step_finished"); $kernel->refcount_increment( $session->ID, "unresolved_dependencies" ); $kernel->yield("try_executing_step"); }, step_finished => sub { my ( $kernel, $heap, $done ) = @_[KERNEL, HEAP, ARG1]; my $deps = $heap->{dependencies}; if ( $deps->includes($done) ) { $deps->remove( $done ); $kernel->yield("try_executing_step") unless $deps->size; } }, try_executing_step => sub { my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP]; return if $heap->{dependencies}->size; # don't run if we're waiting return if $heap->{ran}++; # don't run twice $heap->{g_cxt}->logger->debug("All dependencies of '$step' have finished, starting"); $kernel->sig("VERBY_STEP_FINISHED"); # we're no longer waiting for other steps to finish $kernel->refcount_decrement( $session->ID, "unresolved_dependencies" ); if ( my $pool = $heap->{resource_pool} and my @req = $heap->{step}->resources ) { $heap->{resource_request} = $pool->request( params => { @req }, event => "execute_step", ); } else { $kernel->call( $session, "execute_step" ); } }, execute_step => sub { my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP]; # this may create child sessions. If it doesn't this session will go away $heap->{verby_dispatcher}->start_step( $heap->{step}, \@_ ); }, _stop => sub { my ( $kernel, $heap ) = @_[KERNEL, HEAP]; my $step = $heap->{step}; if ( my $request = delete $heap->{resource_request} ) { $request->dismiss; } $heap->{g_cxt}->logger->info("step $step has finished."); $_->() for @{ $heap->{post_hooks} }; return $step; }, DIE => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") }, _child => sub { $_[HEAP]{g_cxt}->logger->debug("Step $_[HEAP]{step} _child event: $_[ARG0]") }, }, heap => { %{ $heap }, step => $step, dependencies => Set::Object->new( $step->depends )->difference($satisfied), ran => 0, post_hooks => [], }, ); } }, _child => sub { my ( $kernel, $session, $heap, $type, $step ) = @_[KERNEL, SESSION, HEAP, ARG0, ARG2]; if ( $type eq "lose" ) { $heap->{satisfied}->insert($step); $kernel->signal( $session, "VERBY_STEP_FINISHED", $step ); } }, DIE => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") }, _stop => sub { $_[HEAP]{g_cxt}->logger->debug("parent POE session closing") }, }, heap => { verby_dispatcher => $self, g_cxt => $g_cxt, # convenience satisfied => $self->satisfied_set, ( $self->has_resource_pool ? ( resource_pool => $self->resource_pool ) : () ), } ); } sub do_all { my $self = shift; $self->create_poe_sessions; $self->global_context->logger->debug("Starting POE main loop"); $poe_kernel->run; } sub start_step { my ( $self, $step, $poe ) = @_; my $g_cxt = $self->global_context; my $cxt = $self->get_cxt($step); if ($step->is_satisfied($cxt, $poe)){ $g_cxt->logger->debug("step $step has already been satisfied, running isn't necessary."); return; } $g_cxt->logger->debug("starting step $step"); $step->do($cxt, $poe); } sub _set_members_query { my $self = shift; my $set = shift; return wantarray ? $set->members : $set->size; } sub steps { my $self = shift; $self->_set_members_query($self->step_set); } sub is_satisfied { my $self = shift; my $step = shift; croak "$step is not registered at all" unless $self->step_set->contains($step); $self->satisfied_set->contains($step); } __PACKAGE__ __END__ =pod =head1 NAME Verby::Dispatcher - Takes steps and executes them. Sort of like what make(1) is to a Makefile. =head1 SYNOPSIS use Verby::Dispatcher; use Verby::Config::Data; # or something equiv my $c = Verby::Config::Data->new(); # ... needs the "logger" field set my $d = Verby::Dispatcher->new; $d->config_hub($c); $d->add_steps(@steps); $d->do_all; =head1 DESCRIPTION =head1 ATTRIBUTES =item B If provided with a L instance, that resource pool will be used to handle resource allocation. The L method is used to declare the required resources for each step. =item B Returns the L that is used for internal bookkeeping of the steps involved. =item B Returns the L that is used to track which steps are satisfied. =item B The configuration hub that all contexts inherit from. Defaults to an empty parameter set. =item B The global context objects. Defaults to a derivation of B. =head1 METHODS =over 4 =item B Returns a new L. Duh! =item B =item B Add a number of steps into the dispatcher pool. Anything returned from L is aggregated recursively here, and added into the batch too. =item B Calculate all the dependencies, and then dispatch in order. =back =begin private =over 4 =item B Whether or not $step does not need to be executed (because it was already executed or because it didn't need to be in the first place). =item B Returns the context associated with $step. This is where $step will write it's data. =item B Returns the contexts to derive from, when creating a context for $step. If $step starts a new context (L is true) then a new context is created here, derived from get_parent_cxts($step). Otherwise it simply returns get_parent_cxts($step). Note that when a step 'provides a context' this really means that a new context is created, and this context is derived for the step, and any step that depends on it. =item B If $step depends on any other steps, take their contexts. Otherwise, returns the global context. =item B Starts the =item B Returns a list of steps that the dispatcher cares about. =back =end =head1 BUGS None that we are aware of. Of course, if you find a bug, let us know, and we will be sure to fix it. =head1 CODE COVERAGE We use B to test the code coverage of the tests, please refer to COVERAGE section of the L module for more information. =head1 SEE ALSO =head1 AUTHOR Yuval Kogman, Enothingmuch@woobling.orgE stevan little, Estevan@iinteractive.comE =head1 COPYRIGHT AND LICENSE Copyright 2005-2008 by Infinity Interactive, Inc. L This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut