package Message::Passing::Output::AMQP;
use Moo;
use Types::Standard qw( Str CodeRef );
use namespace::autoclean;

with qw/

# Routing key that consume() passes along with every message.
# Read-only string attribute; defaults to the empty string.
# NOTE(review): no closing `);` follows this declaration here - confirm the
# statement is terminated in the full source.
has routing_key => (
    isa => Str,
    is => 'ro',
    default => '',

# Optional code reference that consume() calls with the message before it is
# serialized; whatever it returns is passed on as the `header` argument.
# Read-only, with no default, so the accessor yields undef when not supplied.
# NOTE(review): no closing `);` follows this declaration here - confirm the
# statement is terminated in the full source.
has header_cb => (
    isa => CodeRef,
    is => 'ro',

# Optional code reference that consume() calls with the message; its return
# value replaces the message and is passed on as the `body` argument.
# Read-only, with no default, so the accessor yields undef when not supplied.
# NOTE(review): no closing `);` follows this declaration here - confirm the
# statement is terminated in the full source.
has serialize_cb => (
    isa => CodeRef,
    is => 'ro',

# consume($data)
#
# Handles a single message: warns when a reference arrives while no
# serialize_cb is configured, warns when no exchange is available yet, then
# builds the optional header via header_cb, serializes the message via
# serialize_cb and assembles the body / header / exchange / routing_key
# named arguments.
#
# NOTE(review): the ends of the two guard branches (presumably an early
# return each, since both warnings say the message is dropped), the call that
# receives the named arguments below, and the closing brace of the sub are not
# visible here - confirm against the full source.
sub consume {
    my $self = shift;
    my $data = shift;
    # A reference with no serialize_cb configured is reported as
    # non-serialized data.
    if (ref $data && ! defined $self->serialize_cb) {
        warn("Passed non-serialized data - is a perl reference. Dropping.\n");
    # _exchange is provided outside this file; a false value is reported as
    # "no exchange yet".
    unless ($self->_exchange) {
        warn("No exchange yet, dropping message");
    # header_cb runs first, so it sees the message before serialization.
    my $header;
    $header = $self->header_cb->($data)
        if defined $self->header_cb;

    # Replace the message with its serialized form when serialize_cb is set.
    $data = $self->serialize_cb->($data)
        if defined $self->serialize_cb;

    # Named arguments for the outgoing message; $header stays undef when no
    # header_cb was configured.
        body => $data,
        header => $header,
        exchange => $self->exchange_name,
        routing_key => $self->routing_key,


=head1 NAME

Message::Passing::Output::AMQP - output messages to AMQP.


=head1 SYNOPSIS

    message-pass --input STDIN --output AMQP --output_options '{"exchange_name":"test","hostname":"","username":"guest","password":"guest"}'


=head1 DESCRIPTION

A L<Message::Passing> output class that sends messages to AMQP using L<AnyEvent::RabbitMQ>.

Can be used as part of a chain of classes with the L<message-pass> utility, or directly as
a logger in normal Perl applications.


=head1 ATTRIBUTES

=head2 routing_key

The routing key for all messages, defaults to ''.

=head2 header_cb

Optional callback function which gets passed the message before it is
serialized using L</serialize_cb>.
Should return a hashref which gets passed to publish( header => ).

NOTE: to set the message headers (note the plural "s"), nest them under a
C<headers> key inside the returned hashref, e.g.:

  {
      content_type => 'application/json',
      headers => {
          key => 'value',
      },
  }

=head2 serialize_cb

Optional callback function which gets passed the message and should return a
scalar. This is useful when passing structured messages, e.g. hashrefs or
objects, where some attributes should be accessible to the L</header_cb> function.
If the message were serialized earlier in the chain by a
L<Message::Passing::Role::Filter>, header_cb would have to deserialize it again.
To use a Message::Passing filter, instantiate it and pass its filter
function to serialize_cb:

  my $filter = Message::Passing::Filter::Encoder::JSON->new(output_to => undef);

  my $amqp = Message::Passing::Output::AMQP->new(
      # ... connection and exchange options ...
      serialize_cb => sub { $filter->filter(shift) },
  );

=head1 METHODS

=head2 consume

Sends a message.

=head1 SEE ALSO


=over

=item L<Message::Passing::AMQP>

=item L<Message::Passing::Input::AMQP>

=item L<Message::Passing>

=item L<AMQP>

=back



See L<Message::Passing::AMQP>.