package Message::Passing::Input::Redis;
use Moo;
use MooX::Types::MooseLike::Base qw/ ArrayRef Str /;
use Scalar::Util qw/ weaken /;
use AnyEvent;
use namespace::clean -except => 'meta';

with qw/
    Message::Passing::Redis::Role::HasAConnection
    Message::Passing::Role::Input
/;

has topics => (
    isa => ArrayRef[Str],
    coerce => sub {
        my $str = shift;
        return $str     if     ref $str eq 'ARRAY';
        return [ $str ] unless ref $str;
        return [];
    },
    is => 'ro',
    default => sub { [] },
);

has ptopics => (
    isa => ArrayRef[Str],
    coerce => sub {
        my $str = shift;
        return $str     if     ref $str eq 'ARRAY';
        return [ $str ] unless ref $str;
        return [];
    },
    is => 'ro',
    default => sub { [] },
);

has _handle => (
    is => 'rw',
    clearer => '_clear_handle',
);

sub connected {
    my ($self, $client) = @_;
    weaken($self);
    weaken($client);
    $client->subscribe(
        @{ $self->topics },
        sub {
            my ($message, $topic, $subscribed_topic) = @_;
            $self->output_to->consume($message);
        },
    ) if @{ $self->topics };
    $client->psubscribe(
        @{ $self->ptopics },
        sub {
            my ($message, $topic, $subscribed_topic) = @_;
            $self->output_to->consume($message);
        },
    ) if @{ $self->ptopics };
    $self->_handle(AnyEvent->io(
        fh   => $client->{sock},
        poll => "r",
        cb   => sub {
            $client->wait_for_messages(0);
        },
    ));
}

sub disconnect {
    my ($self) = @_;
    $self->_clear_handle;
}

1;

=head1 NAME

Message::Passing::Input::Redis - A Redis consumer for Message::Passing

=head1 SYNOPSIS

    $ message-pass --output STDOUT --input Redis --input_options '{"topics":["foo"],"hostname":"127.0.0.1","port":"6379"}'

=head1 DESCRIPTION

A simple subscriber a Redis PubSub topic

=head1 ATTRIBUTES

=head2 hostname

The hostname of the Redis server. Required.

=head2 port

The port number of the Redis server. Defaults to 6379.

=head2 topics

A list of topics to consume messages from.

These topic names are matched exactly.

=head2 ptopics

A list of pattern topics to consume messages from.

These topic names can wildcard match, so for example C<< prefix1.* >>
will match topics C<< prefix1.foo >> and C<< prefix1.bar >>.

=head1 METHODS

=head2 connected

Called by L<Message::Passing::Redis::ConnectionManager> to indicate a
connection to the Redis server has been made.

Causes the subscription to the topic(s) to be started

=head2 disconnect

Called by L<Message::Passing::Redis::ConnectionManager> to indicate a
connection to the Redis server has failed.

=head1 SEE ALSO

=over

=item L<Message::Passing::Output::Redis>

=item L<Message::Passing::Redis>

=back

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing::Redis>.

=cut