08 Jan 2021 17:20:26 UTC
- Distribution: Kafka-Producer-Avro
- Module version: v1.0.0
- License: perl_5
- Perl: v5.10.0
- Contributors: Alvaro Livraghi
- Dependencies
- Avro::BinaryEncoder
- Avro::Schema
- Confluent::SchemaRegistry
- JSON::XS
- Kafka::Producer
- and possibly others
NAME
Kafka::Producer::Avro - Avro message producer for Apache Kafka.
SYNOPSIS
    use Kafka::Connection;
    use Kafka::Producer::Avro;
    use Confluent::SchemaRegistry;

    my $connection = Kafka::Connection->new( host => 'localhost' );

    my $producer = Kafka::Producer::Avro->new(
        Connection     => $connection,
        SchemaRegistry => Confluent::SchemaRegistry->new()
    );

    # Set Avro schema for message key (valid JSON-string)
    my $key_schema = <<KEY_SCHEMA;
    {
        "type": "long",
        "name": "_id"
    }
    KEY_SCHEMA

    # Set Avro schema for message value (payload) (valid JSON-string)
    my $value_schema = <<VALUE_SCHEMA;
    {
        "type": "record",
        "name": "myrecord",
        "fields": [
            { "name": "f1", "type": "string" }
        ]
    }
    VALUE_SCHEMA

    # Sending a single message
    my $response = $producer->send(
        'mytopic',          # topic
        0,                  # partition
        'Single message',   # message
        undef,              # key
        undef,              # compression_codec
        undef,              # timestamps
        $key_schema,        # key_schema
        $value_schema       # value_schema
    );

    # Sending a series of messages
    $response = $producer->send(
        'mytopic',          # topic
        0,                  # partition
        [                   # messages
            'The first message',
            'The second message',
            'The third message',
        ],
        undef,              # key(s)
        undef,              # compression_codec
        undef,              # timestamp(s)
        $key_schema,        # key_schema
        $value_schema       # value_schema
    );

    # ...or use named parameters
    $producer->send(
        topic             => $topic,
        partition         => $partition,
        messages          => $messages,
        keys              => $keys,
        compression_codec => $compression_codec,
        timestamps        => $timestamps,
        key_schema        => $key_schema,
        value_schema      => $value_schema
    );

    # Closes the producer and cleans up
    undef $producer;
    $connection->close;
    undef $connection;
DESCRIPTION
Kafka::Producer::Avro's main feature is to provide an object-oriented API to produce messages according to Confluent Schema Registry and Avro serialization.
Kafka::Producer::Avro inherits from and extends Kafka::Producer.
INSTALL
Installation of Kafka::Producer::Avro is canonical:
    perl Makefile.PL
    make
    make test
    make install
TEST NOTES
Tests focus on verifying Avro-formatted messages and their interactions with Confluent Schema Registry, and are intended to extend the Kafka::Producer test suite.
They expect Apache Kafka and Schema Registry services to be listening on localhost:9092 and http://localhost:8081, respectively.
Alternatively, you can set different URLs by exporting the following environment variables:
    KAFKA_HOST
    KAFKA_PORT
    CONFLUENT_SCHEMA_REGISTY_URL
For example:
    export KAFKA_HOST=my-kafka-host.my-domain.org
    export KAFKA_PORT=9092
    export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
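As a rough sketch of how a test script might pick up these settings, the snippet below reads the three environment variables and falls back to the defaults mentioned above. The helper name resolve_endpoints is hypothetical and not part of the distribution; the actual test suite may resolve these values differently.

```perl
use strict;
use warnings;
use 5.010;    # enables say and the defined-or operator (//)

# Hypothetical helper (not part of Kafka::Producer::Avro): returns the
# service endpoints, preferring the supplied environment over the defaults.
sub resolve_endpoints {
    my (%env) = @_;
    return {
        kafka_host          => $env{KAFKA_HOST} // 'localhost',
        kafka_port          => $env{KAFKA_PORT} // 9092,
        schema_registry_url => $env{CONFLUENT_SCHEMA_REGISTY_URL}
            // 'http://localhost:8081',
    };
}

my $endpoints = resolve_endpoints(%ENV);
say "Kafka:           $endpoints->{kafka_host}:$endpoints->{kafka_port}";
say "Schema Registry: $endpoints->{schema_registry_url}";
```

Passing the environment in as an argument, rather than reading %ENV inside the helper, keeps the fallback logic easy to exercise without touching the real environment.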
USAGE
CONSTRUCTOR
new
Creates a new producer client object.
new() takes arguments in key-value pairs as described in Kafka::Producer, from which it inherits.
In addition, it takes the following argument:
SchemaRegistry => $schema_registry (mandatory)
A Confluent::SchemaRegistry instance.
METHODS
The following methods are defined for the Kafka::Producer::Avro class:
schema_registry()
Returns the Confluent::SchemaRegistry instance supplied to the constructor.
get_error()
Returns a string containing the last error message.
send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps, $key_schema, $value_schema )
send( %named_params )
Sends Avro-formatted messages on a Kafka::Connection object.
Returns a non-blank value (a reference to a hash with server response description) if the message is successfully sent.
In order to handle the Avro format, the Kafka::Producer send() method is extended with two more positional arguments, $key_schema and $value_schema:
    $producer->send(
        $topic,             # scalar
        $partition,         # scalar
        $messages,          # scalar | array
        $keys,              # (optional) undef | scalar | array
        $compression_codec, # (optional) undef | scalar
        $timestamps,        # (optional) undef | scalar | array
        $key_schema,        # (optional) undef | JSON-string
        $value_schema       # (optional) undef | JSON-string
    );
Both the $key_schema and $value_schema parameters are optional; when provided, they must be JSON strings representing the Avro schemas used to validate and serialize key(s) and value(s).
These schemas are validated against $schema_registry and, if compliant, they are added to the registry under the $topic+'key' or $topic+'value' Schema Registry subjects.
If an expected schema isn't provided, the latest version from Schema Registry is used, according to the (topic + key/value) subject.
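Since both schema parameters must be valid JSON strings, one way to avoid hand-written JSON mistakes is to build them from Perl data structures. The following is a minimal sketch using the core JSON::PP module (JSON::XS, already a dependency, exposes the same encode/decode interface). The helper is_json_object is hypothetical; it only confirms that a string parses as a JSON object and does not validate it as an Avro schema.

```perl
use strict;
use warnings;
use JSON::PP;

# canonical(1) sorts object keys, so the generated strings are stable.
my $json = JSON::PP->new->canonical(1);

# Same schemas as in the SYNOPSIS, built from Perl data structures.
my $key_schema = $json->encode({ type => 'long', name => '_id' });

my $value_schema = $json->encode({
    type   => 'record',
    name   => 'myrecord',
    fields => [ { name => 'f1', type => 'string' } ],
});

# Hypothetical pre-flight check (not part of Kafka::Producer::Avro):
# true only if $text parses as JSON and the top-level value is an object.
sub is_json_object {
    my ($text) = @_;
    my $decoded = eval { JSON::PP->new->decode($text) };
    return ref($decoded) eq 'HASH' ? 1 : 0;
}

print "key schema:   $key_schema\n"   if is_json_object($key_schema);
print "value schema: $value_schema\n" if is_json_object($value_schema);
```

The resulting strings can be passed as key_schema and value_schema to send(); whether they are accepted is still decided by the Schema Registry.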
Alternatively, for ease of use, the send() method may also be called with named parameters:
    $producer->send(
        topic             => $topic,             # scalar
        partition         => $partition,         # scalar
        messages          => $messages,          # scalar | array
        keys              => $keys,              # (optional) undef | scalar | array
        compression_codec => $compression_codec, # (optional) undef | scalar
        timestamps        => $timestamps,        # (optional) undef | scalar | array
        key_schema        => $key_schema,        # (optional) undef | JSON-string
        value_schema      => $value_schema       # (optional) undef | JSON-string
    );
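Because send() returns a non-blank value only on success, callers will usually want to check the result and consult get_error(). The sketch below shows one possible pattern. The wrapper name send_or_report is hypothetical, and it rests on two assumptions: that send() returns a false value on failure, and that the underlying Kafka::Producer may also throw exceptions, which is why the call is wrapped in eval.

```perl
use strict;
use warnings;
use 5.010;    # for the defined-or operator (//)

# Hypothetical wrapper (not part of Kafka::Producer::Avro): returns the
# response hash reference on success, or undef after reporting the failure.
sub send_or_report {
    my ($producer, %params) = @_;

    # Trap exceptions that the underlying producer may throw.
    my $response;
    my $ok = eval { $response = $producer->send(%params); 1 };
    unless ($ok) {
        warn "send() died: $@";
        return;
    }

    # Assumption: a false return value means failure, and get_error()
    # describes what went wrong.
    unless ($response) {
        my $error = $producer->get_error() // 'unknown error';
        warn "send() failed: $error\n";
        return;
    }

    return $response;
}

# Usage, assuming $producer is a connected Kafka::Producer::Avro instance:
#
#   my $response = send_or_report(
#       $producer,
#       topic        => 'mytopic',
#       partition    => 0,
#       messages     => $messages,
#       value_schema => $value_schema,
#   ) or exit 1;
```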
bulk_send( %params )
Similar to send, but sends messages in bulks to avoid memory leaks.
Extra named parameters are expected:
size => $size
The size of each bulk.
on_before_send_bulk => sub {...} (optional)
A code block that will be executed before each bulk is sent.
The block will receive the following positional parameters:
on_after_send_bulk => sub {...} (optional)
A code block that will be executed after each bulk is sent.
The block will receive the following positional parameters:
on_init => sub {...} (optional)
A code block that will be executed only once, at the beginning of the cycle.
The block will receive the following positional parameters:
on_complete => sub {...} (optional)
A code block that will be executed only once, after the end of the cycle.
The block will receive the following positional parameters:
on_send_error => sub {...} (optional)
A code block that will be executed when a bulk registers an error.
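To illustrate how the callbacks fit together, here is a minimal sketch that wraps bulk_send() and counts how often each callback fires. The wrapper name bulk_send_with_stats is hypothetical. The callbacks deliberately ignore their arguments, since the positional parameters they receive are not listed here, and the sketch assumes that bulk_send() accepts the same named parameters as send() in addition to the extra ones described above.

```perl
use strict;
use warnings;

# Hypothetical wrapper (not part of Kafka::Producer::Avro): calls
# bulk_send() with counting callbacks and returns the counters.
sub bulk_send_with_stats {
    my ($producer, %params) = @_;

    my %stats = (
        started   => 0,    # on_init calls
        before    => 0,    # on_before_send_bulk calls
        after     => 0,    # on_after_send_bulk calls
        errors    => 0,    # on_send_error calls
        completed => 0,    # on_complete calls
    );

    # Callback arguments are ignored on purpose: their exact list is
    # not documented in this section.
    $producer->bulk_send(
        %params,
        on_init             => sub { $stats{started}++ },
        on_before_send_bulk => sub { $stats{before}++ },
        on_after_send_bulk  => sub { $stats{after}++ },
        on_send_error       => sub { $stats{errors}++ },
        on_complete         => sub { $stats{completed}++ },
    );

    return \%stats;
}

# Usage, assuming $producer is a connected Kafka::Producer::Avro instance:
#
#   my $stats = bulk_send_with_stats(
#       $producer,
#       topic        => 'mytopic',
#       partition    => 0,
#       messages     => \@messages,
#       value_schema => $value_schema,
#       size         => 100,
#   );
#   warn "$stats->{errors} bulk(s) reported an error\n" if $stats->{errors};
```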
AUTHOR
Alvaro Livraghi, <alvarol@cpan.org>
CONTRIBUTE
https://github.com/alivraghi/Kafka-Producer-Avro
BUGS
Please use the GitHub project link above to report problems or contact the author.
COPYRIGHT AND LICENSE
Copyright 2018 by Alvaro Livraghi
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.