package Ryu::Async;
# ABSTRACT: IO::Async support for Ryu stream management
use strict;
use warnings;
our $VERSION = '0.020';
use utf8;
=encoding UTF8
=head1 NAME
Ryu::Async - use L<Ryu> with L<IO::Async>
=head1 SYNOPSIS
#!/usr/bin/env perl
use strict;
use warnings;
use IO::Async::Loop;
use Ryu::Async;
# This will generate a lot of output, but is useful
# for demonstrating lifecycles. Drop this to 'info' or
# 'debug' to make it more realistic.
use Log::Any::Adapter qw(Stdout), log_level => 'trace';
#
my $loop = IO::Async::Loop->new;
$loop->add(
my $ryu = Ryu::Async->new
);
{
my $timer = $ryu->timer(
interval => 0.10,
)->take(10)
->each(sub { print "tick\n" });
warn $timer->describe;
$timer->get;
}
=head1 DESCRIPTION
This is an L<IO::Async::Notifier> subclass for interacting with L<Ryu>.
=cut
use parent qw(IO::Async::Notifier);
use Future::AsyncAwait;
use IO::Async::Handle;
use IO::Async::Listener;
use IO::Async::Process;
use IO::Async::Resolver;
use IO::Async::Signal;
use IO::Async::Socket;
use IO::Async::Stream;
use IO::Async::Timer::Absolute;
use IO::Async::Timer::Countdown;
use IO::Async::Timer::Periodic;
use Ryu::Async::Client;
use Ryu::Async::Packet;
use Ryu::Async::Server;
use Ryu::Sink;
use Ryu::Source;
use URI::udp;
use URI::tcp;
use Socket qw(pack_sockaddr_in inet_pton AF_INET);
use curry::weak;
use Syntax::Keyword::Try;
use Ryu '2.000';
use Ryu::Source;
use Ryu::Async::Process;
use Scalar::Util qw(blessed weaken);
use Log::Any qw($log);
=head1 Interaction with L<Ryu>
On load, this module will provide a L<Ryu::Source/$FUTURE_FACTORY> which assigns
L<Future> instances from L<IO::Async::Loop>.
You can override this behaviour by doing this instead:
BEGIN {
require Ryu::Source;
local $Ryu::Source::FUTURE_FACTORY = sub { };
require Ryu::Async;
}
to ensure the original factory function is preserved.
=cut
# Default future factory for Ryu::Source: the second argument is passed
# through as the label when asking IO::Async::Loop for a new future.
$Ryu::Source::FUTURE_FACTORY = sub {
    my (undef, $label) = @_;
    return IO::Async::Loop->new->new_future(label => $label);
};
=head1 METHODS
=cut
=head2 from
Creates a new L<Ryu::Source> from a thing.
The exact details of this are likely to change in future, but a few things that are expected to work:
$ryu->from($io_async_stream_instance)
->by_line
->each(sub { print "Line: $_\n" });
$ryu->from([1..1000])
->sum
->each(sub { print "Total was $_\n" });
=cut
# Builds a Ryu::Source from one of:
#   - an IO::Async::Stream instance (delegates to ->from_stream)
#   - an arrayref (items are shifted off and emitted one at a time)
#   - named parameters, currently only `directory => $path`
# Returns the new source; dies for anything it does not recognise.
sub from {
    my $self = shift;

    # Blessed instances: IO::Async::Stream is the only supported class
    if(my $class = blessed $_[0]) {
        if($class->isa('IO::Async::Stream')) {
            return $self->from_stream($_[0]);
        } else {
            die "Unable to determine appropriate source for $class";
        }
    }

    my $src = $self->source(label => 'from');
    if(my $ref = ref $_[0]) {
        if($ref eq 'ARRAY') {
            # We'll run a background loop that emits one item from the arrayref
            # every I/O loop iteration - the arrayref is used as-is, allowing for
            # dynamic population over time
            my $pending = $_[0];
            weaken(my $weak_src = $src);
            my $loop = $self->loop;
            (async sub {
                # Acquire instance on each iteration - this allows us to
                # give up if nothing cares about the source any more.
                ITEM:
                while(my $src = $weak_src) {
                    await Future->wait_all(
                        $loop->later,
                        $src->unblocked
                    );
                    last ITEM unless $pending->@*;
                    $src->emit(shift $pending->@*);
                }

                # Probably overkill, but given stack-not-refcounted issues
                # then let's not risk $src going through destruction partway
                # through the ->finish handling
                if(my $src = $weak_src) {
                    $src->finish;
                }

                # We only needed this for ->later
                weaken $loop;
                return;
            })->()->retain;
            return $src;
        } else {
            die "Unknown type $ref"
        }
    }

    my %args = @_;
    if(my $dir = $args{directory}) {
        opendir my $handle, $dir or die $!;
        # Self-rescheduling callback: emit one directory entry per loop
        # iteration, skipping the `.` and `..` entries.
        my $code;
        $code = sub {
            if(defined(my $item = readdir $handle)) {
                $src->emit($item) unless $item eq '.' or $item eq '..';
                $self->loop->later($code);
            } else {
                # Finished: drop the self-reference held by the closure
                weaken($code);
                closedir $handle or die $!;
                $src->finish
            }
        };
        $code->();
        # Hand back the source that receives the entries, not the notifier
        return $src;
    }
    die "unknown stuff";
}
=head2 from_stream
Create a new L<Ryu::Source> from an L<IO::Async::Stream> instance.
Note that a stream which is not already attached to an L<IO::Async::Notifier>
will be added as a child of this instance.
=cut
# Wraps an IO::Async::Stream in a Ryu::Source.
# Parameters: the stream, then optional named args (`label`, defaulting
# to 'IaStream'). Data read from the stream is emitted as-is; the source
# is finished on EOF. Returns the new source.
sub from_stream {
    my ($self, $stream, %args) = @_;
    my $src = $self->source(label => $args{label} // 'IaStream');

    # Our ->flow_control monitoring gives us a boolean
    # value every time the state changes:
    # 1 - we are active
    # 0 - we are paused
    # through sheer coincidence, this is also what the
    # IO::Async::Stream `->want_(read|write)ready` methods
    # expect.
    $src->flow_control
        ->each($stream->curry::weak::want_readready);

    $stream->configure(
        on_read => sub {
            my ($stream, $buffref, $eof) = @_;
            $log->tracef("Have %d bytes of data, EOF = %s", length($$buffref), $eof ? 'yes' : 'no');
            # Take everything currently buffered, leaving the buffer empty
            my $data = substr $$buffref, 0, length $$buffref, '';
            $src->emit($data);
            $src->finish if $eof && !$src->completed->is_ready;
        }
    );

    unless($stream->parent) {
        $self->add_child($stream);
        # Hold $self weakly here, matching ->to_stream: a strong capture
        # would form a cycle (notifier -> stream -> on_read -> source ->
        # completion callback -> notifier).
        $src->completed->on_ready($self->$curry::weak(sub {
            my ($self) = @_;
            $self->remove_child($stream) if $stream->parent;
        }));
    }
    return $src;
}
=head2 to_stream
Provides a L<Ryu::Sink> that will send data to an L<IO::Async::Stream> instance.
Requires the L<IO::Async::Stream> and will return a new L<Ryu::Sink> instance.
=cut
# Builds a Ryu::Sink whose items are written to the given
# IO::Async::Stream. Accepts an optional `label` (default 'IaStream').
# Returns the sink.
sub to_stream {
    my ($self, $stream, %args) = @_;
    my $label = $args{label} // 'IaStream';
    my $sink = $self->sink(label => $label);

    # Resume/pause the sink as the stream reports writeability changes
    my $on_start = $sink->curry::weak::resume;
    my $on_stop = $sink->curry::weak::pause;
    $stream->configure(
        on_writeable_start => $on_start,
        on_writeable_stop => $on_stop,
    );

    # Every item arriving on the sink is written to the stream
    my $writer = sub {
        $stream->write($_)
    };
    $sink->source->each($writer);

    # A stream that already has a parent is left where it is
    return $sink if $stream->parent;

    # Otherwise adopt it, and let it go once the sink completes
    $self->add_child($stream);
    my $release = $self->$curry::weak(sub {
        my ($ryu) = @_;
        $ryu->remove_child($stream) if $stream->parent;
    });
    $sink->completed->on_ready($release);
    return $sink;
}
=head2 stdin
Create a new L<Ryu::Source> that wraps STDIN.
As with other L<IO::Async::Stream> wrappers, this will emit data as soon as it's available,
as raw bytes.
Use L<Ryu::Source/by_line> and L<Ryu::Source/decode> to split into lines and/or decode from UTF-8.
=cut
# Returns a source fed by a new IO::Async::Stream on STDIN.
sub stdin {
    my ($self) = @_;
    my $stream = IO::Async::Stream->new_for_stdin;
    return $self->from_stream($stream, label => 'STDIN');
}
=head2 stdout
Returns a new L<Ryu::Sink> that wraps STDOUT.
=cut
# Returns a sink that writes to a new IO::Async::Stream on STDOUT.
sub stdout {
    my ($self) = @_;
    my $stream = IO::Async::Stream->new_for_stdout;
    return $self->to_stream($stream, label => 'STDOUT');
}
=head2 stderr
Returns a new L<Ryu::Sink> that wraps STDERR.
=cut
# Returns a sink that writes to a new IO::Async::Stream on STDERR.
sub stderr {
    my ($self) = @_;
    my $stream = IO::Async::Stream->new_for_stderr;
    return $self->to_stream($stream, label => 'STDERR');
}
=head2 timer
Provides a L<Ryu::Source> which emits an empty string at selected intervals.
Takes the following named parameters:
=over 4
=item * interval - how often to trigger the timer, in seconds (fractional values allowed)
=item * reschedule - type of rescheduling to use, can be C<soft>, C<hard> or C<drift> as documented
in L<IO::Async::Timer::Periodic>
=back
Example:
$ryu->timer(interval => 1, reschedule => 'hard')
->combine_latest(...)
=cut
# Returns a source that emits an empty string on every tick of an
# IO::Async::Timer::Periodic. Named parameters are passed through to
# the timer constructor; `reschedule` defaults to 'hard'.
sub timer {
    my ($self, %args) = @_;
    my $source = $self->source(label => 'timer');

    # The tick handler only holds the source weakly
    my $emit_tick = $source->$curry::weak(sub { shift->emit('') });
    my $periodic = IO::Async::Timer::Periodic->new(
        reschedule => 'hard',
        %args,
        on_tick => $emit_tick,
    );
    $self->add_child($periodic);

    # From here on, the notifier tree is what keeps the timer alive
    Scalar::Util::weaken($periodic);

    # Stop and detach the timer when the source is done
    my $cleanup = $self->$curry::weak(sub {
        my ($ryu) = @_;
        return unless $periodic;
        $periodic->stop if $periodic->is_running;
        $ryu->remove_child($periodic)
    });
    $source->on_ready($cleanup);

    $periodic->start;
    return $source;
}
=head2 run
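Creates an L<IO::Async::Process>, wraps it in a L<Ryu::Async::Process> which is
added as a child of this instance, and returns that wrapper.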
=cut
# Starts a process. The first argument is either an arrayref (used as
# the `command`) or a coderef (used as the `code`); remaining named
# parameters go to IO::Async::Process->new. Returns the
# Ryu::Async::Process wrapper, which is added as a child of this instance.
sub run {
    my ($self, $code, %args) = @_;
    my $type = ref $code;
    if($type eq 'CODE') {
        $args{code} = $code;
    } elsif($type eq 'ARRAY') {
        # Fork and exec
        $args{command} = $code;
    }

    my $inner = IO::Async::Process->new(%args);
    my $process = Ryu::Async::Process->new(process => $inner);
    $self->add_child($process);
    return $process;
}
=head2 source
Returns a new L<Ryu::Source> instance.
=cut
# Constructs a Ryu::Source bound to this notifier's loop.
# `label` is taken from the named parameters if present, otherwise from
# ->label; any remaining parameters are passed to the constructor.
sub source {
    my ($self, %args) = @_;
    my $label = delete $args{label};
    # Must be called directly from here: ->label inspects the call stack
    $label //= $self->label;

    my %params = (
        new_future => $self->loop->curry::weak::new_future,
        apply_timeout => $self->curry::timeout,
        label => $label,
        %args,
    );
    return Ryu::Source->new(%params);
}
=head2 udp_client
Creates a new UDP client.
This provides a sink for L<Ryu::Async::Client/outgoing> packets, and a source for L<Ryu::Async::Client/incoming> responses.
=over 4
=item * C<uri> - an optional URI of the form C<< udp://host:port >>
=item * C<host> - which host to connect to, defaults to C<0.0.0.0>
=item * C<port> - the port to connect to
=back
Returns a L<Ryu::Async::Client> instance.
=cut
# Creates a UDP client socket connected to the given target.
# Named parameters: `uri` (string or URI object), or `host` / `port`
# used to build one; `label` is applied to both the source and the sink.
# Returns a Ryu::Async::Client whose `incoming` source emits a
# Ryu::Async::Packet per datagram received and whose `outgoing` sink
# sends each item as a datagram payload.
sub udp_client {
    my ($self, %args) = @_;
    my $uri = delete $args{uri};
    $uri //= 'udp://' . join ':', $args{host} // '*', $args{port} // ();
    $uri = URI->new($uri) unless ref $uri;
    $log->debugf("UDP client for %s", $uri->as_string);

    my $src = $self->source(
        label => $args{label} // $uri->as_string,
    );
    my $sink = $self->sink(
        label => $args{label} // $uri->as_string,
    );

    $self->add_child(
        my $client = IO::Async::Socket->new(
            on_recv => sub {
                my ($sock, $payload, $addr) = @_;
                try {
                    $log->tracef("Receiving [%s] from %s", $payload, $addr);
                    $src->emit(
                        Ryu::Async::Packet->new(
                            from => $addr,
                            payload => $payload
                        )
                    );
                } catch {
                    # This is the receive path, so report it as such
                    $log->errorf("Exception when receiving: %s", $@);
                }
            },
        )
    );

    # A wildcard or missing host is treated as 0.0.0.0
    my $host = $uri->host || '0.0.0.0';
    $host = '0.0.0.0' if $host eq '*';
    my $port = $uri->port // 0;

    my $f = $client->connect(
        host => $host,
        service => $port,
        socktype => 'dgram',
    );
    $f->on_done(sub {
        $log->debugf("UDP client connected");
    })->on_fail(sub {
        $log->errorf("UDP client failed to connect - %s", join ',', @_);
    });

    $sink->source->each(sub {
        my $payload = $_;
        # Outgoing data is deferred until the connect has succeeded
        $f->on_done(sub {
            try {
                $log->tracef("Sending [%s] to %s", $payload, $uri);
                # inet_pton only understands literal IPv4 addresses and
                # returns undef for hostnames. In that case send without
                # an explicit address and rely on the connected peer.
                my $ip = inet_pton(AF_INET, $host);
                my @dest = defined($ip)
                    ? (pack_sockaddr_in($port, $ip))
                    : ();
                $client->send(
                    $payload,
                    undef,
                    @dest
                );
            } catch {
                $log->errorf("Exception when sending: %s", $@);
            }
        })->retain;
    });

    Ryu::Async::Client->new(
        outgoing => $sink,
        incoming => $src,
    );
}
=head2 udp_server
=cut
# Creates a bound UDP socket.
# Named parameters: `uri` (string or URI object), or `host` (default
# 0.0.0.0) and `port` used to build one. Only the port from the URI is
# passed to ->bind.
# Returns a Ryu::Async::Server with:
#   port     - a Future resolving to the bound port number
#   incoming - a source emitting a Ryu::Async::Packet per datagram
#   outgoing - a sink; each item is sent via its ->payload and ->addr
sub udp_server {
    my ($self, %args) = @_;
    my $uri = delete $args{uri};
    $uri //= do {
        $args{host} //= '0.0.0.0';
        'udp://' . join ':', $args{host}, $args{port} // ();
    };
    $uri = URI->new($uri) unless ref $uri;
    $log->debugf("UDP server %s", $uri->as_string);

    my $src = $self->source;
    my $sink = $self->sink;
    $self->add_child(
        my $server = IO::Async::Socket->new(
            on_recv => sub {
                my ($sock, $msg, $addr) = @_;
                $log->debugf("UDP server [%s] had %s from %s", $uri->as_string, $msg, $addr);
                $src->emit(
                    Ryu::Async::Packet->new(
                        payload => $msg,
                        from => $addr
                    )
                )
            },
            on_recv_error => sub {
                my ($sock, $err) = @_;
                $src->fail($err);
            }
        )
    );

    # NOTE(review): items pushed to the sink are expected to provide
    # ->payload and ->addr - confirm against Ryu::Async::Packet.
    $sink->source->each(sub { $server->send($_->payload, 0, $_->addr) });

    my $port_f = $server->bind(
        service => $uri->port // 0,
        socktype => 'dgram'
    )->then(sub {
        Future->done($server->write_handle->sockport)
    });

    # Expose the sink: with `outgoing => undef` the wiring above was
    # unreachable and callers had no way to send replies.
    Ryu::Async::Server->new(
        port => $port_f,
        incoming => $src,
        outgoing => $sink,
    );
}
=head2 tcp_server
Creates a listening TCP socket, and provides a L<Ryu::Async::Server>
instance which will emit a new event every time a client connects.
=cut
# Creates a listening TCP socket.
# Named parameters: `uri` (string or URI object), or `host` (default
# 0.0.0.0) and `port` used to build one. Only the port from the URI is
# passed to ->listen.
# Returns a Ryu::Async::Server with:
#   port     - a Future resolving to the listening port number
#   incoming - a source emitting one Ryu::Async::Packet per accepted
#              connection: `payload` is the new IO::Async::Stream and
#              `from` is the peer address reported by its read handle
#   outgoing - undef; a listener has nothing to send on
sub tcp_server {
    my ($self, %args) = @_;
    my $uri = delete $args{uri};
    $uri //= do {
        $args{host} //= '0.0.0.0';
        'tcp://' . join ':', $args{host}, $args{port} // ();
    };
    $uri = URI->new($uri) unless ref $uri;
    $log->debugf("TCP server %s", $uri->as_string);

    my $src = $self->source;
    $self->add_child(
        my $server = IO::Async::Listener->new(
            # on_stream is invoked with the listener and the accepted
            # stream - there is no separate address argument, so look
            # the peer up from the stream's handle instead.
            on_stream => sub {
                my ($listener, $stream) = @_;
                my $handle = $stream->read_handle;
                my $addr = $handle ? $handle->peername : undef;
                $log->debugf("TCP server [%s] had %s from %s", $uri->as_string, $stream, $addr);
                $src->emit(
                    Ryu::Async::Packet->new(
                        payload => $stream,
                        from => $addr
                    )
                )
            },
        )
    );

    # No sink is created here: the previous wiring called ->send on the
    # listener (which has no such method) and was never exposed to callers.
    my $port_f = $server->listen(
        service => $uri->port // 0,
        socktype => 'stream'
    )->then(sub {
        my ($listener) = @_;
        Future->done($listener->read_handle->sockport)
    });

    Ryu::Async::Server->new(
        port => $port_f,
        incoming => $src,
        outgoing => undef,
    );
}
# Applies an inactivity timeout between two sources.
# Parameters:
#   $input  - source being watched; every item restarts the countdown
#   $output - source that is failed with 'timeout' if the countdown expires
#   $delay  - countdown interval, passed to IO::Async::Timer::Countdown
# Returns $self.
sub timeout {
    my ($self, $input, $output, $delay) = @_;
    $self->add_child(
        my $timer = IO::Async::Timer::Countdown->new(
            interval => $delay,
            on_expire => sub {
                # Nothing to report if the output has already completed
                $output->fail('timeout') unless $output->completed->is_ready;
            },
        )
    );
    # The notifier tree owns the timer; the closures below must not keep
    # it alive once it has been removed.
    weaken($timer);

    # ->reset has no effect on a timer that is not running, so the
    # countdown has to be started explicitly or it would never expire.
    $timer->start;
    $input->each_while_source(sub { $timer->reset if $timer }, $output);

    # Stop and detach the timer once the output is done, whatever the reason
    $output->completed->on_ready($self->$curry::weak(sub {
        my ($self) = @_;
        return unless $timer;
        $timer->stop if $timer->is_running;
        $self->remove_child($timer) if $timer->parent;
    }));
    return $self;
}
=head2 sink
Returns a new L<Ryu::Sink>.
The label will default to the calling package/class and method,
with some truncation rules:
=over 4
=item * A C<Net::Async::> prefix will be replaced by C<Na>.
=item * A C<Web::Async::> prefix will be replaced by C<Wa>.
=item * A C<Database::Async::> prefix will be replaced by C<Da>.
=item * A C<IO::Async::> prefix will be replaced by C<Ia>.
=item * A C<Tickit::Async::> prefix will be replaced by C<Ta>.
=item * A C<Tickit::Widget::> prefix will be replaced by C<TW>.
=back
This list of truncations is subject to change, so please don't
rely on any of these in string matches or similar - better to set
your own label if you need consistency.
=cut
# Constructs a Ryu::Sink bound to this notifier's loop.
# `label` is taken from the named parameters if present, otherwise from
# ->label; any remaining parameters are passed to the constructor.
sub sink {
    my ($self, %args) = @_;
    my $label = delete $args{label};
    # Must be called directly from here: ->label inspects the call stack
    $label //= $self->label;

    my %params = (
        new_future => $self->loop->curry::weak::new_future,
        label => $label,
        %args,
    );
    return Ryu::Sink->new(%params);
}
# Derives a default label from the call stack.
# Intended to be called directly from ->source or ->sink: frame 0 is
# the call to this method, frame 1 is ->source / ->sink, and frame 2 is
# the sub which asked for the source or sink.
# Returns that sub's name with well-known prefixes shortened and the
# final `::name` rewritten as `->name`, or 'unknown' if there is no
# such frame.
sub label {
    my ($self) = @_;
    # Element 3 is the fully-qualified sub name (package and method);
    # element 0 would only give the package of whoever called that sub.
    my $label = (caller 2)[3];
    for($label // ()) {
        s/^Net::Async::/Na/;
        s/^Web::Async::/Wa/;
        s/^Database::Async::/Da/;
        s/^IO::Async::/Ia/;
        s/^Tickit::Async::/Ta/;
        s/^Tickit::Widget::/TW/;
        s/::([^:]*)$/->$1/;
    }
    return $label // 'unknown';
}
1;
__END__
=head1 SEE ALSO
=over 4
=item * L<Ryu>
=item * L<IO::Async>
=back
=head1 AUTHOR
Tom Molesworth C<< TEAM@cpan.org >> with contributions from Eyad Arnabeh.
=head1 LICENSE
Copyright Tom Molesworth 2011-2021. Licensed under the same terms as Perl itself.