# $Id: Signal.pm,v 1.24 2010/03/25 12:52:36 dk Exp $ package IO::Lambda::Signal; use vars qw(@ISA %SIGDATA); @ISA = qw(Exporter); @EXPORT_OK = qw(signal pid spawn new_signal new_pid new_process); %EXPORT_TAGS = ( all => \@EXPORT_OK); our $DEBUG = $IO::Lambda::DEBUG{signal} || 0; use strict; use Carp; use IO::Handle; use POSIX ":sys_wait_h"; use IO::Lambda qw(:all :dev); my $MASTER = bless {}, __PACKAGE__; # register yield handler IO::Lambda::add_loop($MASTER); END { IO::Lambda::remove_loop($MASTER) }; sub empty { 0 == keys %SIGDATA } sub remove { my $lambda = $_[1]; my %rec; keys %SIGDATA; while ( my ($id, $v) = each %SIGDATA) { for my $r (@{$v-> {lambdas}}) { push @{$rec{$id}}, $r-> [0]; } } while ( my ($id, $v) = each %rec) { unwatch_signal( $id, $_ ) for @$v; } } sub yield { my %v = %SIGDATA; for my $id ( keys %v) { my $v = $v{$id}; # use mutex in case signal happens right here during handling $v-> {mutex} = 0; warn " yield sig $id\n" if $DEBUG > 1; AGAIN: next unless $v-> {signal}; my @r = @{$v-> {lambdas}}; warn " calling ", scalar(@r), " sig handlers\n" if $DEBUG > 1; for my $r ( @r) { my ( $lambda, $callback, @param) = @$r; $callback-> ( $lambda, @param); } my $sigs = $v-> {mutex}; if ( $sigs) { warn " caught $sigs signals during yield\n" if $DEBUG > 1; $v-> {signal} = $sigs; $v-> {mutex} -= $sigs; goto AGAIN; } } } sub signal_handler { my $id = shift; warn "SIG{$id}\n" if $DEBUG; return unless exists $SIGDATA{$id}; $SIGDATA{$id}-> {signal}++; $SIGDATA{$id}-> {mutex}++; $IO::Lambda::LOOP-> signal($id) if $IO::Lambda::LOOP-> can('signal'); } sub watch_signal { my ($id, $lambda, $callback, @param) = @_; my $entry = [ $lambda, $callback, @param ]; unless ( exists $SIGDATA{$id}) { $SIGDATA{$id} = { mutex => 0, signal => 0, save => $SIG{$id}, lambdas => [$entry], }; $SIG{$id} = sub { signal_handler($id) }; warn "install signal handler for $id ", _o($lambda), "\n" if $DEBUG > 1; } else { push @{ $SIGDATA{$id}-> {lambdas} }, $entry; warn "push signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2; } } sub unwatch_signal { my ( $id, $lambda) = @_; return unless exists $SIGDATA{$id}; warn "remove signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2; @{ $SIGDATA{$id}-> {lambdas} } = grep { $$_[0] != $lambda } @{ $SIGDATA{$id}-> {lambdas} }; return if @{ $SIGDATA{$id}-> {lambdas} }; warn "uninstall signal handler for $id\n" if $DEBUG > 1; if (defined($SIGDATA{$id}-> {save})) { $SIG{$id} = $SIGDATA{$id}-> {save}; } else { delete $SIG{$id}; } delete $SIGDATA{$id}; } # create a lambda that either returns undef on timeout, # or some custom value based on passed callback sub signal_or_timeout_lambda { my ( $id, $deadline, $condition) = @_; my $t; my $q = IO::Lambda-> new; # wait for signal my $c = $q-> bind; watch_signal( $id, $q, sub { my @ret = $condition-> (); return unless @ret; unwatch_signal( $id, $q); $q-> cancel_event($t) if $t; $q-> resolve($c); $q-> terminate(@ret); # result undef $c; undef $q; }); # or wait for timeout $t = $q-> watch_timer( $deadline, sub { unwatch_signal( $id, $q); $q-> resolve($c); undef $c; undef $q; return undef; #result }) if $deadline; return $q; } sub new_process; # condition sub signal (&) { new_signal (context)-> condition(shift, \&signal, 'signal') } sub pid (&) { new_pid (context)-> condition(shift, \&pid, 'pid') } sub spawn (&) { new_process-> call(context)-> condition(shift, \&spawn, 'spawn') } sub new_signal { my ( $id, $deadline) = @_; signal_or_timeout_lambda( $id, $deadline, sub { 1 }); } sub new_pid { my ( $pid, $deadline) = @_; croak 'bad pid' unless $pid =~ /^\-?\d+$/; warn "new_pid($pid) ", _t($deadline), "\n" if $DEBUG; # avoid race conditions my ( $savesig, $early_sigchld); unless ( defined $SIGDATA{CHLD}) { warn "new_pid: install early SIGCHLD detector\n" if $DEBUG > 1; $savesig = $SIG{CHLD}; $early_sigchld = 0; $SIG{CHLD} = sub { warn "new_pid: early SIGCHLD caught\n" if $DEBUG > 1; $early_sigchld++ }; } # finished already if ( waitpid( $pid, WNOHANG) != 0) { if ( defined $early_sigchld) { if ( defined( $savesig)) { $SIG{CHLD} = $savesig; } else { delete $SIG{CHLD}; } } warn "new_pid($pid): finished already with $?\n" if $DEBUG > 1; return IO::Lambda-> new-> call($?) } # wait my $p = signal_or_timeout_lambda( 'CHLD', $deadline, sub { my $wp = waitpid($pid, WNOHANG); warn "waitpid($pid) = $wp\n" if $DEBUG > 1; return if $wp == 0; return $?; }); warn "new_pid: new lambda(", _o($p), ")\n" if $DEBUG > 1; # don't let unwatch_signal() to restore it back to us $SIGDATA{CHLD}-> {save} = $savesig if defined $early_sigchld; # possibly have a race? gracefully remove the lambda if ( $early_sigchld) { # Got a signal, but that wasn't our pid. And neither it was # pid that we're watching. return $p if waitpid( $pid, WNOHANG) == 0; # Our pid is finished. Unwatch the signal. unwatch_signal( 'CHLD', $p); # Lambda will also never get executed - cancel it $p-> terminate; warn "new_pid($pid): finished with race: $?, ", _o($p), " killed\n" if $DEBUG > 1; return IO::Lambda-> new-> call($?); } return $p; } sub new_process_posix { lambda { my $h = IO::Handle-> new; my $pid = open( $h, '-|', @_); return undef, undef, $! unless $pid; this-> {pid} = $pid; $h-> blocking(0); my $buf; context readbuf, $h, \$buf, undef; # wait for EOF tail { my ($res, $error) = @_; if ( defined $error) { close $h; return ($buf, $?, $error); } # finished already if (waitpid($pid, WNOHANG) != 0) { my ( $exitcode, $error) = ( $?, $! ); close $h; return ($buf, $exitcode, $error); } # wait for it context $pid; pid { close $h; return ($buf, shift); }}}} sub new_process_win32 { lambda { my @cmd = @_; context IO::Lambda::Thread::threaded( sub { my $k = `@cmd`; return $? ? ( undef, $?, $! ) : ( $k, 0, undef ); }); &tail(); } } if ( $^O !~ /win32/i) { *new_process = \&new_process_posix; } else { require IO::Lambda::Thread; unless ( $IO::Lambda::Thread::DISABLED) { *new_process = \&new_process_win32; } else { *new_process = sub { lambda { undef, undef, $IO::Lambda::Thread::DISABLED } }; } } 1; __DATA__ =pod =head1 NAME IO::Lambda::Signal - wait for pids and signals =head1 DESCRIPTION The module provides access to the signal-based callbacks: generic signal listener C, process ID listener C, and the asynchronous version of I call, C. =head1 SYNOPSIS use strict; use IO::Lambda qw(:all); use IO::Lambda::Signal qw(pid spawn); # pid my $pid = fork; exec "/bin/ls" unless $pid; lambda { context $pid, 5; pid { my $ret = shift; print defined($ret) ? ("exitcode(", $ret>>8, ")\n") : "timeout\n"; } }-> wait; # spawn this lambda { context "perl -v"; spawn { my ( $buf, $exitcode, $error) = @_; print "buf=[$buf], exitcode=$exitcode, error=$error\n"; } }-> wait; =head2 USAGE =over =item pid ($PID, $TIMEOUT) -> $?|undef Accepts PID and an optional deadline/timeout, returns either the process' exit status, or undef on timeout. The corresponding lambda is C : new_pid ($PID, $TIMEOUT) :: () -> $?|undef =item signal ($SIG, $TIMEOUT) -> boolean Accepts signal name and optional deadline/timeout, returns 1 if the signal was caught, or C on timeout. The corresponding lambda is C : new_signal ($SIG, $TIMEOUT) :: () -> boolean =item spawn (@LIST) -> ( output, $?, $!) Calls pipe open on C<@LIST>, reads all data printed by the child process, and awaits for the process to finish. Returns three scalars - collected output, process exitcode C<$?>, and an error string (usually C<$!>). The corresponding lambda is C : new_process (@LIST) :: () -> ( output, $?, $!) Lambda objects created by C have an additional field C<'pid'> initialized with the process pid value. =back =head1 LIMITATION C and C don't work on win32 because win32 doesn't use SIGCHLD/waitpid. Native implementation of C and C doesn't work for the same reason on win32 as well, therefore those were reimplemented using threads, and require a threaded perl. =head1 SEE ALSO L, L, L, L =head1 AUTHOR Dmitry Karasik, Edmitry@karasik.eu.orgE. =cut