package Cassandra::Client::NetworkStatus; our $AUTHORITY = 'cpan:TVDW'; $Cassandra::Client::NetworkStatus::VERSION = '0.19'; use 5.010; use strict; use warnings; use Scalar::Util qw/weaken/; use Cassandra::Client::Util; sub new { my ($class, %args)= @_; my $self= bless { pool => $args{pool}, async_io => $args{async_io}, waiting_for_cb => [], master_id => undef, shutdown => undef, }, $class; weaken($self->{pool}); return $self; } sub init { my ($self, $callback)= @_; $self->select_master($callback); } sub select_master { my ($self, $callback)= @_; return $callback->() if $self->{master_id}; if (@{$self->{waiting_for_cb}}) { push @{$self->{waiting_for_cb}}, $callback; return; } push @{$self->{waiting_for_cb}}, $callback; my $pool= $self->{pool}; # non-weak my $attempts= 0; whilst( sub { # condition !$self->{shutdown} && !$self->{master_id} }, sub { # while my ($wnext)= @_; series([ sub { my ($next)= @_; if ($attempts++) { # Don't retry immediately $self->{async_io}->timer($next, 1); } else { $next->(); } }, sub { my ($next)= @_; $pool->get_one_cb($next); }, sub { my ($next, $connection)= @_; parallel([ sub { my ($pnext)= @_; $connection->register_events($pnext); }, sub { my ($pnext)= @_; $connection->get_network_status($pnext); }, sub { $_[0]->(undef, $connection); }, ], $next); }, sub { my ($next, undef, $networkstatus, $connection)= @_; $self->{master_id}= $connection->get_pool_id; $self->load_status($networkstatus); $next->(); }, ], sub { $wnext->(); }); }, sub { # finish my ($error)= @_; my @cb= @{$self->{waiting_for_cb}}; $self->{waiting_for_cb}= []; $error= $error || ($self->{master_id} ? undef : "Master selection aborted"); $_->($error) for @cb; } ); } sub shutdown { my ($self)= @_; $self->{shutdown}= 1; } sub load_status { my ($self, $new_status)= @_; my $old_status= $self->{status}; $self->{status}= $new_status; my @old_hosts= grep {!$new_status->{$_}} keys %$old_status; my @new_hosts= grep {!$old_status->{$_}} keys %$new_status; $self->{pool}->on_removed_node($old_status->{$_}) for @old_hosts; $self->{pool}->on_new_node($new_status->{$_}) for @new_hosts; } sub event_added_node { my ($self, $ipaddress)= @_; $self->refresh_network_status unless $self->{status}{$ipaddress}; } sub event_removed_node { my ($self, $ipaddress)= @_; my $old_node= delete $self->{status}{$ipaddress}; if ($old_node) { $self->{pool}->on_removed_node($old_node); } } sub disconnected { my ($self, $id)= @_; if ($self->{master_id} && $self->{master_id} == $id) { $self->{master_id}= undef; $self->select_master(sub{}); } } sub refresh_network_status { my ($self)= @_; series([ sub { my ($next)= @_; $self->{pool}->get_one_cb($next); }, sub { my ($next, $connection)= @_; $connection->get_network_status($next); }, sub { my ($next, $status)= @_; $self->load_status($status); return $next->(); } ], sub { my ($error)= @_; # XXX And now? }); } 1; __END__ =pod =head1 NAME Cassandra::Client::NetworkStatus =head1 VERSION version 0.19 =head1 AUTHOR Tom van der Woerdt =head1 COPYRIGHT AND LICENSE This software is copyright (c) 2022 by Tom van der Woerdt. This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself. =cut