Search::Elasticsearch::Async - Async API for Elasticsearch using Promises
version 8.00
use Search::Elasticsearch::Async;
use Promises backend => ['AnyEvent'];

# Connect to localhost:9200:
my $e = Search::Elasticsearch::Async->new();

# Round-robin between two nodes:
my $e = Search::Elasticsearch::Async->new(
    nodes => [
        'search1:9200',
        'search2:9200'
    ]
);

# Connect to cluster at search1:9200, sniff all nodes and round-robin between them:
my $e = Search::Elasticsearch::Async->new(
    nodes    => 'search1:9200',
    cxn_pool => 'Async::Sniff'
);

# Index a document:
$e->index(
    index => 'my_app',
    type  => 'blog_post',
    id    => 1,
    body  => {
        title   => 'Elasticsearch clients',
        content => 'Interesting content...',
        date    => '2013-09-24'
    }
)->then( sub { my $result = shift; do_something($result) } );

# Get the document:
my $doc;
$e->get(
    index => 'my_app',
    type  => 'blog_post',
    id    => 1
)->then( sub { $doc = shift });

# Search:
my $results;
$e->search(
    index => 'my_app',
    body  => {
        query => {
            match => { title => 'elasticsearch' }
        }
    }
)->then( sub { $results = shift });

# Cluster requests:
$e->cluster->info      ->then( sub { do_something(@_) });
$e->cluster->health    ->then( sub { do_something(@_) });
$e->cluster->node_stats->then( sub { do_something(@_) });

# Index requests:
$e->indices->create(index=>'my_index')->then( sub { do_something(@_) });
$e->indices->delete(index=>'my_index')->then( sub { do_something(@_) });
Search::Elasticsearch::Async is the official asynchronous Perl client for Elasticsearch, supported by elastic.co. Elasticsearch itself is a flexible and powerful open source, distributed real-time search and analytics engine for the cloud. You can read more about it on elastic.co.
This module uses Promises to provide a sane async interface, making your async code look more like synchronous code. It can be used with Mojolicious or with any of the event loops supported by AnyEvent.
Search::Elasticsearch::Async builds on Search::Elasticsearch, which you should see for the main documentation.
This version of the async client supports the Elasticsearch 5.0 branch, which is not backwards compatible with earlier branches.
If you need to talk to a version of Elasticsearch before 5.0.0, please install one of the following packages:
Search::Elasticsearch::Client::2_0::Async
Search::Elasticsearch::Client::1_0::Async
Search::Elasticsearch::Client::0_90::Async
First, go and read Promises::Cookbook::GentleIntro, which tells you everything you need to know about working with Promises. Using them with Search::Elasticsearch::Async is easy:
The Promises module does not use an event loop by default. You need to specify the one to use at the start of your application. Typically, you will be using the EV event loop (which both AnyEvent and Mojo prefer), in which case you need:
use Promises backend => ['EV'];
Otherwise you can specify the Mojo or AnyEvent backends.
use Search::Elasticsearch::Async;
my $es = Search::Elasticsearch::Async->new( %params );
See "CREATING A NEW INSTANCE" for an explanation of %params.
my $promise = $es->search;
All requests to Elasticsearch return a Promise object, which is a value that will be resolved later on. You can call then() on the $promise to specify a success callback and an error callback:
$promise->then(
    sub { my $result = shift; do_something() },  # success callback
    sub { my $error  = shift; warn $error }      # error callback
);
So far, so much like "CONDITION VARIABLES" in AnyEvent... but then() returns another $promise, which makes them chainable:
$promise->then(sub { print "Got a result"; return @_ })
        ->then(sub { my $result = shift; something_async($result) })
        ->then(sub { my $next_result = shift; ...etc...})
        ->catch(sub { warn "Something failed: @_"});
See Promises::Cookbook::GentleIntro for a full explanation of what you can do with Promises.
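The chaining and error-propagation behaviour described above can be tried without a cluster. The following is a minimal sketch, assuming only that the Promises module is installed; `fake_search` is a hypothetical stand-in for a real request such as `$es->search`, and the sketch relies on the default Promises backend invoking callbacks as soon as a promise is settled, so no event loop is started.

```perl
use strict;
use warnings;
use Promises qw(deferred);

# Hypothetical stand-in for an Elasticsearch request (not part of the
# client): returns a promise that is settled immediately, with no network I/O.
sub fake_search {
    my ($keywords) = @_;
    my $d = deferred;
    if ( length $keywords ) {
        # Mimic the shape of a search response
        $d->resolve( { hits => { total => 1 } } );
    }
    else {
        $d->reject("No keywords given\n");
    }
    return $d->promise;
}

my @log;

for my $keywords ( 'perl', '' ) {
    fake_search($keywords)
        # The value returned here is passed to the next then()
        ->then( sub { my $result = shift; return $result->{hits}{total} } )
        ->then( sub { my $total = shift; push @log, "found $total"; return } )
        # A rejection anywhere above skips the then() callbacks and lands here
        ->catch( sub { my $error = shift; chomp $error; push @log, "failed: $error"; return } );
}

print "$_\n" for @log;
```

A single `catch` at the end of the chain is enough here, because a rejected promise bypasses the intermediate success callbacks.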
Async requests are run by the event loop, so no promises will be resolved or rejected until the event loop is started. In a fully async application, you would start the event loop once and just let it run until the application exits. For instance, here's a simple example which reads search keywords from STDIN, performs an async search and prints the results. This process is repeated until the application is interrupted with Ctrl-C:
use v5.12;
use AnyEvent;
use Search::Elasticsearch::Async;

# EV must be installed
use Promises (backend => ['EV'], 'deferred');

my $es = Search::Elasticsearch::Async->new;

main();

say "Starting";

# start the event loop
EV::run;

sub main {
    read_input()
        ->then( \&do_search )
        ->then( \&print_results )

        # warn if either of the above steps throws an error
        ->catch( sub { warn "Something went wrong: @_"; } )

        # regardless of success or failure, run main() again
        ->finally( \&main );
}

sub read_input {
    say "Enter search keywords:";

    # We wrap AnyEvent so that it returns a promise
    # which is resolved when we have read from STDIN
    my $d = deferred;
    my $w;
    $w = AnyEvent->io(
        fh   => \*STDIN,
        poll => 'r',
        cb   => sub {
            chomp( my $input = <STDIN> );
            undef $w;

            # resolve the promise
            $d->resolve($input);
        }
    );

    # return a promise
    return $d->promise;
}

sub do_search {
    my $keywords = shift();

    # returns a promise
    $es->search(
        index => 'myindex',
        body  => {
            query => {
                match => { title => $keywords }
            }
        }
    );
}

sub print_results {
    my $results = shift;
    my $total   = $results->{hits}{total};
    unless ($total) {
        say "No results found";
        return;
    }

    say "$total results found";
    my $i = 1;
    for ( @{ $results->{hits}{hits} } ) {
        say $i++ . ': ' . $_->{_source}{title};
    }
}
The "new()" method returns a new client which can be used to run requests against the Elasticsearch cluster.
use Search::Elasticsearch::Async;
my $e = Search::Elasticsearch::Async->new( %params );
The most important arguments to "new()" are the following:
nodes
The nodes parameter tells the client which Elasticsearch nodes it should talk to. It can be a single node or multiple nodes; if it is not specified, it defaults to localhost:9200:
# default: localhost:9200
$e = Search::Elasticsearch::Async->new();

# single
$e = Search::Elasticsearch::Async->new( nodes => 'search_1:9200');

# multiple
$e = Search::Elasticsearch::Async->new(
    nodes => [
        'search_1:9200',
        'search_2:9200'
    ]
);
Each node can be a URL including a scheme, host, port, path and userinfo (for authentication). For instance, this would be a valid node:
https://username:password@search.domain.com:443/prefix/path
See "node" in Search::Elasticsearch::Role::Cxn for more on node specification.
cxn_pool
The CxnPool modules manage connections to nodes in the Elasticsearch cluster. They handle the load balancing between nodes and failover when nodes fail. Which CxnPool you should use depends on where your cluster is. There are three choices:
Async::Static
$e = Search::Elasticsearch::Async->new(
    cxn_pool => 'Async::Static',    # default
    nodes    => [
        'search1.domain.com:9200',
        'search2.domain.com:9200'
    ],
);
The Async::Static connection pool, which is the default, should be used when you don't have direct access to the Elasticsearch cluster, eg when you are accessing the cluster through a proxy. See Search::Elasticsearch::CxnPool::Async::Static for more.
Async::Sniff
$e = Search::Elasticsearch::Async->new(
    cxn_pool => 'Async::Sniff',
    nodes    => [
        'search1:9200',
        'search2:9200'
    ],
);
The Async::Sniff connection pool should be used when you do have direct access to the Elasticsearch cluster, eg when your web servers and Elasticsearch servers are on the same network. The nodes that you specify are used to discover the cluster, which is then sniffed to find the current list of live nodes that the cluster knows about. See Search::Elasticsearch::CxnPool::Async::Sniff.
Async::Static::NoPing
$e = Search::Elasticsearch::Async->new(
    cxn_pool => 'Async::Static::NoPing',
    nodes    => [
        'proxy1.domain.com:80',
        'proxy2.domain.com:80'
    ],
);
The Async::Static::NoPing connection pool should be used when your access to a remote cluster is so limited that you cannot ping individual nodes with a HEAD / request.
See Search::Elasticsearch::CxnPool::Async::Static::NoPing for more.
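To make the idea of load balancing with failover concrete, here is a toy sketch in plain Perl. It is not how the CxnPool modules are implemented: `My::ToyPool`, `mark_dead` and `next_node` are hypothetical names used only for illustration, and the real pools also ping or sniff nodes and bring failed nodes back into rotation later.

```perl
use strict;
use warnings;

{
    # Toy round-robin pool: illustrates the concept only,
    # NOT the library's CxnPool implementation.
    package My::ToyPool;

    sub new {
        my ( $class, @nodes ) = @_;
        return bless { nodes => [@nodes], dead => {}, next => 0 }, $class;
    }

    # Mark a node as failed so that next_node() skips it.
    sub mark_dead {
        my ( $self, $node ) = @_;
        $self->{dead}{$node} = 1;
        return;
    }

    # Return the next live node in rotation, or die if none are left.
    sub next_node {
        my ($self) = @_;
        my $nodes = $self->{nodes};
        for ( 1 .. @$nodes ) {
            my $node = $nodes->[ $self->{next}++ % @$nodes ];
            return $node unless $self->{dead}{$node};
        }
        die "No live nodes\n";
    }
}

my $pool = My::ToyPool->new( 'search1:9200', 'search2:9200', 'search3:9200' );

# All nodes alive: requests rotate through every node
my @first = map { $pool->next_node } 1 .. 3;

# After a failure, the dead node is skipped
$pool->mark_dead('search2:9200');
my @after = map { $pool->next_node } 1 .. 4;

print "@first\n@after\n";
```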
trace_to
For debugging purposes, it is useful to be able to dump the actual HTTP requests which are sent to the cluster, and the response that is received. This can be enabled with the trace_to parameter, as follows:
# To STDERR
$e = Search::Elasticsearch::Async->new(
    trace_to => 'Stderr'
);

# To a file
$e = Search::Elasticsearch::Async->new(
    trace_to => ['File','/path/to/filename']
);
Logging is handled by Log::Any. See Search::Elasticsearch::Logger::LogAny for more information.
Other arguments are explained in the respective module docs.
When you create a new instance of Search::Elasticsearch::Async, it returns a client object, which can be used for running requests.
use Search::Elasticsearch::Async;
my $e = Search::Elasticsearch::Async->new( %params );

# create an index
$e->indices->create( index => 'my_index' )
  ->then(sub {

    # index a document
    $e->index(
        index => 'my_index',
        type  => 'blog_post',
        id    => 1,
        body  => {
            title   => 'Elasticsearch clients',
            content => 'Interesting content...',
            date    => '2013-09-24'
        }
    );
});
See Search::Elasticsearch::Client::6_0::Direct for more details about the requests that can be run.
Each chunk of functionality is handled by a different module, which can be specified in the call to new() as shown in cxn_pool above. For instance, the following will use the Search::Elasticsearch::CxnPool::Async::Sniff module for the connection pool.
$e = Search::Elasticsearch::Async->new( cxn_pool => 'Async::Sniff' );
Custom modules can be named with the appropriate prefix, eg Search::Elasticsearch::CxnPool::, or by prefixing the full class name with +:
$e = Search::Elasticsearch::Async->new( cxn_pool => '+My::Custom::CxnClass' );
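The naming rule can be pictured with a few lines of plain Perl. This is only a sketch of the convention described above, not the client's actual class loader; `resolve_class` is a hypothetical helper introduced for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Search::Elasticsearch): expands a short
# module name against a base namespace, unless the name starts with '+',
# in which case the rest is taken as the full class name.
sub resolve_class {
    my ( $base, $name ) = @_;
    if ( $name =~ /^\+(.+)/ ) {
        return $1;
    }
    return "${base}::${name}";
}

my $builtin = resolve_class( 'Search::Elasticsearch::CxnPool', 'Async::Sniff' );
my $custom  = resolve_class( 'Search::Elasticsearch::CxnPool', '+My::Custom::CxnClass' );

print "$builtin\n$custom\n";
```

Under this rule, a custom pool can either live under the Search::Elasticsearch::CxnPool:: namespace and be referred to by its short name, or live anywhere and be referred to with a leading +.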
The modules that you can override are specified with the following arguments to "new()":
client
The class to use for the client functionality, which provides methods that can be called to execute requests, such as search(), index() or delete(). The client parses the user's requests and passes them to the "transport" class to be executed.
The default version of the client is 6_0::Direct, which can be explicitly specified as follows:
$e = Search::Elasticsearch::Async->new( client => '6_0::Direct' );
See:
Search::Elasticsearch::Client::6_0::Direct (default, for 6.0 branch)
Search::Elasticsearch::Client::5_0::Direct (for 5.0 branch)
Search::Elasticsearch::Client::2_0::Direct (for 2.0 branch)
Search::Elasticsearch::Client::1_0::Direct (for 1.0 branch)
Search::Elasticsearch::Client::0_90::Direct (for 0.90 branch)
transport
The Transport class accepts a parsed request from the "client" class, fetches a "cxn" from its "cxn_pool" and tries to execute the request, retrying after failure where appropriate. See:
Search::Elasticsearch::Async::Transport
cxn
The class which handles raw requests to Elasticsearch nodes. See:
Search::Elasticsearch::Cxn::AEHTTP (default)
Search::Elasticsearch::Cxn::Mojo
cxn_factory
The class which the "cxn_pool" uses to create new "cxn" objects. See:
Search::Elasticsearch::Cxn::Factory
cxn_pool
The class to use for the connection pool functionality. It calls the "cxn_factory" class to create new "cxn" objects when appropriate. See:
Search::Elasticsearch::CxnPool::Async::Static (default)
Search::Elasticsearch::CxnPool::Async::Sniff
Search::Elasticsearch::CxnPool::Async::Static::NoPing
logger
The class to use for logging events and tracing HTTP requests/responses. See:
Search::Elasticsearch::Logger::LogAny
serializer
The class to use for serializing request bodies and deserializing response bodies. See:
Search::Elasticsearch::Serializer::JSON (default)
Search::Elasticsearch::Serializer::JSON::Cpanel
Search::Elasticsearch::Serializer::JSON::XS
Search::Elasticsearch::Serializer::JSON::PP
Search::Elasticsearch::Client::6_0::Async::Bulk and Search::Elasticsearch::Client::6_0::Async::Scroll are helper modules which assist with bulk indexing and scrolled searching, eg:
$es->scroll_helper(
    index     => 'myindex',
    on_result => sub { my $doc = shift; do_something($doc) }
)->then( sub { say "Done!" });
This is a stable API but this implementation is new. Watch this space for new releases.
If you have any suggestions for improvements, or find any bugs, please report them to http://github.com/elasticsearch/elasticsearch-perl/issues. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.
You can find documentation for this module with the perldoc command.
perldoc Search::Elasticsearch::Async
You can also look for information at:
GitHub
http://github.com/elasticsearch/elasticsearch-perl
CPAN Ratings
http://cpanratings.perl.org/d/Search::Elasticsearch::Async
Search MetaCPAN
https://metacpan.org/module/Search::Elasticsearch::Async
IRC
The #elasticsearch channel on irc.freenode.net.
Mailing list
The main Elasticsearch mailing list.
The full test suite requires a live Elasticsearch node to run, and should be run as:
perl Makefile.PL
ES=localhost:9200 make test
TESTS RUN IN THIS WAY ARE DESTRUCTIVE! DO NOT RUN AGAINST A CLUSTER WITH DATA YOU WANT TO KEEP!
Enrico Zimuel <enrico.zimuel@elastic.co>
This software is Copyright (c) 2022 by Elasticsearch BV.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004