package Message::Passing::Output::Search::Elasticsearch;
$Message::Passing::Output::Search::Elasticsearch::VERSION = '0.005';
# ABSTRACT: index messages in Elasticsearch

use Moo;
use MooX::Types::MooseLike::Base
    qw( Str ArrayRef HashRef CodeRef is_CodeRef AnyOf ConsumerOf InstanceOf );

use Search::Elasticsearch::Async;
use Promises backend => ['AnyEvent'];

with 'Message::Passing::Role::Output';



has es_params => (
    is      => 'ro',
    isa     => HashRef,
    default => sub { {} },
);


has es => (
    is      => 'ro',
    lazy    => 1,
    isa     => ConsumerOf ['Search::Elasticsearch::Role::Client'],
    builder => sub {
        my $self = shift;
        return Search::Elasticsearch::Async->new( %{ $self->es_params } );
    },
);


has es_bulk_params => (
    is      => 'ro',
    isa     => HashRef,
    default => sub { {} },
);


has es_bulk => (
    is   => 'ro',
    lazy => 1,
    isa  => ConsumerOf [
        'Search::Elasticsearch::Client::6_0::Role::Bulk',
        'Search::Elasticsearch::Role::Is_Async'
    ],
    builder => sub {
        my $self = shift;
        return $self->es->bulk_helper( %{ $self->es_bulk_params } );
    },
);


has type => (
    is       => 'ro',
    required => 1,
    isa      => AnyOf [ Str, CodeRef ],
);


has index_name => (
    is       => 'ro',
    required => 1,
    isa      => AnyOf [ Str, CodeRef ],
);


sub consume {
    my ( $self, $data ) = @_;
    return
        unless defined $data && ref $data eq 'HASH';

    #if ( my $epochtime = delete $data->{epochtime} ) {
    #$date = DateTime->from_epoch(epoch => $epochtime);
    #}
    #$date ||= DateTime->from_epoch(epoch => time());

    my $type =
        is_CodeRef( $self->type )
        ? $self->type->($data)
        : $self->type;
    my $index_name =
        is_CodeRef( $self->index_name )
        ? $self->index_name->($data)
        : $self->index_name;

    #$self->_indexes->{$index_name} = 1;
    #    my $to_queue = {
    #        '@timestamp'   => to_ISO8601DateTimeStr($date),
    #        '@tags'        => [],
    #        '@type'        => $type,
    #        '@source_host' => delete( $data->{hostname} ) || 'none',
    #        '@message'     => exists( $data->{message} )
    #        ? delete( $data->{message} )
    #        : encode_json($data),
    #        '@fields' => $data,
    #        exists( $data->{uuid} ) ? ( id => delete( $data->{uuid} ) ) : (),
    #    };
    $self->es_bulk->index(
        {   index  => $index_name,
            type   => $type,
            source => $data,
        }
    );
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Message::Passing::Output::Search::Elasticsearch - index messages in Elasticsearch

=head1 VERSION

version 0.005

=head1 DESCRIPTION

This output is intentionally kept simple to not add dependencies.
If you need a special format use a filter like
L<Message::Passing::Filter::ToLogstash> before sending messages to this
output.

=head1 ATTRIBUTES

=head2 es_params

A hashref of L<Search::Elasticsearch::Async/"CREATING A NEW INSTANCE"> parameters.

=head2 es

A L<Search::Elasticsearch::Async> instance. Can either be passed directly or
gets constructed from L</es_params>.

=head2 es_bulk_params

A hashref of
L<Search::Elasticsearch::Client::6_0::Async::Bulk/"CREATING A NEW INSTANCE">
parameters.

=head2 es_bulk

A L<Search::Elasticsearch::Client::6_0::Async::Bulk> instance. Can either be
passed directly or gets constructed from L</es> and L</es_bulk_params> using
bulk_helper.

=head2 type

Can be either set to a fixed string or a coderef that's called for every
message to return the type depending on the contents of the message.

=head2 index_name

Can be either set to a fixed string or a coderef that's called for every
message to return the index name depending on the contents of the message.

=head1 METHODS

=head2 consume ($msg)

Consumes a message, queuing it for consumption by Elasticsearch.
Assumes that the message is a hashref, skips silently in case it isn't.

=head1 SEE ALSO

=over

=item L<Message::Passing>

=back

=head1 AUTHOR

Alexander Hartmaier <abraxxa@cpan.org>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2018 by Alexander Hartmaier.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut