package Cassandra::Client::NetworkStatus;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::NetworkStatus::VERSION = '0.19';
use 5.010;
use strict;
use warnings;

use Scalar::Util qw/weaken/;
use Cassandra::Client::Util;

sub new {
    my ($class, %args)= @_;

    my $self= bless {
        pool => $args{pool},
        async_io => $args{async_io},

        waiting_for_cb => [],
        master_id => undef,

        shutdown => undef,
    }, $class;
    weaken($self->{pool});
    return $self;
}

sub init {
    my ($self, $callback)= @_;
    $self->select_master($callback);
}

sub select_master {
    my ($self, $callback)= @_;

    return $callback->() if $self->{master_id};
    if (@{$self->{waiting_for_cb}}) {
        push @{$self->{waiting_for_cb}}, $callback;
        return;
    }
    push @{$self->{waiting_for_cb}}, $callback;

    my $pool= $self->{pool}; # non-weak

    my $attempts= 0;
    whilst(
        sub { # condition
            !$self->{shutdown} && !$self->{master_id}
        },
        sub { # while
            my ($wnext)= @_;
            series([
                sub {
                    my ($next)= @_;
                    if ($attempts++) {
                        # Don't retry immediately
                        $self->{async_io}->timer($next, 1);
                    } else {
                        $next->();
                    }
                },
                sub {
                    my ($next)= @_;
                    $pool->get_one_cb($next);
                },
                sub {
                    my ($next, $connection)= @_;
                    parallel([
                        sub {
                            my ($pnext)= @_;
                            $connection->register_events($pnext);
                        },
                        sub {
                            my ($pnext)= @_;
                            $connection->get_network_status($pnext);
                        },
                        sub {
                            $_[0]->(undef, $connection);
                        },
                    ], $next);
                }, sub {
                    my ($next, undef, $networkstatus, $connection)= @_;
                    $self->{master_id}= $connection->get_pool_id;
                    $self->load_status($networkstatus);
                    $next->();
                },
            ], sub {
                $wnext->();
            });
        },
        sub { # finish
            my ($error)= @_;
            my @cb= @{$self->{waiting_for_cb}};
            $self->{waiting_for_cb}= [];
            $error= $error || ($self->{master_id} ? undef : "Master selection aborted");
            $_->($error) for @cb;
        }
    );
}

sub shutdown {
    my ($self)= @_;
    $self->{shutdown}= 1;
}

sub load_status {
    my ($self, $new_status)= @_;
    my $old_status= $self->{status};
    $self->{status}= $new_status;

    my @old_hosts= grep {!$new_status->{$_}} keys %$old_status;
    my @new_hosts= grep {!$old_status->{$_}} keys %$new_status;

    $self->{pool}->on_removed_node($old_status->{$_}) for @old_hosts;
    $self->{pool}->on_new_node($new_status->{$_}) for @new_hosts;
}

sub event_added_node {
    my ($self, $ipaddress)= @_;
    $self->refresh_network_status unless $self->{status}{$ipaddress};
}

sub event_removed_node {
    my ($self, $ipaddress)= @_;
    my $old_node= delete $self->{status}{$ipaddress};
    if ($old_node) {
        $self->{pool}->on_removed_node($old_node);
    }
}

sub disconnected {
    my ($self, $id)= @_;
    if ($self->{master_id} && $self->{master_id} == $id) {
        $self->{master_id}= undef;
        $self->select_master(sub{});
    }
}

sub refresh_network_status {
    my ($self)= @_;

    series([
        sub {
            my ($next)= @_;
            $self->{pool}->get_one_cb($next);
        }, sub {
            my ($next, $connection)= @_;
            $connection->get_network_status($next);
        }, sub {
            my ($next, $status)= @_;
            $self->load_status($status);
            return $next->();
        }
    ], sub {
        my ($error)= @_;
        # XXX And now?
    });
}

1;

__END__

=pod

=head1 NAME

Cassandra::Client::NetworkStatus

=head1 VERSION

version 0.19

=head1 AUTHOR

Tom van der Woerdt <tvdw@cpan.org>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2022 by Tom van der Woerdt.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut