#!/usr/bin/perl

package Verby::Dispatcher;
use Moose;

our $VERSION = "0.05";

use Set::Object;
use Verby::Context;
use Carp qw/croak/;
use Tie::RefHash;

use POE;

require overload;

has step_set => (
	isa => "Set::Object",
	is	=> "ro",
	default => sub { Set::Object->new },
);

has satisfied_set => (
	isa => "Set::Object",
	is	=> "ro",
	default => sub { Set::Object->new },
);

has cxt_of_step => (
	isa => "HashRef",
	is	=> "ro",
	default => sub {
		tie my %cxt_of_step, "Tie::RefHash";
		return \%cxt_of_step;
	},
);

has derivable_cxts => (
	isa => "HashRef",
	is	=> "ro",
	default => sub {
		tie my %derivable_cxts, "Tie::RefHash";
		return \%derivable_cxts;
	},
);

has config_hub => (
	isa => "Object",
	is	=> "rw",
	default => sub {
		require Verby::Config::Data;
		Verby::Config::Data->new;
	},
);

has global_context => (
	isa => "Object",
	is	=> "ro",
	lazy	=> 1,
	default => sub { $_[0]->config_hub->derive("Verby::Context") },
);

has resource_pool => (
	isa => "POE::Component::ResourcePool",
	is  => "ro",
	predicate => "has_resource_pool",
);

sub add_step {
	my $self = shift;

	my $steps = $self->step_set;

	foreach my $step (@_) {
		next if $steps->includes($step);

		$self->add_step($step->depends);

		(my $logger = $self->global_context->logger)->debug("adding step $step");
		$steps->insert($step);
	}
}

sub add_steps {
	my $self = shift;
	$self->add_step(@_);
}

sub get_cxt {
	my $self = shift;
	my $step = shift;

	$self->cxt_of_step->{$step} ||= Verby::Context->new($self->get_derivable_cxts($step));
}

sub get_derivable_cxts {
	my $self = shift;
	my $step = shift;

	@{ $self->derivable_cxts->{$step} ||= (
		$step->provides_cxt
			? [ Verby::Context->new($self->get_parent_cxts($step)) ]
			: [ $self->get_parent_cxts($step) ]
	)};
}

sub get_parent_cxts {
	my $self = shift;
	my $step = shift;

	if ( my @cxts = map { $self->get_derivable_cxts($_) } $step->depends ) {
		return @cxts;
	} else {
		return $self->global_context;
	}
}

sub create_poe_sessions {
	my ( $self ) = @_;

	my $g_cxt = $self->global_context;
	$g_cxt->logger->debug("Creating parent POE session");

	POE::Session->create(
		inline_states => {
			_start => sub {
				my ( $kernel, $heap ) = @_[KERNEL, HEAP];
				my $self = $heap->{verby_dispatcher};

				# FIXME
				# handle sigint

				my $g_cxt = $self->global_context;

				my $all_steps = $self->step_set;
				my $satisfied = $self->satisfied_set;

				my $pending = $all_steps->difference( $satisfied );

				foreach my $step ( $pending->members ) {
					$g_cxt->logger->debug("Creating POE session for step $step");

					POE::Session->create(
						inline_states => {
							_start => sub {
								my ( $kernel, $session) = @_[KERNEL, SESSION];

								$kernel->sig("VERBY_STEP_FINISHED" => "step_finished");
								$kernel->refcount_increment( $session->ID, "unresolved_dependencies" );

								$kernel->yield("try_executing_step");
							},
							step_finished => sub {
								my ( $kernel, $heap, $done ) = @_[KERNEL, HEAP, ARG1];

								my $deps = $heap->{dependencies};

								if ( $deps->includes($done) ) {
									$deps->remove( $done );
									$kernel->yield("try_executing_step") unless $deps->size;
								}
							},
							try_executing_step => sub {
								my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];

								return if $heap->{dependencies}->size; # don't run if we're waiting
								return if $heap->{ran}++; # don't run twice

								$heap->{g_cxt}->logger->debug("All dependencies of '$step' have finished, starting");

								$kernel->sig("VERBY_STEP_FINISHED"); # we're no longer waiting for other steps to finish
								$kernel->refcount_decrement( $session->ID, "unresolved_dependencies" );

								if ( my $pool = $heap->{resource_pool} and my @req = $heap->{step}->resources ) {
									$heap->{resource_request} = $pool->request(
										params => { @req },
										event  => "execute_step",
									);
								} else {
									$kernel->call( $session, "execute_step" );
								}
							},
							execute_step => sub {
								my ( $kernel, $session, $heap ) = @_[KERNEL, SESSION, HEAP];

								# this may create child sessions. If it doesn't this session will go away
								$heap->{verby_dispatcher}->start_step( $heap->{step}, \@_ );
							},
							_stop => sub {
								my ( $kernel, $heap ) = @_[KERNEL, HEAP];
								my $step = $heap->{step};

								if ( my $request = delete $heap->{resource_request} ) {
									$request->dismiss;
								}

								$heap->{g_cxt}->logger->info("step $step has finished.");

								$_->() for @{ $heap->{post_hooks} };

								return $step;
							},
							DIE     => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") },
							_child  => sub { $_[HEAP]{g_cxt}->logger->debug("Step $_[HEAP]{step} _child event: $_[ARG0]") },
						},
						heap => {
							%{ $heap },
							step         => $step,
							dependencies => Set::Object->new( $step->depends )->difference($satisfied),
							ran          => 0,
							post_hooks   => [],
						},
					);
				}
			},
			_child => sub {
				my ( $kernel, $session, $heap, $type, $step ) = @_[KERNEL, SESSION, HEAP, ARG0, ARG2];

				if ( $type eq "lose" ) {
					$heap->{satisfied}->insert($step);
					$kernel->signal( $session, "VERBY_STEP_FINISHED", $step );
				}
			},
			DIE   => sub { $_[HEAP]{g_cxt}->logger->warn("cought exception: @_") },
			_stop => sub { $_[HEAP]{g_cxt}->logger->debug("parent POE session closing") },
		},
		heap => {
			verby_dispatcher => $self,
			g_cxt            => $g_cxt, # convenience
			satisfied        => $self->satisfied_set,
			( $self->has_resource_pool ? ( resource_pool => $self->resource_pool ) : () ),
		}
	);
}

sub do_all {
	my $self = shift;
	$self->create_poe_sessions;
	$self->global_context->logger->debug("Starting POE main loop");
	$poe_kernel->run;
}

sub start_step {
	my ( $self, $step, $poe ) = @_;

	my $g_cxt = $self->global_context;
	my $cxt = $self->get_cxt($step);

	if ($step->is_satisfied($cxt, $poe)){
		$g_cxt->logger->debug("step $step has already been satisfied, running isn't necessary.");
		return;
	}

	$g_cxt->logger->debug("starting step $step");
	$step->do($cxt, $poe);
}

sub _set_members_query {
	my $self = shift;
	my $set = shift;
	return wantarray ? $set->members : $set->size;
}

sub steps {
	my $self = shift;
	$self->_set_members_query($self->step_set);
}

sub is_satisfied {
	my $self = shift;
	my $step = shift;

	croak "$step is not registered at all"
		unless $self->step_set->contains($step);

	$self->satisfied_set->contains($step);
}

__PACKAGE__

__END__

=pod

=head1 NAME

Verby::Dispatcher - Takes steps and executes them. Sort of like what make(1) is to a
Makefile.

=head1 SYNOPSIS

	use Verby::Dispatcher;
	use Verby::Config::Data; # or something equiv

	my $c = Verby::Config::Data->new(); # ... needs the "logger" field set

	my $d = Verby::Dispatcher->new;
	$d->config_hub($c);

	$d->add_steps(@steps);

	$d->do_all;

=head1 DESCRIPTION

=head1 ATTRIBUTES

=item B<resource_pool>

If provided with a L<POE::Component::ResourcePool> instance, that resource pool
will be used to handle resource allocation.

The L<Verby::Step/resources> method is used to declare the required resources
for each step.

=item B<step_set>

Returns the L<Set::Object> that is used for internal bookkeeping of the steps
involved.

=item B<satisfied_set>

Returns the L<Set::Object> that is used to track which steps are satisfied.

=item B<config_hub>

The configuration hub that all contexts inherit from.

Defaults to an empty parameter set.

=item B<global_context>

The global context objects.

Defaults to a derivation of B<config_hub>.

=head1 METHODS

=over 4

=item B<new>

Returns a new L<Verby::Dispatcher>. Duh!

=item B<add_steps *@steps>

=item B<add_step *@steps>

Add a number of steps into the dispatcher pool.

Anything returned from L<Verby::Step/depends> is aggregated recursively here, and
added into the batch too.

=item B<do_all>

Calculate all the dependencies, and then dispatch in order.

=back

=begin private

=over 4

=item B<is_satisfied $step>

Whether or not $step does not need to be executed (because it was already
executed or because it didn't need to be in the first place).

=item B<get_cxt $step>

Returns the context associated with $step. This is where $step will write it's
data.

=item B<get_derivable_cxts $step>

Returns the contexts to derive from, when creating a context for $step.

If $step starts a new context (L<Step/provides_cxt> is true) then a new context
is created here, derived from get_parent_cxts($step). Otherwise it simply
returns get_parent_cxts($step).

Note that when a step 'provides a context' this really means that a new context
is created, and this context is derived for the step, and any step that depends
on it.

=item B<get_parent_cxts $step>

If $step depends on any other steps, take their contexts. Otherwise, returns
the global context.

=item B<start_step $step>

Starts the 

=item B<steps>

Returns a list of steps that the dispatcher cares about.

=back

=end

=head1 BUGS

None that we are aware of. Of course, if you find a bug, let us know, and we
will be sure to fix it.

=head1 CODE COVERAGE

We use B<Devel::Cover> to test the code coverage of the tests, please refer to
COVERAGE section of the L<Verby> module for more information.

=head1 SEE ALSO

=head1 AUTHOR

Yuval Kogman, E<lt>nothingmuch@woobling.orgE<gt>
stevan little, E<lt>stevan@iinteractive.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright 2005-2008 by Infinity Interactive, Inc.

L<http://www.iinteractive.com>

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut