use strict;
use warnings;

package AnyEvent::Proc;

# ABSTRACT: Run external commands

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Util ();
use Try::Tiny;
use Class::Load;
use Exporter qw(import);
use Carp;
use POSIX;

our $VERSION = '0.105';    # VERSION

our @EXPORT_OK = qw(run run_cb reader writer);

sub _rpipe {
    my ( $R, $W ) = AnyEvent::Util::portable_pipe;
    (
        $R,
        AnyEvent::Handle->new(
            fh       => $W,
            on_error => sub {
                my ( $handle, $fatal, $message ) = @_;
                AE::log warn => "error writing to handle: $message";
                $handle->destroy;
            },
        ),
    );
}

sub _wpipe {
    my ( $R, $W ) = AnyEvent::Util::portable_pipe;
    (
        AnyEvent::Handle->new(
            fh       => $R,
            on_error => sub {
                my ( $handle, $fatal, $message ) = @_;
                AE::log warn => "error reading from handle: $message"
                  unless $message =~ m{unexpected end-of-file}i;
                $handle->destroy;
            },
        ),
        $W,
    );
}

sub _on_read_helper {
    my ( $aeh, $sub ) = @_;
    $aeh->on_read(
        sub {
            my $x = $_[0]->rbuf;
            $_[0]->rbuf = '';
            $sub->($x);
        }
    );
}

sub _read_on_scalar {
    my ( $var, $sub ) = @_;
    my $old = $$var || '';
    tie $$var, __PACKAGE__ . '::TiedScalar', $sub;
    $$var = $old;
    $var;
}

sub _reaper {
    my ( $self, $waiters ) = @_;
    my $sub = sub {

        # my $message = shift; # currently unused
        foreach my $waiter (@$waiters) {
            AE::log debug => "reap $waiter";
            if ( ref $waiter eq 'CODE' ) {
                $waiter->(undef);
            }
            elsif ( ref $waiter eq 'AnyEvent::CondVar' ) {
                $waiter->send(undef);
            }
            elsif ( ref $waiter eq 'Coro::Channel' ) {
                $waiter->shutdown;
            }
            else {
                AE::log note => "cannot reap $waiter";
            }
        }
    };
    push @{ $self->{reapers} } => $sub;
    $sub;
}

sub _push_waiter {
    my ( $self, $what, $var ) = @_;
    push @{ $self->{waiters}->{$what} } => $var;
}

sub _run_cmd {
    my ( $cmd, $redir, $pidref ) = @_;

    my $cv = AE::cv;

    my %redir = %$redir;

    my $pid = fork;
    AE::log error => "cannot fork: $!" unless defined $pid;

    unless ($pid) {

        # move any existing fd's out of the way
        # this also ensures that dup2 is never called with fd1==fd2
        # so the cloexec flag is always cleared
        my ( @oldfh, @close );
        for my $fh ( values %redir ) {
            push @oldfh, $fh;    # make sure we keep it open
            $fh = fileno $fh;    # we only want the fd

            # dup if we are in the way
            # if we "leak" fds here, they will be dup2'ed over later
            defined( $fh = POSIX::dup($fh) )
              or POSIX::_exit(124)
              while exists $redir{$fh};
        }

        # execute redirects
        while ( my ( $k, $v ) = each %redir ) {
            defined POSIX::dup2( $v, $k )
              or POSIX::_exit(123);
        }

        AnyEvent::Util::close_all_fds_except( keys %redir );

        my $bin = $cmd->[0];

        no warnings;    ## no critic

        exec {$bin} @$cmd;

        POSIX::_exit(126);
    }

    $$pidref = $pid;

    my $w;
    $w = AE::child $pid => sub {
        my $status   = $_[1] >> 8;
        my $signal   = $_[1] & 127;
        my $coredump = $_[1] & 128;
        AE::log info  => "child exited with status $status" if $status;
        AE::log debug => "child exited with signal $signal" if $signal;
        AE::log note  => "child exited with coredump"       if $coredump;
        undef $w;
        map { close $_ } values %redir;
        $cv->send($status);
    };

    $cv;
}

sub new {
    my ( $class, %options ) = @_;

    $options{args} ||= [];

    my ( $rIN,  $wIN )  = _rpipe;
    my ( $rOUT, $wOUT ) = _wpipe;
    my ( $rERR, $wERR ) = _wpipe;

    my @xhs = @{ delete( $options{extras} ) || [] };

    my @args = map { "$_" } @{ delete $options{args} };

    my $pid;

    my %redir = (
        0 => $rIN,
        1 => $wOUT,
        2 => $wERR,
        map { ( "$_" => $_->B ) } @xhs
    );

    my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
    my $waiter = AE::cv;

    my $self = bless {
        handles => {
            in  => $wIN,
            out => $rOUT,
            err => $rERR,
            map { ( "$_" => $_->A ) } @xhs,
        },
        pid       => $pid,
        listeners => {
            exit       => delete $options{on_exit},
            ttl_exceed => delete $options{on_ttl_exceed},
        },
        eol     => "\n",
        cv      => $cv,
        alive   => 1,
        waiter  => $waiter,
        waiters => {
            in  => [],
            out => [],
            err => [],
            map { ( "$_" => [] ) } @xhs
        },
        reapers => [],
      } => ref $class
      || $class;

    map { $_->{proc} = $self } @xhs;

    {
        my $eol = quotemeta $self->_eol;
        $self->{reol} = delete $options{reol} || qr{$eol};
    }

    if ( $options{ttl} ) {
        $self->{timer} = AnyEvent->timer(
            after => delete $options{ttl},
            cb    => sub {
                return unless $self->alive;
                $self->kill;
                $self->_emit('ttl_exceed');
            }
        );
    }

    my $kill = sub { $self->end };

    if ( $options{timeout} ) {
        $wIN->timeout( $options{timeout} );
        $rOUT->timeout( $options{timeout} );
        $rERR->timeout( $options{timeout} );
        delete $options{timeout};

        $self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
        my $cb = sub { $self->_emit('timeout') };
        $wIN->on_timeout($cb);
        $rOUT->on_timeout($cb);
        $rERR->on_timeout($cb);
    }

    if ( $options{wtimeout} ) {
        $wIN->wtimeout( delete $options{wtimeout} );

        $self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('wtimeout') };
        $wIN->on_wtimeout($cb);
    }

    if ( $options{rtimeout} ) {
        $rOUT->rtimeout( delete $options{rtimeout} );

        $self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('rtimeout') };
        $rOUT->on_rtimeout($cb);
    }

    if ( $options{etimeout} ) {
        $rERR->rtimeout( delete $options{etimeout} );

        $self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('etimeout') };
        $rERR->on_rtimeout($cb);
    }

    if ( $options{errstr} ) {
        my $sref = delete $options{errstr};
        $$sref = '';
        $self->pipe( err => $sref );
    }

    if ( $options{outstr} ) {
        my $sref = delete $options{outstr};
        $$sref = '';
        $self->pipe( out => $sref );
    }

    $waiter->begin;
    $cv->cb(
        sub {
            $self->{status} = shift->recv;
            $self->{alive}  = 0;
            undef $self->{timer};
            $waiter->end;
            $self->_emit( exit => $self->{status} );
        }
    );

    if ( keys %options ) {
        AE::log note => "unknown left-over option(s): " . join ', ' =>
          keys %options;
    }

    $self;
}

sub reader {
    my ( $r, $w ) = _wpipe;
    bless {
        r      => $r,
        w      => $w,
        fileno => fileno( $r->fh )
      } => __PACKAGE__
      . '::R';
}

sub writer {
    my ( $r, $w ) = _rpipe;
    bless {
        r      => $r,
        w      => $w,
        fileno => fileno( $w->fh )
      } => __PACKAGE__
      . '::W';
}

sub run {
    my $cv = AE::cv;
    run_cb(
        @_,
        sub {
            $cv->send( \@_ );
        }
    )->recv;
    my ( $out, $err, $status ) = @{ $cv->recv };
    $? = $status << 8;
    if (wantarray) {
        return ( $out, $err );
    }
    else {
        carp $err if $err;
        return $out;
    }
}

sub run_cb {
    my $bin  = shift;
    my $cb   = pop;
    my @args = @_;
    my ( $out, $err ) = ( '', '' );
    my $proc = __PACKAGE__->new(
        bin    => $bin,
        args   => \@args,
        outstr => \$out,
        errstr => \$err
    );
    $proc->finish;
    $proc->wait(
        sub {
            my $status = $proc->{status};
            $? = $status << 8;
            $cb->( $out, $err, $status );
        }
    );
}

sub _on {
    my ( $self, $name, $handler ) = @_;
    $self->{listeners}->{$name} = $handler;
}

sub in { shift->_geth('in') }

sub out { shift->_geth('out') }

sub err { shift->_geth('err') }

sub _geth {
    shift->{handles}->{ pop() };
}

sub _eol  { shift->{eol} }
sub _reol { shift->{reol} }

sub _emit {
    my ( $self, $name, @args ) = @_;
    AE::log debug => "trapped $name";
    if ( exists $self->{listeners}->{$name}
        and defined $self->{listeners}->{$name} )
    {
        $self->{listeners}->{$name}->( $self, @args );
    }
}

sub pid {
    shift->{pid};
}

sub fire {
    my ( $self, $signal ) = @_;
    $signal = 'TERM' unless defined $signal;
    $signal =~ s{^sig}{}i;
    AE::log debug   => "fire SIG$signal";
    kill uc $signal => $self->pid;
}

sub kill {
    my ($self) = @_;
    $self->fire('kill');
}

sub fire_and_kill {
    my $self   = shift;
    my $cb     = ( ref $_[-1] eq 'CODE' ? pop : undef );
    my $time   = pop;
    my $signal = uc( pop || 'TERM' );
    my $w      = AnyEvent->timer(
        after => $time,
        cb    => sub {
            return unless $self->alive;
            $self->kill;
        }
    );
    $self->fire($signal);
    if ($cb) {
        return $self->wait(
            sub {
                undef $w;
                $cb->(@_);
            }
        );
    }
    else {
        my $exit = $self->wait;
        undef $w;
        return $exit;
    }
}

sub alive {
    my $self = shift;
    return 0 unless $self->{alive};
    $self->fire(0) ? 1 : 0;
}

sub wait {
    my ( $self, $cb ) = @_;

    my $next = sub {
        my $cv = shift;
        $cv->recv;
        waitpid $self->{pid} => 0;
        $cb->( $self->{status} ) if ref $cb eq 'CODE';
        $self->end;
        $self->{status};
    };
    AE::log debug => "waiting for "
      . ( $self->{waiter}->{_ae_counter} ) . " ends";
    if ($cb) {
        $self->{waiter}->cb($next);
        return $self->{waiter};
    }
    else {
        $self->{waiter}->recv;
        return $next->( $self->{waiter} );
    }
}

sub finish {
    my ($self) = @_;
    $self->in->destroy;
    $self;
}

sub end {
    my ($self) = @_;
    map { $_->destroy } values %{ $self->{handles} };
    map { $_->() } @{ $self->{reapers} };
    $self;
}

sub stop_timeout {
    my ($self) = @_;
    $self->in->timeout(0);
    $self->out->timeout(0);
    $self->err->timeout(0);
}

sub stop_wtimeout {
    my ($self) = @_;
    $self->in->wtimeout(0);
}

sub stop_rtimeout {
    my ($self) = @_;
    $self->out->rtimeout(0);
}

sub stop_etimeout {
    my ($self) = @_;
    $self->err->rtimeout(0);
}

sub write {
    my ( $self, $type, @args ) = @_;
    my $ok = 0;
    try {
        $self->_geth('in')->push_write( $type => @args );
        $ok = 1;
    }
    catch {
        AE::log warn => $_;
    };
    $ok;
}

sub writeln {
    my ( $self, @lines ) = @_;
    $self->write( $_ . $self->_eol ) for @lines;
    $self;
}

sub pipe {
    my $self = shift;
    my $peer = pop;
    my $what = ( pop || 'out' );
    if ( ref $what ) {
        $what = "$what";
    }
    else {
        $what = lc $what;
        $what =~ s{^std}{};
    }
    use Scalar::Util qw(blessed);
    my $sub;
    if ( blessed $peer) {
        if ( $peer->isa(__PACKAGE__) ) {
            $sub = sub {
                $peer->write(shift);
              }
        }
        elsif ( $peer->isa('AnyEvent::Handle') ) {
            $sub = sub {
                $peer->push_write(shift);
              }
        }
        elsif ( $peer->isa('Coro::Channel') ) {
            $sub = sub {
                $peer->put(shift);
              }
        }
        elsif ( $peer->can('print') ) {
            $sub = sub {
                $peer->print(shift);
              }
        }
    }
    elsif ( ref $peer eq 'SCALAR' ) {
        $sub = sub {
            $$peer .= shift;
          }
    }
    elsif ( ref $peer eq 'GLOB' ) {
        $sub = sub {
            print $peer shift();
          }
    }
    elsif ( ref $peer eq 'CODE' ) {
        $sub = $peer;
    }
    if ($sub) {
        AE::log debug => "pipe $peer from $what";
        my $aeh = $self->_geth($what);
        $aeh->on_eof(
            sub {
                AE::log debug => "eof: $what";
                shift->destroy;
                $self->{waiter}->end;
            }
        );
        $self->{output}->{$what} = _on_read_helper( $aeh, $sub );
        $self->{waiter}->begin;
    }
    else {
        AE::log fatal => "cannot pipe $peer from $what";
    }
}

sub pull {
    my ( $self, $peer ) = @_;
    $self->{input} = $peer;
    AE::log debug => "pull $peer to stdin";
    use Scalar::Util qw(blessed);
    my $sub;
    if ( blessed $peer) {
        if ( $peer->isa(__PACKAGE__) ) {
            return $peer->pipe($self);
        }
        elsif ( $peer->isa('AnyEvent::Handle') ) {
            $peer->on_eof(
                sub {
                    AE::log debug => "pull($peer)->on_eof";
                    shift->destroy;
                    $self->finish;
                }
            );
            $peer->on_error(
                sub {
                    AE::log error => "pull($peer)->on_error(" . $_[2] . ")";
                    shift->destroy;
                }
            );
            return _on_read_helper(
                $peer,
                sub {
                    AE::log debug => "pull($peer)->on_read";
                    $self->write( shift() );
                }
            );
        }
        elsif ( $peer->isa('IO::Handle') ) {
            return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
        }
        elsif ( $peer->isa('Coro::Channel') ) {
            if ( my $class = load_class('Coro') ) {
                return $class->new(
                    sub {
                        while ( my $x = $peer->get ) {
                            $self->write($x) or last;
                            Coro::cede();
                        }
                        $self->finish;
                    }
                );
            }
        }
    }
    elsif ( ref $peer eq 'SCALAR' ) {
        return _read_on_scalar(
            $peer,
            sub {
                AE::log debug => "pull($peer)->STORE";
                $self->write( shift() );
            }
        );
    }
    elsif ( ref $peer eq 'GLOB' ) {
        return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
    }
    AE::log fatal => "cannot pull $peer to stdin";
}

sub _push_read {
    my ( $self, $what, @args ) = @_;
    my $ok = 0;
    try {
        $self->_geth($what)->push_read(@args);
        $ok = 1;
    }
    catch {
        AE::log note => "cannot push_read from std$what: $_";
    };
    $ok;
}

sub _unshift_read {
    my ( $self, $what, @args ) = @_;
    my $ok = 0;
    try {
        $self->_geth($what)->unshift_read(@args);
        $ok = 1;
    }
    catch {
        AE::log note => "cannot unshift_read from std$what: $_";
    };
    $ok;
}

sub _readline {
    my ( $self, $what, $sub ) = @_;
    $self->_push_read( $what => line => $self->_reol, $sub );
}

sub _readchunk {
    my ( $self, $what, $bytes, $sub ) = @_;
    $self->_push_read( $what => chunk => $bytes => $sub );
}

sub _sub_cb {
    my ($cb) = @_;
    sub { $cb->( $_[1] ) }
}

sub _sub_cv {
    my ($cv) = @_;
    sub { $cv->send( $_[1] ) }
}

sub _sub_ch {
    my ($ch) = @_;
    sub { $ch->put( $_[1] ) }
}

sub _readline_cb {
    my ( $self, $what, $cb ) = @_;
    $self->_push_waiter( $what => $cb );
    $self->_readline( $what => _sub_cb($cb) );
}

sub _readline_cv {
    my ( $self, $what, $cv ) = @_;
    $cv ||= AE::cv;
    $self->_push_waiter( $what => $cv );
    $cv->send unless $self->_readline( $what => _sub_cv($cv) );
    $cv;
}

sub _readline_ch {
    my ( $self, $what, $channel ) = @_;
    unless ($channel) {
        if ( my $class = load_class('Coro::Channel') ) {
            $channel ||= $class->new;
        }
    }
    $self->_push_waiter( $what => $channel );
    $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
    $channel;
}

sub _readlines_cb {
    my ( $self, $what, $cb ) = @_;
    $self->_push_waiter( $what => $cb );
    $self->_geth($what)->on_read(
        sub {
            $self->_readline( $what => _sub_cb($cb) );
        }
    );
}

sub _readlines_ch {
    my ( $self, $what, $channel ) = @_;
    unless ($channel) {
        if ( my $class = load_class('Coro::Channel') ) {
            $channel ||= $class->new;
        }
    }
    $self->_push_waiter( $what => $channel );
    $channel->shutdown unless $self->_geth($what)->on_read(
        sub {
            $self->_readline( $what => _sub_ch($channel) );
        }
    );
    $channel;
}

sub _readchunk_cb {
    my ( $self, $what, $bytes, $cb ) = @_;
    $self->_push_waiter( $what => $cb );
    $self->_readchunk( $what, $bytes, _sub_cb($cb) );
}

sub _readchunk_cv {
    my ( $self, $what, $bytes, $cv ) = @_;
    $cv ||= AE::cv;
    $self->_push_waiter( $what => $cv );
    $self->_readchunk( $what, $bytes, _sub_cv($cv) );
    $cv;
}

sub _readchunk_ch {
    my ( $self, $what, $bytes, $channel ) = @_;
    unless ($channel) {
        if ( my $class = load_class('Coro::Channel') ) {
            $channel ||= $class->new;
        }
    }
    $self->_push_waiter( $what => $channel );
    $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
    $channel;
}

sub _readchunks_ch {
    my ( $self, $what, $bytes, $channel ) = @_;
    unless ($channel) {
        if ( my $class = load_class('Coro::Channel') ) {
            $channel ||= $class->new;
        }
    }
    $self->_push_waiter( $what => $channel );
    $channel->shutdown unless $self->_geth($what)->on_read(
        sub {
            $self->_readline( $what => _sub_ch($channel) );
        }
    );
    $channel;
}

sub readline_cb {
    my ( $self, $cb ) = @_;
    $self->_readline_cb( out => $cb );
}

sub readline_cv {
    my ( $self, $cv ) = @_;
    $self->_readline_cv( out => $cv );
}

sub readline_ch {
    my ( $self, $ch ) = @_;
    $self->_readline_ch( out => $ch );
}

sub readlines_cb {
    my ( $self, $cb ) = @_;
    $self->_readlines_cb( out => $cb );
}

sub readlines_ch {
    my ( $self, $ch ) = @_;
    $self->_readlines_ch( out => $ch );
}

sub readline {
    shift->readline_cv->recv;
}

sub readline_error_cb {
    my ( $self, $cb ) = @_;
    $self->_readline_cb( err => $cb );
}

sub readline_error_cv {
    my ( $self, $cv ) = @_;
    $self->_readline_cv( err => $cv );
}

sub readline_error_ch {
    my ( $self, $ch ) = @_;
    $self->_readline_ch( err => $ch );
}

sub readlines_error_cb {
    my ( $self, $cb ) = @_;
    $self->_readlines_cb( out => $cb );
}

sub readlines_error_ch {
    my ( $self, $ch ) = @_;
    $self->_readlines_ch( out => $ch );
}

sub readline_error {
    shift->readline_error_cv->recv;
}

# AnyEvent::Impl::Perl has some issues with POSIX::dup.
# This statement solves the problem.
AnyEvent::post_detect {
    AE::child $$ => sub { };
};

1;

package    # hidden
  AnyEvent::Proc::R;

use overload '""' => sub { shift->{fileno} };

sub A { shift->{r} }
sub B { shift->{w} }

sub on_timeout {
    shift->A->on_wtimeout(pop);
}

sub stop_timeout {
    shift->A->stop_wtimeout;
}

sub pipe {
    my ( $self, $peer ) = @_;
    $self->{proc}->pipe( $self => $peer );
}

sub readline_cb {
    my ( $self, $cb ) = @_;
    $self->{proc}->_readline_cb( $self => $cb );
}

sub readline_cv {
    my ( $self, $cv ) = @_;
    $self->{proc}->_readline_cv( $self => $cv );
}

sub readline_ch {
    my ( $self, $ch ) = @_;
    $self->{proc}->_readline_ch( $self => $ch );
}

sub readlines_cb {
    my ( $self, $cb ) = @_;
    $self->{proc}->_readlines_cb( $self => $cb );
}

sub readlines_ch {
    my ( $self, $ch ) = @_;
    $self->{proc}->_readlines_cb( $self => $ch );
}

sub readline {
    shift->readline_cv->recv;
}

1;

package    # hidden
  AnyEvent::Proc::W;

use overload '""' => sub { shift->{fileno} };

use Try::Tiny;

sub A { shift->{w} }
sub B { shift->{r} }

sub finish {
    shift->A->destroy;
}

sub on_timeout {
    shift->A->on_rtimeout(pop);
}

sub stop_timeout {
    shift->A->stop_rtimeout;
}

sub write {
    my ( $self, $type, @args ) = @_;
    my $ok = 0;
    try {
        $self->A->push_write( $type => @args );
        $ok = 1;
    }
    catch {
        AE::log note => $_;
    };
    $ok;
}

sub writeln {
    my ( $self, @lines ) = @_;
    my $eol = $self->{proc}->_eol;
    $self->write( $_ . $eol ) for @lines;
    $self;
}

sub pull { die 'UNIMPLEMENTED' }

1;

package    # hidden
  AnyEvent::Proc::TiedScalar;

use Tie::Scalar;

our @ISA = ('Tie::Scalar');

sub TIESCALAR {
    bless pop, shift;
}

sub FETCH {
    undef;
}

sub STORE {
    shift->(pop);
}

1;

__END__

=pod

=head1 NAME

AnyEvent::Proc - Run external commands

=head1 VERSION

version 0.105

=head1 SYNOPSIS

	my $proc = AnyEvent::Proc->new(bin => 'cat');
	$proc->writeln('hello');
	my $hello = $proc->readline;
	$proc->fire;
	$proc->wait;

=head1 DESCRIPTION

AnyEvent::Proc is a L<AnyEvent>-based helper class for running external commands with full control over STDIN, STDOUT and STDERR.

=head1 METHODS

=head2 new(%options)

=over 4

=item * I<bin> (mandatory)

Name (or path) to a binary

=item * I<args>

ArrayRef with command arguments

=item * I<ttl>

Time-to-life timeout after the process automatically gets killed

See also I<on_ttl_exceed> callback handler.

=item * I<timeout>

Inactive timeout value (fractional seconds) for all handlers.

See also I<on_timeout> callback handler. If omitted, after exceeding timeout all handlers will be closed and the subprocess can finish.

=item * I<wtimeout>

Like I<timeout> but sets only the write timeout value for STDIN.

Corresponding callback handler: I<on_wtimeout>

=item * I<rtimeout>

Like I<timeout> but sets only the read timeout value for STDOUT.

Corresponding callback handler: I<on_rtimeout>

=item * I<etimeout>

Like I<timeout> but sets only the read timeout value for STDERR.

Corresponding callback handler: I<on_etimeout>

=item * I<outstr>

When set to a ScalarRef, any output (STDOUT) will be appended to this scalar

=item * I<errstr>

Same as I<outstr>, but for STDERR.

=item * I<on_exit>

Callback handler called when process exits

=item * I<on_ttl_exceed>

Callback handler called when I<ttl> exceeds

=item * I<on_timeout>

Callback handler called when any inactivity I<timeout> value exceeds

=item * I<on_wtimeout>

Callback handler called when STDIN write inactivity I<wtimeout> value exceeds

=item * I<on_rtimeout>

Callback handler called when STDOUT read inactivity I<rtimeout> value exceeds

=item * I<on_etimeout>

Callback handler called when STDERR read inactivity I<etimeout> value exceeds

=back

=head2 in()

Returns a L<AnyEvent::Handle> for STDIN

Useful for piping data into us:

	$socket->print($proc->in->fh)

=head2 out()

Returns a L<AnyEvent::Handle> for STDOUT

=head2 err()

Returns a L<AnyEvent::Handle> for STDERR

=head2 pid()

Returns the PID of the subprocess

=head2 fire([$signal])

Sends a named signal to the subprocess. C<$signal> defaults to I<TERM> if omitted.

=head2 kill()

Kills the subprocess the most brutal way. Equals to

	$proc->fire('kill')

=head2 fire_and_kill([$signal, ]$time[, $callback])

Fires specified signal C<$signal> (or I<TERM> if omitted) and after C<$time> seconds kills the subprocess.

See L</wait> for the meaning of the callback parameter and return value.

Without calllback, this is a synchronous call. After this call, the subprocess can be considered to be dead. Returns the exit code of the subprocess.

=head2 alive()

Check whether is subprocess is still alive. Returns I<1> or I<0>

In fact, the method equals to

	$proc->fire(0)

=head2 wait([$callback])

Waits for the subprocess to be finished call the callback with the exit code. Returns a condvar.

Without callback, this is a synchronous call directly returning the exit code.

=head2 finish()

Closes STDIN of subprocess

=head2 end()

Closes all handles of subprocess

=head2 stop_timeout()

Stopps read/write timeout for STDIN, STDOUT and STDERR.

See I<timeout> and I<on_timeout> options in I<new()>.

=head2 stop_wtimeout()

Stopps write timeout for STDIN.

See I<wtimeout> and I<on_wtimeout> options in I<new()>.

=head2 stop_rtimeout()

Stopps read timeout for STDIN.

See I<rtimeout> and I<on_rtimeout> options in I<new()>.

=head2 stop_etimeout()

Stopps read timeout for STDIN.

See I<etimeout> and I<on_etimeout> options in I<new()>.

=head2 write($scalar)

Queues the given scalar to be written.

=head2 write($type => @args)

See L<AnyEvent::Handle>::push_write for more information.

=head2 writeln(@lines)

Queues one or more line to be written.

=head2 pipe([$fd, ]$peer)

Pipes any output of STDOUT to another handle. C<$peer> maybe another L<AnyEvent::Proc> instance, an L<AnyEvent::Handle>, a L<Coro::Channel>, an object that implements the I<print> method (like L<IO::Handle>, including any subclass), a ScalarRef or a GlobRef or a CodeRef.

C<$fd> defaults to I<stdout>.

	$proc->pipe(stderr => $socket);

=head2 pull($peer)

Pulls any data from another handle to STDIN. C<$peer> maybe another L<AnyEvent::Proc> instance, an L<AnyEvent::Handle>, an L<IO::Handle> (including any subclass), a L<Coro::Channel>, a ScalarRef or a GlobRef.

	$proc->pull($socket);

=head2 readline_cb($callback)

Reads a single line from STDOUT and calls C<$callback>

=head2 readline_cv([$condvar])

Reads a single line from STDOUT and send the result to C<$condvar>. A condition variable will be created and returned, if C<$condvar> is omitted.

=head2 readline_ch([$channel])

Reads a singe line from STDOUT and put the result to coro channel C<$channel>. A L<Coro::Channel> will be created and returned, if C<$channel> is omitted.

=head2 readlines_cb($callback)

Read lines continiously from STDOUT and calls on every line the handler C<$callback>.

=head2 readlines_ch([$channel])

Read lines continiously from STDOUT and put every line to coro channel C<$channel>. A L<Coro::Channel> will be created and returned, if C<$channel> is omitted.

=head2 readline()

Reads a single line from STDOUT synchronously and return the result.

Same as

	$proc->readline_cv->recv

=head2 readline_error_cb($callback)

Bevahes equivalent as I<readline_cb>, but for STDERR.

=head2 readline_error_cv([$condvar])

Bevahes equivalent as I<readline_cv>, but for STDERR.

=head2 readline_error_ch([$channel])

Bevahes equivalent as I<readline_ch>, but for STDERR.

=head2 readlines_error_cb($callback)

Bevahes equivalent as I<readlines_cb>, but for STDERR.

=head2 readlines_error_ch([$channel])

Bevahes equivalent as I<readlines_ch>, but for STDERR.

=head2 readline_error()

Bevahes equivalent as I<readline>, but for STDERR.

=head1 FUNCTIONS

=head2 reader()

Creates a new file descriptor for pulling data from process.

	use AnyEvent::Proc qw(reader);
	my $reader = reader();
	my $proc = AnyEvent::Proc->new(
		bin => '/bin/sh',
		args => [ -c => "echo hi >&$reader" ] # overloads to fileno
		extras => [ $reader ], # unordered list of all extra descriptors
	);
	my $out;
	$reader->pipe(\$out);
	$proc->wait;
	# $out contains now 'hi'

This calls C<< /bin/sh -c "echo hi >&3" >>, so that any output will be dupped into fd #3.

C<$reader> provides following methods:

=over 4

=item * L</on_timeout>

=item * L</stop_timeout>

=item * L</pipe>

=item * L</readline_cb>

=item * L</readline_cv>

=item * L</readline_ch>

=item * L</readlines_cb>

=item * L</readlines_ch>

=item * L</readline>

=back

=head2 writer()

Creates a new file descriptor for pushing data to process.

	use AnyEvent::Proc qw(writer);
	my $writer = writer();
	my $out;
	my $proc = AnyEvent::Proc->new(
		bin => '/bin/sh',
		args => [ -c => "cat <&$writer" ] # overloads to fileno
		extras => [ $writer ], # unordered list of all extra descriptors
		outstr => \$out,
	);
	my $out;
	$writer->writeln('hi');
	$writer->finish;
	$proc->wait;
	# $out contains now 'hi'

This calls C</bin/sh -c "cat <&3">, so that any input will be dupped from fd #3.

C<$writer> provides following methods:

=over 4

=item * L</finish>

=item * L</on_timeout>

=item * L</stop_timeout>

=item * L</write>

=item * L</writeln>

=back

Unfortunally L</pull> is unimplemented.

=head2 run($bin[, @args])

Bevahes similar to L<perlfunc/system>. In scalar context, it returns STDOUT of the subprocess. STDERR will be passed-through by L<perlfunc/warn>.

	$out = AnyEvent::Proc::run(...)

In list context, STDOUT and STDERR will be separately returned.

	($out, $err) = AnyEvent::Proc::run(...)

The exit-code is stored in C<$?>. Please keep in mind that for portability reasons C<$?> is shifted by 8 bits.

	$exitcode = $? >> 8

=head2 run_cb($bin[, @args], $callback)

Like L</run>, but asynchronous with callback handler. Returns the condvar. See L</wait> for more information.

	AnyEvent::Proc::run_cb($bin, @args, sub {
		my ($out, $err, $status) = @_;
		...;
	});

=head1 LIMITATIONS

Use L<EV>. The fallback module L<AnyEvent::Impl::Perl> has some issues with pipes. In some cases, L<AnyEvent::Handle> don't receive data from its pipe peer and the application will block forever. I haven't a solution yet, so don't rely on pipes when you use AE's pure-perl backend.

=head1 EXPORTS

Nothing by default. The following functions will be exported on request:

=over 4

=item * L</run>

=item * L</run_cb>

=item * L</reader>

=item * L</writer>

=back

=head1 BUGS

Please report any bugs or feature requests on the bugtracker website
https://github.com/zurborg/libanyevent-proc-perl/issues

When submitting a bug or request, please include a test-file or a
patch to an existing test-file that illustrates the bug or desired
feature.

=head1 AUTHOR

David Zurborg <zurborg@cpan.org>

=head1 COPYRIGHT AND LICENSE

This software is Copyright (c) 2014 by David Zurborg.

This is free software, licensed under:

  The ISC License

=cut