# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2011-2022 -- leonerd@leonerd.org.uk
package Future::PP;
use v5.10;
use strict;
use warnings;
no warnings 'recursion'; # Disable the "deep recursion" warning
our $VERSION = '0.49';
our @ISA = qw( Future::_base );
use Carp qw(); # don't import croak
use Scalar::Util qw( weaken blessed reftype );
use Time::HiRes qw( gettimeofday );
our @CARP_NOT = qw( Future Future::Utils );
use constant DEBUG => !!$ENV{PERL_FUTURE_DEBUG};
use constant STRICT => !!$ENV{PERL_FUTURE_STRICT};
# Callback flags
use constant {
CB_DONE => 1<<0, # Execute callback on done
CB_FAIL => 1<<1, # Execute callback on fail
CB_CANCEL => 1<<2, # Execute callback on cancellation
CB_SELF => 1<<3, # Pass $self as first argument
CB_RESULT => 1<<4, # Pass result/failure as a list
CB_SEQ_ONDONE => 1<<5, # Sequencing on success (->then)
CB_SEQ_ONFAIL => 1<<6, # Sequencing on failure (->else)
CB_SEQ_IMDONE => 1<<7, # $code is in fact immediate ->done result
CB_SEQ_IMFAIL => 1<<8, # $code is in fact immediate ->fail result
CB_SEQ_STRICT => 1<<9, # Complain if $code didn't return a Future
};
use constant CB_ALWAYS => CB_DONE|CB_FAIL|CB_CANCEL;
sub _shortmess
{
my $at = Carp::shortmess( $_[0] );
chomp $at; $at =~ s/\.$//;
return $at;
}
sub _callable
{
my ( $cb ) = @_;
defined $cb and ( reftype($cb) eq 'CODE' || overload::Method($cb, '&{}') );
}
sub new
{
my $proto = shift;
return bless {
ready => 0,
callbacks => [], # [] = [$type, ...]
( DEBUG ?
( do { my $at = Carp::shortmess( "constructed" );
chomp $at; $at =~ s/\.$//;
constructed_at => $at } )
: () ),
( $Future::TIMES ?
( btime => [ gettimeofday ] )
: () ),
}, ( ref $proto || $proto );
}
sub __selfstr
{
my $self = shift;
return "$self" unless defined $self->{label};
return "$self (\"$self->{label}\")";
}
my $GLOBAL_END;
END { $GLOBAL_END = 1; }
sub DESTROY_debug {
my $self = shift;
return if $GLOBAL_END;
return if $self->{ready} and ( $self->{reported} or !$self->{failure} );
my $lost_at = join " line ", (caller)[1,2];
# We can't actually know the real line where the last reference was lost;
# a variable set to 'undef' or close of scope, because caller can't see it;
# the current op has already been updated. The best we can do is indicate
# 'near'.
if( $self->{ready} and $self->{failure} ) {
warn "${\$self->__selfstr} was $self->{constructed_at} and was lost near $lost_at with an unreported failure of: " .
$self->{failure}[0] . "\n";
}
elsif( !$self->{ready} ) {
warn "${\$self->__selfstr} was $self->{constructed_at} and was lost near $lost_at before it was ready.\n";
}
}
*DESTROY = \&DESTROY_debug if DEBUG;
sub is_ready
{
my $self = shift;
return $self->{ready};
}
sub is_done
{
my $self = shift;
return $self->{ready} && !$self->{failure} && !$self->{cancelled};
}
sub is_failed
{
my $self = shift;
return $self->{ready} && !!$self->{failure}; # boolify
}
sub is_cancelled
{
my $self = shift;
return $self->{cancelled};
}
sub state
{
my $self = shift;
return !$self->{ready} ? "pending" :
DEBUG ? $self->{ready_at} :
$self->{failure} ? "failed" :
$self->{cancelled} ? "cancelled" :
"done";
}
sub _mark_ready
{
my $self = shift;
$self->{ready} = 1;
$self->{ready_at} = _shortmess $_[0] if DEBUG;
if( $Future::TIMES ) {
$self->{rtime} = [ gettimeofday ];
}
delete $self->{on_cancel};
$_->[0] and $_->[0]->_revoke_on_cancel( $_->[1] ) for @{ $self->{revoke_when_ready} };
delete $self->{revoke_when_ready};
my $callbacks = delete $self->{callbacks} or return;
my $cancelled = $self->{cancelled};
my $fail = defined $self->{failure};
my $done = !$fail && !$cancelled;
my @result = $done ? @{ $self->{result} } :
$fail ? @{ $self->{failure} } :
();
foreach my $cb ( @$callbacks ) {
my ( $flags, $code ) = @$cb;
my $is_future = blessed( $code ) && $code->isa( "Future" );
next if $done and not( $flags & CB_DONE );
next if $fail and not( $flags & CB_FAIL );
next if $cancelled and not( $flags & CB_CANCEL );
$self->{reported} = 1 if $fail;
if( $is_future ) {
$done ? $code->done( @result ) :
$fail ? $code->fail( @result ) :
$code->cancel;
}
elsif( $flags & (CB_SEQ_ONDONE|CB_SEQ_ONFAIL) ) {
my ( undef, undef, $fseq ) = @$cb;
if( !$fseq ) { # weaken()ed; it might be gone now
# This warning should always be printed, even not in DEBUG mode.
# It's always an indication of a bug
Carp::carp +(DEBUG ? "${\$self->__selfstr} ($self->{constructed_at})"
: "${\$self->__selfstr} $self" ) .
" lost a sequence Future";
next;
}
my $f2;
if( $done and $flags & CB_SEQ_ONDONE or
$fail and $flags & CB_SEQ_ONFAIL ) {
if( $flags & CB_SEQ_IMDONE ) {
$fseq->done( @$code );
next;
}
elsif( $flags & CB_SEQ_IMFAIL ) {
$fseq->fail( @$code );
next;
}
my @args = (
( $flags & CB_SELF ? $self : () ),
( $flags & CB_RESULT ? @result : () ),
);
unless( eval { $f2 = $code->( @args ); 1 } ) {
$fseq->fail( $@ );
next;
}
unless( blessed $f2 and $f2->isa( "Future" ) ) {
# Upgrade a non-Future result, or complain in strict mode
if( $flags & CB_SEQ_STRICT ) {
$fseq->fail( "Expected " . Future::CvNAME_FILE_LINE($code) . " to return a Future" );
next;
}
$f2 = Future->done( $f2 );
}
$fseq->on_cancel( $f2 );
}
else {
$f2 = $self;
}
if( $f2->is_ready ) {
$f2->on_ready( $fseq ) if !$f2->{cancelled};
}
else {
push @{ $f2->{callbacks} }, [ CB_DONE|CB_FAIL, $fseq ];
weaken( $f2->{callbacks}[-1][1] );
}
}
else {
$code->(
( $flags & CB_SELF ? $self : () ),
( $flags & CB_RESULT ? @result : () ),
);
}
}
}
sub done
{
my $self = shift;
if( ref $self ) {
$self->{cancelled} and return $self;
$self->{ready} and Carp::croak "${\$self->__selfstr} is already ".$self->state." and cannot be ->done";
$self->{subs} and Carp::croak "${\$self->__selfstr} is not a leaf Future, cannot be ->done";
$self->{result} = [ @_ ];
$self->_mark_ready( "done" );
}
else {
$self = $self->new;
$self->{ready} = 1;
$self->{ready_at} = _shortmess "done" if DEBUG;
$self->{result} = [ @_ ];
if( $Future::TIMES ) {
$self->{rtime} = [ gettimeofday ];
}
}
return $self;
}
sub fail
{
my $self = shift;
my ( $exception, @more ) = @_;
if( ref $exception eq "Future::Exception" ) {
@more = ( $exception->category, $exception->details );
$exception = $exception->message;
}
$exception or Carp::croak "$self ->fail requires an exception that is true";
if( ref $self ) {
$self->{cancelled} and return $self;
$self->{ready} and Carp::croak "${\$self->__selfstr} is already ".$self->state." and cannot be ->fail'ed";
$self->{subs} and Carp::croak "${\$self->__selfstr} is not a leaf Future, cannot be ->fail'ed";
$self->{failure} = [ $exception, @more ];
$self->_mark_ready( "failed" );
}
else {
$self = $self->new;
$self->{ready} = 1;
$self->{ready_at} = _shortmess "failed" if DEBUG;
$self->{failure} = [ $exception, @more ];
if( $Future::TIMES ) {
$self->{rtime} = [ gettimeofday ];
}
}
return $self;
}
sub on_cancel
{
my $self = shift;
my ( $code ) = @_;
my $is_future = blessed( $code ) && $code->isa( "Future" );
$is_future or _callable( $code ) or
Carp::croak "Expected \$code to be callable or a Future in ->on_cancel";
$self->{ready} and return $self;
push @{ $self->{on_cancel} }, $code;
if( $is_future ) {
push @{ $code->{revoke_when_ready} }, my $r = [ $self, \$self->{on_cancel}[-1] ];
weaken( $r->[0] );
weaken( $r->[1] );
}
return $self;
}
# An optimised version for Awaitable role
sub AWAIT_ON_CANCEL
{
my $self = shift;
my ( $code ) = @_;
push @{ $self->{on_cancel} }, $code;
}
sub AWAIT_CHAIN_CANCEL
{
my $self = shift;
my ( $f2 ) = @_;
push @{ $self->{on_cancel} }, $f2;
push @{ $f2->{revoke_when_ready} }, my $r = [ $self, \$self->{on_cancel}[-1] ];
weaken( $r->[0] );
weaken( $r->[1] );
}
sub _revoke_on_cancel
{
my $self = shift;
my ( $ref ) = @_;
undef $$ref;
$self->{empty_on_cancel_slots}++;
my $on_cancel = $self->{on_cancel} or return;
# If the list is nontrivally large and over half-empty / under half-full, compact it
if( @$on_cancel >= 8 and $self->{empty_on_cancel_slots} >= 0.5 * @$on_cancel ) {
# We can't grep { defined } because that will break all the existing SCALAR refs
my $idx = 0;
while( $idx < @$on_cancel ) {
defined $on_cancel->[$idx] and $idx++, next;
splice @$on_cancel, $idx, 1, ();
}
$self->{empty_on_cancel_slots} = 0;
}
}
sub on_ready
{
my $self = shift;
my ( $code ) = @_;
my $is_future = blessed( $code ) && $code->isa( "Future" );
$is_future or _callable( $code ) or
Carp::croak "Expected \$code to be callable or a Future in ->on_ready";
if( $self->{ready} ) {
my $fail = defined $self->{failure};
my $done = !$fail && !$self->{cancelled};
$self->{reported} = 1 if $fail;
$is_future ? ( $done ? $code->done( @{ $self->{result} } ) :
$fail ? $code->fail( @{ $self->{failure} } ) :
$code->cancel )
: $code->( $self );
}
else {
push @{ $self->{callbacks} }, [ CB_ALWAYS|CB_SELF, $self->wrap_cb( on_ready => $code ) ];
}
return $self;
}
# An optimised version for Awaitable role
sub AWAIT_ON_READY
{
my $self = shift;
my ( $code ) = @_;
push @{ $self->{callbacks} }, [ CB_ALWAYS|CB_SELF, $self->wrap_cb( on_ready => $code ) ];
}
sub result
{
my $self = shift;
$self->{ready} or
Carp::croak( "${\$self->__selfstr} is not yet ready" );
if( my $failure = $self->{failure} ) {
$self->{reported} = 1;
my $exception = $failure->[0];
$exception = Future::Exception->new( @$failure ) if @$failure > 1;
!ref $exception && $exception =~ m/\n$/ ? CORE::die $exception : Carp::croak $exception;
}
$self->{cancelled} and Carp::croak "${\$self->__selfstr} was cancelled";
return $self->{result}->[0] unless wantarray;
return @{ $self->{result} };
}
sub get
{
my $self = shift;
$self->await unless $self->{ready};
return $self->result;
}
sub await
{
my $self = shift;
return $self if $self->{ready};
Carp::croak "$self is not yet complete and does not provide ->await";
}
sub on_done
{
my $self = shift;
my ( $code ) = @_;
my $is_future = blessed( $code ) && $code->isa( "Future" );
$is_future or _callable( $code ) or
Carp::croak "Expected \$code to be callable or a Future in ->on_done";
if( $self->{ready} ) {
return $self if $self->{failure} or $self->{cancelled};
$is_future ? $code->done( @{ $self->{result} } )
: $code->( @{ $self->{result} } );
}
else {
push @{ $self->{callbacks} }, [ CB_DONE|CB_RESULT, $self->wrap_cb( on_done => $code ) ];
}
return $self;
}
sub failure
{
my $self = shift;
$self->await unless $self->{ready};
return unless $self->{failure};
$self->{reported} = 1;
return $self->{failure}->[0] if !wantarray;
return @{ $self->{failure} };
}
sub on_fail
{
my $self = shift;
my ( $code ) = @_;
my $is_future = blessed( $code ) && $code->isa( "Future" );
$is_future or _callable( $code ) or
Carp::croak "Expected \$code to be callable or a Future in ->on_fail";
if( $self->{ready} ) {
return $self if not $self->{failure};
$self->{reported} = 1;
$is_future ? $code->fail( @{ $self->{failure} } )
: $code->( @{ $self->{failure} } );
}
else {
push @{ $self->{callbacks} }, [ CB_FAIL|CB_RESULT, $self->wrap_cb( on_fail => $code ) ];
}
return $self;
}
sub cancel
{
my $self = shift;
return $self if $self->{ready};
$self->{cancelled}++;
my $on_cancel = delete $self->{on_cancel};
foreach my $code ( $on_cancel ? reverse @$on_cancel : () ) {
defined $code or next;
my $is_future = blessed( $code ) && $code->isa( "Future" );
$is_future ? $code->cancel
: $code->( $self );
}
$self->_mark_ready( "cancel" );
return $self;
}
my $make_donecatchfail_sub = sub {
my ( $with_f, $done_code, $fail_code, @catch_list ) = @_;
my $func = (caller 1)[3];
$func =~ s/^.*:://;
!$done_code or _callable( $done_code ) or
Carp::croak "Expected \$done_code to be callable in ->$func";
!$fail_code or _callable( $fail_code ) or
Carp::croak "Expected \$fail_code to be callable in ->$func";
my %catch_handlers = @catch_list;
_callable( $catch_handlers{$_} ) or
Carp::croak "Expected catch handler for '$_' to be callable in ->$func"
for keys %catch_handlers;
sub {
my $self = shift;
my @maybe_self = $with_f ? ( $self ) : ();
if( !$self->{failure} ) {
return $self unless $done_code;
return $done_code->( @maybe_self, @{ $self->{result} } );
}
else {
my $name = $self->{failure}[1];
if( defined $name and $catch_handlers{$name} ) {
return $catch_handlers{$name}->( @maybe_self, @{ $self->{failure} } );
}
return $self unless $fail_code;
return $fail_code->( @maybe_self, @{ $self->{failure} } );
}
};
};
sub _sequence
{
my $f1 = shift;
my ( $code, $flags ) = @_;
$flags |= CB_SEQ_STRICT if STRICT;
# For later, we might want to know where we were called from
my $level = 1;
$level++ while (caller $level)[0] eq "Future::_base";
my $func = (caller $level)[3];
$func =~ s/^.*:://;
$flags & (CB_SEQ_IMDONE|CB_SEQ_IMFAIL) or _callable( $code ) or
Carp::croak "Expected \$code to be callable in ->$func";
if( !defined wantarray ) {
Carp::carp "Calling ->$func in void context";
}
if( $f1->is_ready ) {
# Take a shortcut
return $f1 if $f1->is_done and not( $flags & CB_SEQ_ONDONE ) or
$f1->{failure} and not( $flags & CB_SEQ_ONFAIL );
if( $flags & CB_SEQ_IMDONE ) {
return Future->done( @$code );
}
elsif( $flags & CB_SEQ_IMFAIL ) {
return Future->fail( @$code );
}
my @args = (
( $flags & CB_SELF ? $f1 : () ),
( $flags & CB_RESULT ? $f1->is_done ? @{ $f1->{result} } :
$f1->{failure} ? @{ $f1->{failure} } :
() : () ),
);
my $fseq;
unless( eval { $fseq = $code->( @args ); 1 } ) {
return Future->fail( $@ );
}
unless( blessed $fseq and $fseq->isa( "Future" ) ) {
# Upgrade a non-Future result, or complain in strict mode
$flags & CB_SEQ_STRICT and
return Future->fail( "Expected " . Future::CvNAME_FILE_LINE($code) . " to return a Future" );
$fseq = $f1->new->done( $fseq );
}
return $fseq;
}
my $fseq = $f1->new;
$fseq->on_cancel( $f1 );
# TODO: if anyone cares about the op name, we might have to synthesize it
# from $flags
$code = $f1->wrap_cb( sequence => $code ) unless $flags & (CB_SEQ_IMDONE|CB_SEQ_IMFAIL);
push @{ $f1->{callbacks} }, [ CB_DONE|CB_FAIL|$flags, $code, $fseq ];
weaken( $f1->{callbacks}[-1][2] );
return $fseq;
}
sub then
{
my $self = shift;
my $done_code = shift;
my $fail_code = ( @_ % 2 ) ? pop : undef;
my @catch_list = @_;
if( $done_code and !@catch_list and !$fail_code ) {
return $self->_sequence( $done_code, CB_SEQ_ONDONE|CB_RESULT );
}
# Complex
return $self->_sequence( $make_donecatchfail_sub->(
0, $done_code, $fail_code, @catch_list,
), CB_SEQ_ONDONE|CB_SEQ_ONFAIL|CB_SELF );
}
sub then_done
{
my $self = shift;
my ( @result ) = @_;
return $self->_sequence( \@result, CB_SEQ_ONDONE|CB_SEQ_IMDONE );
}
sub then_fail
{
my $self = shift;
my ( @failure ) = @_;
return $self->_sequence( \@failure, CB_SEQ_ONDONE|CB_SEQ_IMFAIL );
}
sub else
{
my $self = shift;
my ( $fail_code ) = @_;
return $self->_sequence( $fail_code, CB_SEQ_ONFAIL|CB_RESULT );
}
sub else_done
{
my $self = shift;
my ( @result ) = @_;
return $self->_sequence( \@result, CB_SEQ_ONFAIL|CB_SEQ_IMDONE );
}
sub else_fail
{
my $self = shift;
my ( @failure ) = @_;
return $self->_sequence( \@failure, CB_SEQ_ONFAIL|CB_SEQ_IMFAIL );
}
sub catch
{
my $self = shift;
my $fail_code = ( @_ % 2 ) ? pop : undef;
my @catch_list = @_;
return $self->_sequence( $make_donecatchfail_sub->(
0, undef, $fail_code, @catch_list,
), CB_SEQ_ONDONE|CB_SEQ_ONFAIL|CB_SELF );
}
sub then_with_f
{
my $self = shift;
my $done_code = shift;
my $fail_code = ( @_ % 2 ) ? pop : undef;
my @catch_list = @_;
if( $done_code and !@catch_list and !$fail_code ) {
return $self->_sequence( $done_code, CB_SEQ_ONDONE|CB_SELF|CB_RESULT );
}
return $self->_sequence( $make_donecatchfail_sub->(
1, $done_code, $fail_code, @catch_list,
), CB_SEQ_ONDONE|CB_SEQ_ONFAIL|CB_SELF );
}
sub else_with_f
{
my $self = shift;
my ( $fail_code ) = @_;
return $self->_sequence( $fail_code, CB_SEQ_ONFAIL|CB_SELF|CB_RESULT );
}
sub catch_with_f
{
my $self = shift;
my $fail_code = ( @_ % 2 ) ? pop : undef;
my @catch_list = @_;
return $self->_sequence( $make_donecatchfail_sub->(
1, undef, $fail_code, @catch_list,
), CB_SEQ_ONDONE|CB_SEQ_ONFAIL|CB_SELF );
}
sub followed_by
{
my $self = shift;
my ( $code ) = @_;
return $self->_sequence( $code, CB_SEQ_ONDONE|CB_SEQ_ONFAIL|CB_SELF );
}
sub without_cancel
{
my $self = shift;
my $new = $self->new;
$self->on_ready( sub {
my $self = shift;
if( $self->{cancelled} ) {
$new->cancel;
}
elsif( $self->{failure} ) {
$new->fail( @{ $self->{failure} } );
}
else {
$new->done( @{ $self->{result} } );
}
});
$new->{orig} = $self; # just to strongref it - RT122920
$new->on_ready( sub { undef $_[0]->{orig} } );
return $new;
}
sub _new_convergent
{
shift; # ignore this class
my ( $subs ) = @_;
foreach my $sub ( @$subs ) {
blessed $sub and $sub->isa( "Future" ) or Carp::croak "Expected a Future, got $sub";
}
# Find the best prototype. Ideally anything derived if we can find one.
my $self;
ref($_) eq "Future" or $self = $_->new, last for @$subs;
# No derived ones; just have to be a basic class then
$self ||= Future->new;
$self->{subs} = $subs;
# This might be called by a DESTROY during global destruction so it should
# be as defensive as possible (see RT88967)
$self->on_cancel( sub {
foreach my $sub ( @$subs ) {
$sub->cancel if $sub and !$sub->{ready};
}
} );
return $self;
}
sub wait_all
{
my $class = shift;
my @subs = @_;
unless( @subs ) {
my $self = $class->done;
$self->{subs} = [];
return $self;
}
my $self = Future->_new_convergent( \@subs );
my $pending = 0;
$_->{ready} or $pending++ for @subs;
# Look for immediate ready
if( !$pending ) {
$self->{result} = [ @subs ];
$self->_mark_ready( "wait_all" );
return $self;
}
weaken( my $weakself = $self );
my $sub_on_ready = sub {
return unless my $self = $weakself;
$pending--;
$pending and return;
$self->{result} = [ @subs ];
$self->_mark_ready( "wait_all" );
};
foreach my $sub ( @subs ) {
$sub->{ready} or $sub->on_ready( $sub_on_ready );
}
return $self;
}
sub wait_any
{
my $class = shift;
my @subs = @_;
unless( @subs ) {
my $self = $class->fail( "Cannot ->wait_any with no subfutures" );
$self->{subs} = [];
return $self;
}
my $self = Future->_new_convergent( \@subs );
# Look for immediate ready
my $immediate_ready;
foreach my $sub ( @subs ) {
$sub->{ready} and !$sub->{cancelled} and $immediate_ready = $sub, last;
}
if( $immediate_ready ) {
foreach my $sub ( @subs ) {
$sub->{ready} or $sub->cancel;
}
if( $immediate_ready->{failure} ) {
$self->{failure} = [ @{ $immediate_ready->{failure} } ];
}
else {
$self->{result} = [ @{ $immediate_ready->{result} } ];
}
$self->_mark_ready( "wait_any" );
return $self;
}
my $pending = 0;
weaken( my $weakself = $self );
my $sub_on_ready = sub {
return unless my $self = $weakself;
return if $self->{result} or $self->{failure}; # don't recurse on child ->cancel
return if --$pending and $_[0]->{cancelled};
if( $_[0]->{cancelled} ) {
$self->{failure} = [ "All component futures were cancelled" ];
}
elsif( $_[0]->{failure} ) {
$self->{failure} = [ @{ $_[0]->{failure} } ];
}
else {
$self->{result} = [ @{ $_[0]->{result} } ];
}
foreach my $sub ( @subs ) {
$sub->{ready} or $sub->cancel;
}
$self->_mark_ready( "wait_any" );
};
foreach my $sub ( @subs ) {
# No need to test $sub->{ready} since we know none of them are
next if $sub->{cancelled};
$sub->on_ready( $sub_on_ready );
$pending++;
}
return $self;
}
sub needs_all
{
my $class = shift;
my @subs = @_;
unless( @subs ) {
my $self = $class->done;
$self->{subs} = [];
return $self;
}
my $self = Future->_new_convergent( \@subs );
# Look for immediate fail
my $immediate_failure;
foreach my $sub ( @subs ) {
$sub->{cancelled} and $immediate_failure = [ "A component future was cancelled" ], last;
$sub->{ready} and $sub->{failure} and $immediate_failure = $sub->{failure}, last;
}
if( $immediate_failure ) {
foreach my $sub ( @subs ) {
$sub->{ready} or $sub->cancel;
}
$self->{failure} = [ @$immediate_failure ];
$self->_mark_ready( "needs_all" );
return $self;
}
my $pending = 0;
$_->{ready} or $pending++ for @subs;
# Look for immediate done
if( !$pending ) {
$self->{result} = [ map { @{ $_->{result} } } @subs ];
$self->_mark_ready( "needs_all" );
return $self;
}
weaken( my $weakself = $self );
my $sub_on_ready = sub {
return unless my $self = $weakself;
return if $self->{result} or $self->{failure}; # don't recurse on child ->cancel
if( $_[0]->{cancelled} ) {
$self->{failure} = [ "A component future was cancelled" ];
foreach my $sub ( @subs ) {
$sub->cancel if !$sub->{ready};
}
$self->_mark_ready( "needs_all" );
}
elsif( $_[0]->{failure} ) {
$self->{failure} = [ @{ $_[0]->{failure} } ];
foreach my $sub ( @subs ) {
$sub->cancel if !$sub->{ready};
}
$self->_mark_ready( "needs_all" );
}
else {
$pending--;
$pending and return;
$self->{result} = [ map { @{ $_->{result} } } @subs ];
$self->_mark_ready( "needs_all" );
}
};
foreach my $sub ( @subs ) {
$sub->{ready} or $sub->on_ready( $sub_on_ready );
}
return $self;
}
sub needs_any
{
my $class = shift;
my @subs = @_;
unless( @subs ) {
my $self = $class->fail( "Cannot ->needs_any with no subfutures" );
$self->{subs} = [];
return $self;
}
my $self = Future->_new_convergent( \@subs );
# Look for immediate done
my $immediate_done;
my $pending = 0;
foreach my $sub ( @subs ) {
$sub->{ready} and !$sub->{failure} and !$sub->{cancelled} and $immediate_done = $sub, last;
$sub->{ready} or $pending++;
}
if( $immediate_done ) {
foreach my $sub ( @subs ) {
$sub->{ready} ? $sub->{reported} = 1 : $sub->cancel;
}
$self->{result} = [ @{ $immediate_done->{result} } ];
$self->_mark_ready( "needs_any" );
return $self;
}
# Look for immediate fail
my $immediate_fail = 1;
foreach my $sub ( @subs ) {
$sub->{ready} or $immediate_fail = 0, last;
}
if( $immediate_fail ) {
$_->{reported} = 1 for @subs;
# For consistency we'll pick the last one for the failure
$self->{failure} = [ $subs[-1]->{failure} ];
$self->_mark_ready( "needs_any" );
return $self;
}
weaken( my $weakself = $self );
my $sub_on_ready = sub {
return unless my $self = $weakself;
return if $self->{result} or $self->{failure}; # don't recurse on child ->cancel
return if --$pending and $_[0]->{cancelled};
if( $_[0]->{cancelled} ) {
$self->{failure} = [ "All component futures were cancelled" ];
$self->_mark_ready( "needs_any" );
}
elsif( $_[0]->{failure} ) {
$pending and return;
$self->{failure} = [ @{ $_[0]->{failure} } ];
$self->_mark_ready( "needs_any" );
}
else {
$self->{result} = [ @{ $_[0]->{result} } ];
foreach my $sub ( @subs ) {
$sub->cancel if !$sub->{ready};
}
$self->_mark_ready( "needs_any" );
}
};
foreach my $sub ( @subs ) {
$sub->{ready} or $sub->on_ready( $sub_on_ready );
}
return $self;
}
sub pending_futures
{
my $self = shift;
$self->{subs} or Carp::croak "Cannot call ->pending_futures on a non-convergent Future";
return grep { not $_->{ready} } @{ $self->{subs} };
}
sub ready_futures
{
my $self = shift;
$self->{subs} or Carp::croak "Cannot call ->ready_futures on a non-convergent Future";
return grep { $_->{ready} } @{ $self->{subs} };
}
sub done_futures
{
my $self = shift;
$self->{subs} or Carp::croak "Cannot call ->done_futures on a non-convergent Future";
return grep { $_->{ready} and not $_->{failure} and not $_->{cancelled} } @{ $self->{subs} };
}
sub failed_futures
{
my $self = shift;
$self->{subs} or Carp::croak "Cannot call ->failed_futures on a non-convergent Future";
return grep { $_->{ready} and $_->{failure} } @{ $self->{subs} };
}
sub cancelled_futures
{
my $self = shift;
$self->{subs} or Carp::croak "Cannot call ->cancelled_futures on a non-convergent Future";
return grep { $_->{ready} and $_->{cancelled} } @{ $self->{subs} };
}
sub btime
{
my $self = shift;
return $self->{btime};
}
sub rtime
{
my $self = shift;
return $self->{rtime};
}
sub set_label
{
my $self = shift;
( $self->{label} ) = @_;
return $self;
}
sub label
{
my $self = shift;
return $self->{label};
}
sub set_udata
{
my $self = shift;
my ( $name, $value ) = @_;
$self->{"u_$name"} = $value;
return $self;
}
sub udata
{
my $self = shift;
my ( $name ) = @_;
return $self->{"u_$name"};
}
0x55AA;