NAME
Kafka::Producer::Avro - Avro message producer for Apache Kafka.
SYNOPSIS
use Kafka::Connection;
use Kafka::Producer::Avro;
use Confluent::SchemaRegistry;
my $connection = Kafka::Connection->new( host => 'localhost' );
my $producer = Kafka::Producer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() );
# Set Avro schema for message key (valid JSON-string)
my $key_schema = <<KEY_SCHEMA;
{
"type": "long",
"name": "_id"
}
KEY_SCHEMA
# Set Avro schema for message value (payload) (valid JSON-string)
my $value_schema = <<VALUE_SCHEMA;
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "f1",
"type": "string"
}
]
}
VALUE_SCHEMA
# Sending a single message
my $response = $producer->send(
'mytopic', # topic
0, # partition
'Single message', # message
undef, # key
undef, # compression_codec
undef, # timestamps
$key_schema, # key_schema
$value_schema # value_schema
);
# Sending a series of messages
$response = $producer->send(
'mytopic', # topic
0, # partition
[ # messages
'The first message',
'The second message',
'The third message',
],
undef, # key(s)
undef, # compression_codec
undef, # timestamp(s)
$key_schema, # key_schema
$value_schema # value_schema
);
# ...or use named parameters
$producer->send(
topic => $topic,
partition => $partition,
messages => $messages,
keys => $keys,
compression_codec => $compression_codec,
timestamps => $timestamps,
key_schema => $key_schema,
value_schema => $value_schema
);
# Closes the producer and cleans up
undef $producer;
$connection->close;
undef $connection;
DESCRIPTION
The main feature of "Kafka::Producer::Avro" is an object-oriented API
for producing messages serialized with *Avro* according to schemas held
in a *Confluent Schema Registry*.
"Kafka::Producer::Avro" inherits from and extends Kafka::Producer.
INSTALL
Installation of "Kafka::Producer::Avro" follows the canonical procedure:
perl Makefile.PL
make
make test
make install
TEST NOTES
Tests focus on verifying Avro-formatted messages and their
interactions with Confluent Schema Registry, and are intended to extend
the "Kafka::Producer" test suite.
They expect Apache Kafka and Schema Registry services to be running and
listening on "localhost:9092" and "http://localhost:8081" respectively.
Alternatively, you can point the tests at different endpoints by
exporting the following environment variables:
"KAFKA_HOST"
"KAFKA_PORT"
"CONFLUENT_SCHEMA_REGISTY_URL"
For example:
export KAFKA_HOST=my-kafka-host.my-domain.org
export KAFKA_PORT=9092
export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
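As a minimal sketch of how a test script might resolve these settings, the snippet below falls back to the defaults listed above when a variable is unset. The helper name "test_endpoints" is hypothetical and is not part of the distribution; how the real test suite reads its environment is not shown here.

```perl
use strict;
use warnings;

# Hypothetical helper: resolve the endpoints a test script would use,
# falling back to the documented defaults when a variable is unset.
sub test_endpoints {
    my (%env) = @_;
    return (
        kafka_host          => $env{KAFKA_HOST} // 'localhost',
        kafka_port          => $env{KAFKA_PORT} // 9092,
        schema_registry_url => $env{CONFLUENT_SCHEMA_REGISTY_URL}
                               // 'http://localhost:8081',
    );
}

my %endpoints = test_endpoints(%ENV);
printf "Kafka: %s:%s\n", @endpoints{qw(kafka_host kafka_port)};
printf "Schema Registry: %s\n", $endpoints{schema_registry_url};
```

The host and port values would then be passed to Kafka::Connection, which accepts "host" and "port" arguments.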
USAGE
CONSTRUCTOR
"new"
Creates a new producer client object.
"new()" takes arguments as key-value pairs, as described in
Kafka::Producer, from which it inherits.
In addition, it takes the following argument:
"SchemaRegistry => $schema_registry" (mandatory)
Is a Confluent::SchemaRegistry instance.
METHODS
The following methods are defined for the "Kafka::Producer::Avro" class:
"schema_registry"()
Returns the Confluent::SchemaRegistry instance supplied to the
constructor.
"get_error"()
Returns a string containing the last error message.
"send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps, $key_schema, $value_schema )"
"send( %named_params )"
Sends Avro-formatted messages on a Kafka::Connection object.
Returns a non-blank value (a reference to a hash with server response
description) if the message is successfully sent.
In order to handle the Avro format, the "Kafka::Producer" "send()"
method is extended with two more positional arguments, $key_schema and
$value_schema:
$producer->send(
$topic, # scalar
$partition, # scalar
$messages, # scalar | array
$keys, # (optional) undef | scalar | array
$compression_codec, # (optional) undef | scalar
$timestamps, # (optional) undef | scalar | array
$key_schema, # (optional) undef | JSON-string
$value_schema # (optional) undef | JSON-string
);
Both $key_schema and $value_schema are optional. When provided, each
must be a JSON string representing the Avro schema used to validate and
serialize the key(s) and value(s) respectively.
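Instead of writing the JSON by hand, a schema string can be generated from a Perl data structure. The following is a minimal sketch that uses the core JSON::PP module and mirrors the value schema from the SYNOPSIS; the variable names are illustrative only.

```perl
use strict;
use warnings;
use JSON::PP;

# Describe the Avro record schema as plain Perl data.
my %value_schema_def = (
    type   => 'record',
    name   => 'myrecord',
    fields => [ { name => 'f1', type => 'string' } ],
);

# canonical() sorts hash keys, so the same definition always
# produces the same JSON string.
my $json         = JSON::PP->new->canonical;
my $value_schema = $json->encode( \%value_schema_def );

print "$value_schema\n";
```

The resulting $value_schema string can be passed to "send()" in the same way as the here-document in the SYNOPSIS.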
These schemas are validated against $schema_registry and, if compliant,
are added to the registry under the "$topic+'key'" or "$topic+'value'"
Schema Registry subject.
If an expected schema isn't provided, the latest version registered in
Schema Registry for the corresponding (topic + key/value) subject is
used.
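To make the subject naming concrete, here is a small sketch that assumes the hyphen-joined "<topic>-key" and "<topic>-value" form used by Confluent's default TopicNameStrategy. Whether this module joins the two parts in exactly this way is an assumption, and "subject_for" is a hypothetical helper, not part of the module's API.

```perl
use strict;
use warnings;

# Hypothetical helper: build a Schema Registry subject name for a topic.
# Assumption: the "<topic>-key" / "<topic>-value" convention of
# Confluent's default TopicNameStrategy.
sub subject_for {
    my ($topic, $kind) = @_;
    die "kind must be 'key' or 'value'\n"
        unless $kind eq 'key' || $kind eq 'value';
    return join '-', $topic, $kind;
}

print subject_for('mytopic', 'key'),   "\n";
print subject_for('mytopic', 'value'), "\n";
```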
Alternatively, for ease of use, the "send()" method may also be called
with named parameters:
$producer->send(
topic => $topic, # scalar
partition => $partition, # scalar
messages => $messages, # scalar | array
keys => $keys, # (optional) undef | scalar | array
compression_codec => $compression_codec, # (optional) undef | scalar
timestamps => $timestamps, # (optional) undef | scalar | array
key_schema => $key_schema, # (optional) undef | JSON-string
value_schema => $value_schema # (optional) undef | JSON-string
);
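The sketch below shows one way to check the outcome of "send()". It assumes that a failure surfaces either as an exception or as a false return value, with details available from "get_error()"; the exact failure behaviour is not spelled out above, so treat this as an illustration. "send_or_report" is a hypothetical wrapper, not part of the module.

```perl
use strict;
use warnings;

# Hypothetical wrapper: send with named parameters and return the
# response, or return nothing after reporting the failure.
sub send_or_report {
    my ($producer, %args) = @_;

    # Assumption: failures show up as an exception or a false value.
    my $response = eval { $producer->send(%args) };
    return $response if $response;

    my $reason = $@ || $producer->get_error() || 'unknown error';
    warn "send to '$args{topic}' failed: $reason\n";
    return;
}
```

A caller would use it as "send_or_report( $producer, topic => 'mytopic', partition => 0, messages => $messages, value_schema => $value_schema )".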
"bulk_send( %params )"
Similar to "send", but sends the messages in bulks to avoid memory leaks.
Extra named parameters are expected:
"size => $size"
The size of the bulk
"on_before_send_bulk => sub {...} " (optional)
A code block that will be executed before the sending of each bulk.
The block will receive the following positional parameters:
$bulk_num the number of the bulk
$bulk_messages the number of messages in the bulk
$bulk_keys the number of keys in the bulk
$index_from the absolute index of the first message in the bulk
$index_to the absolute index of the last message in the bulk
"on_after_send_bulk => sub {...} " (optional)
A code block that will be executed after the sending of each bulk.
The block will receive the following positional parameters:
$sent the number of sent messages in the bulk
$total_sent the total number of messages sent
"on_init => sub {...} " (optional)
A code block that will be executed only once, at the beginning of the
cycle.
The block will receive the following positional parameters:
$to_send the total number of messages to send
$bulk_size the size of the bulk
"on_complete => sub {...} " (optional)
A code block that will be executed only once after the end of the
cycle.
The block will receive the following positional parameters:
$to_send the total number of messages to send
$total_sent the total number of messages sent
$errors the number of bulks sent with errors
"on_send_error => sub {...} " (optional)
A code block that will be executed when a bulk registers an error.
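As an illustration of the callbacks described above, the following sketch wraps "bulk_send()" in a helper that logs progress. The helper name "send_with_progress" is hypothetical. The arguments passed to "on_send_error" and the return value of "bulk_send()" are not documented here, so the sketch ignores the former and simply passes the latter through.

```perl
use strict;
use warnings;

# Hypothetical helper: send $messages in bulks of $size, logging progress.
sub send_with_progress {
    my ($producer, $topic, $messages, $value_schema, $size) = @_;

    return $producer->bulk_send(
        topic        => $topic,
        partition    => 0,
        messages     => $messages,
        value_schema => $value_schema,
        size         => $size,
        on_init => sub {
            my ($to_send, $bulk_size) = @_;
            print "Sending $to_send messages in bulks of $bulk_size\n";
        },
        on_before_send_bulk => sub {
            my ($bulk_num, $bulk_messages, $bulk_keys,
                $index_from, $index_to) = @_;
            print "Bulk #$bulk_num: messages $index_from..$index_to\n";
        },
        on_after_send_bulk => sub {
            my ($sent, $total_sent) = @_;
            print "  sent $sent (total $total_sent)\n";
        },
        on_complete => sub {
            my ($to_send, $total_sent, $errors) = @_;
            print "Done: $total_sent of $to_send sent, ",
                  "$errors bulks with errors\n";
        },
        on_send_error => sub {
            # Callback arguments are undocumented; ask the producer instead.
            my $reason = $producer->get_error() // 'unknown error';
            warn "Bulk failed: $reason\n";
        },
    );
}
```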
AUTHOR
Alvaro Livraghi, <alvarol@cpan.org>
CONTRIBUTE
<https://github.com/alivraghi/Kafka-Producer-Avro>
BUGS
Please use the GitHub project link above to report problems or to
contact the author.
COPYRIGHT AND LICENSE
Copyright 2018 by Alvaro Livraghi
This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.