package Cassandra::Client::AsyncAnyEvent;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::AsyncAnyEvent::VERSION = '0.19';
use 5.010;
use strict;
use warnings;
use Time::HiRes qw(CLOCK_MONOTONIC);
use vars qw/@TIMEOUTS/;
sub new {
my ($class, %args)= @_;
my $options= $args{options};
require AnyEvent;
return bless {
timer_granularity => ($options->{timer_granularity} || 0.1),
ae_read => {},
ae_write => {},
ae_timeout => undef,
fh_to_obj => {},
timeouts => [],
}, $class;
}
sub register {
my ($self, $fh, $connection)= @_;
$self->{fh_to_obj}{$fh}= $connection;
return;
}
sub unregister {
my ($self, $fh)= @_;
delete $self->{fh_to_obj}{$fh};
if ($self->{timeouts} && grep { $_->[1] == $fh && !$_->[3] } @{$self->{timeouts}}) {
warn 'In unregister(): not all timeouts were dismissed!';
}
@{$self->{timeouts}}= grep { $_->[1] != $fh } @{$self->{timeouts}};
undef $self->{ae_timeout} unless @{$self->{timeouts}};
return;
}
sub register_read {
my ($self, $fh)= @_;
my $connection= $self->{fh_to_obj}{$fh} or die;
$self->{ae_read}{$fh}= AnyEvent->io(
poll => 'r',
fh => $fh,
cb => sub {
$connection->can_read;
},
);
return;
}
sub register_write {
my ($self, $fh)= @_;
my $connection= $self->{fh_to_obj}{$fh} or die;
$self->{ae_write}{$fh}= AnyEvent->io(
poll => 'w',
fh => $fh,
cb => sub {
$connection->can_write;
},
);
return;
}
sub unregister_read {
my ($self, $fh)= @_;
undef $self->{ae_read}{$fh};
return;
}
sub unregister_write {
my ($self, $fh)= @_;
undef $self->{ae_write}{$fh};
return;
}
sub deadline {
my ($self, $fh, $id, $timeout)= @_;
local *TIMEOUTS= $self->{timeouts};
if (!$self->{ae_timeout}) {
$self->{ae_timeout}= AnyEvent->timer(
after => $self->{timer_granularity},
interval => $self->{timer_granularity},
cb => sub { $self->handle_timeouts(Time::HiRes::clock_gettime(CLOCK_MONOTONIC)) },
);
}
my $curtime= Time::HiRes::clock_gettime(CLOCK_MONOTONIC);
my $deadline= $curtime + $timeout;
my $additem= [ $deadline, $fh, $id, 0 ];
if (@TIMEOUTS && $TIMEOUTS[-1][0] > $deadline) {
# Grumble... that's slow
push @TIMEOUTS, $additem;
@TIMEOUTS= sort { $a->[0] <=> $b->[0] } @TIMEOUTS;
} else {
# Common case
push @TIMEOUTS, $additem;
}
return \($additem->[3]);
}
sub handle_timeouts {
my ($self, $curtime)= @_;
local *TIMEOUTS= $self->{timeouts};
my %triggered_read;
while (@TIMEOUTS && $curtime >= $TIMEOUTS[0][0]) {
my $item= shift @TIMEOUTS;
if (!$item->[3]) { # If it timed out
my ($deadline, $fh, $id, $timedout)= @$item;
my $obj= $self->{fh_to_obj}{$fh};
$obj->can_read unless $triggered_read{$fh}++;
$obj->can_timeout($id) unless $item->[3]; # We may have received an answer...
}
}
if (!@TIMEOUTS) {
$self->{ae_timeout}= undef;
}
return;
}
sub timer {
my ($self, $callback, $wait)= @_;
my $t; $t= AE::timer($wait, 0, sub {
undef $t;
$callback->();
});
}
sub later {
my ($self, $callback)= @_;
&AE::postpone($callback);
}
# $something->($async->wait(my $w)); my ($error, $result)= $w->();
sub wait {
my ($self)= @_;
my $output= \$_[1];
my $cv= AnyEvent->condvar;
my @output;
my $callback= sub {
@output= @_;
$cv->send;
};
$$output= sub {
$cv->recv;
return @output;
};
return $callback;
}
1;
__END__
=pod
=head1 NAME
Cassandra::Client::AsyncAnyEvent
=head1 VERSION
version 0.19
=head1 AUTHOR
Tom van der Woerdt <tvdw@cpan.org>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2022 by Tom van der Woerdt.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut