package Message::Passing::Input::AMQP;
use Moo;
use AnyEvent;
use Scalar::Util qw/ weaken refaddr /;
use Try::Tiny;
use namespace::autoclean;

with qw/
    Message::Passing::AMQP::Role::BindsAQueue
    Message::Passing::Role::Input
/;


after '_set_queue' => sub {
    my $self = shift;
    weaken($self);
    $self->_channel->consume(
        on_consume => sub {
            my $message = shift;
            try {
                $self->output_to->consume($message->{body}->payload);
            }
            catch {
                warn("Error in consume_message callback: $_");
            };
        },
        consumer_tag => refaddr($self),
        on_success => sub {
        },
        on_failure => sub {
            Carp::cluck("Failed to start message consumer in $self response " . Dumper(@_));
        },
    );
};

1;

=head1 NAME

Message::Passing::Input::AMQP - input logstash messages from AMQP.

=head1 SYNOPSIS

    message-pass --output STDOUT --input AMQP --input_options '{"queue_name":"test","exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'

=head1 DESCRIPTION

=head1 SEE ALSO

=over

=item L<Message::Passing::AMQP>

=item L<Message::Passing::Output::AMQP>

=item L<Message::Passing>

=item L<AMQP>

=item L<http://www.zeromq.org/>

=back

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing::AMQP>.

=cut