package Cassandra::Client::Policy::LoadBalancing::Default;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::Policy::LoadBalancing::Default::VERSION = '0.19';
use 5.010;
use strict;
use warnings;
use List::Util 'shuffle';
use Time::HiRes qw/time/;
sub new {
my ($class, %args)= @_;
return bless {
datacenter => undef,
nodes => {},
local_nodes => {},
connected => {},
candidates => [],
try_times => {},
}, $class;
}
sub get_distance {
my ($self, $peer)= @_;
my $node= $self->{nodes}{$peer};
if (!$node) {
warn 'Being asked about a distance for a node we don\'t know';
return 'ignored';
}
if ($self->{local_nodes}{$peer}) {
return 'local';
}
return 'remote';
}
sub on_new_node {
my ($self, $node)= @_;
my $peer= $node->{peer};
if ($self->{nodes}{$peer}) {
warn 'BUG: "new" node is already known!';
}
$self->{nodes}{$peer}= $node;
if (!$self->{datacenter} || $node->{data_center} eq $self->{datacenter}) {
$self->{local_nodes}{$peer}= $node;
}
}
sub on_removed_node {
my ($self, $node)= @_;
my $peer= $node->{peer};
if (!$self->{nodes}{$peer}) {
warn 'BUG: "removed" node wasn\'t there!';
}
delete $self->{nodes}{$peer};
delete $self->{local_nodes}{$peer};
}
sub get_next_candidate {
my ($self)= @_;
my $candidates= $self->{candidates};
while (my $maybe= shift @$candidates) {
if ($self->{local_nodes}{$maybe} && !$self->{connected}{$maybe} && $self->check_backoff($maybe)) {
return $maybe;
}
}
@$candidates= shuffle grep { !$self->{connected}{$_} && $self->check_backoff($_) } keys %{$self->{local_nodes}};
return shift @$candidates;
}
my @all_backoff= map { $_ * (rand()*.4 + 0.8) } (1, 5, 20, 60, 180, 600);
sub check_backoff {
my ($self, $peer)= @_;
my $times= $self->{try_times}{$peer};
return 1 unless $times;
my $count= 0+@$times;
$count= @all_backoff if $count > @all_backoff;
my $backoff= $all_backoff[$count-1];
if (time() - $times->[-1] < $backoff) {
return;
}
return 1;
}
sub set_connecting {
my ($self, $peer)= @_;
$self->{connected}{$peer}= 1;
push @{$self->{try_times}{$peer} ||= []}, time;
}
sub set_connected {
my ($self, $peer)= @_;
warn "BUG" unless $self->{connected}{$peer};
delete $self->{try_times}{$peer};
}
sub set_disconnected {
my ($self, $peer)= @_;
delete $self->{connected}{$peer};
}
sub known_node_count {
my ($self)= @_;
return (0+ keys %{$self->{local_nodes}});
}
1;
__END__
=pod
=head1 NAME
Cassandra::Client::Policy::LoadBalancing::Default
=head1 VERSION
version 0.19
=head1 AUTHOR
Tom van der Woerdt <tvdw@cpan.org>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2022 by Tom van der Woerdt.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut