NAME
    Kafka::Producer::Avro - Avro message producer for Apache Kafka.

SYNOPSIS
      use Kafka::Connection;
      use Kafka::Producer::Avro;
      use Confluent::SchemaRegistry;
  
      my $connection = Kafka::Connection->new( host => 'localhost' );
  
      my $producer = Kafka::Producer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() );
  
      # Set Avro schema for message key (valid JSON-string)
      my $key_schema = <<KEY_SCHEMA;
            {
                    "type": "long",
                    "name": "_id"
            }
      KEY_SCHEMA
      # Set Avro schema for message value (payload) (valid JSON-string)
      my $value_schema = <<VALUE_SCHEMA;
            {
                    "type": "record",
                    "name": "myrecord",
                    "fields": [
                            {
                                    "name": "f1",
                                    "type": "string"
                            }
                    ]
            }
      VALUE_SCHEMA
  
      # Sending a single message
      my $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            'Single message',   # message
            undef,              # key
            undef,              # compression_codec
            undef,              # timestamps
            $key_schema,        # key_schema
            $value_schema       # value_schema
      );
  
      # Sending a series of messages
      $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            [                   # messages
                    'The first message',
                    'The second message',
                    'The third message',
            ],
            undef,              # key(s)
            undef,              # compression_codec
            undef,              # timestamp(s)
            $key_schema,        # key_schema
            $value_schema       # value_schema
      );
  
      # ...or use named parameters
  
      $producer->send(
            topic             => $topic,
            partition         => $partition,
            messages          => $messages,
            keys              => $keys,
            compression_codec => $compression_codec,
            timestamps        => $timestamps,
            key_schema        => $key_schema,
            value_schema      => $value_schema
      );    
  
      # Close the producer and clean up
      undef $producer;
      $connection->close;
      undef $connection;

DESCRIPTION
    The main feature of "Kafka::Producer::Avro" is to provide an
    object-oriented API to produce messages according to *Confluent
    SchemaRegistry* and *Avro* serialization.

    "Kafka::Producer::Avro" inherits from and extends Kafka::Producer.

INSTALL
    Installation of "Kafka::Producer::Avro" follows the canonical procedure:

      perl Makefile.PL
      make
      make test
      make install

  TEST NOTES
    Tests focus on verifying Avro-formatted messages and their
    interactions with Confluent Schema Registry, and are intended to extend
    the "Kafka::Producer" test suite.

    They expect Apache Kafka and Schema Registry services to be running
    and listening on "localhost:9092" and "http://localhost:8081",
    respectively.

    Alternatively, you can set different addresses by exporting the
    following environment variables:

    "KAFKA_HOST"
    "KAFKA_PORT"
    "CONFLUENT_SCHEMA_REGISTY_URL"

    For example:

      export KAFKA_HOST=my-kafka-host.my-domain.org
      export KAFKA_PORT=9092
      export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
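
    As an illustration of the fallback described above, the following
    sketch resolves the three settings from the environment and falls back
    to the documented defaults. The helper "resolve_test_endpoints" is
    hypothetical and is not part of the distribution; the real test suite
    may read these variables differently.

```perl
use strict;
use warnings;

# Hypothetical helper (an assumption, not shipped with the module):
# resolve the test endpoints from an environment hash, falling back to
# the defaults documented in TEST NOTES.
sub resolve_test_endpoints {
    my (%env) = @_;
    return {
        kafka_host          => $env{KAFKA_HOST} // 'localhost',
        kafka_port          => $env{KAFKA_PORT} // 9092,
        schema_registry_url => $env{CONFLUENT_SCHEMA_REGISTY_URL}
                               // 'http://localhost:8081',
    };
}

my $endpoints = resolve_test_endpoints(%ENV);
printf "Kafka: %s:%s\n", $endpoints->{kafka_host}, $endpoints->{kafka_port};
printf "Schema Registry: %s\n", $endpoints->{schema_registry_url};
```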

USAGE
  CONSTRUCTOR
   "new"
    Creates a new producer client object.

    "new()" takes arguments in key-value pairs as described in
    Kafka::Producer from which it inherits.

    In addition, it takes the following argument:

    "SchemaRegistry => $schema_registry" (mandatory)
       A Confluent::SchemaRegistry instance.

  METHODS
    The following methods are defined for the "Kafka::Producer::Avro" class:

   "schema_registry"()
    Returns the Confluent::SchemaRegistry instance supplied to the
    constructor.

   "get_error"()
    Returns a string containing the last error message.

   "send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps, $key_schema, $value_schema )"
   "send( %named_params )"
    Sends Avro-formatted messages on a Kafka::Connection object.

    Returns a non-blank value (a reference to a hash with the server
    response description) if the message is successfully sent.

    In order to handle the Avro format, the Kafka::Producer "send()"
    method is extended with two more positional arguments, $key_schema and
    $value_schema:

      $producer->send(
            $topic,             # scalar 
            $partition,         # scalar
            $messages,          # scalar | array
            $keys,              # (optional) undef | scalar | array
            $compression_codec, # (optional) undef | scalar
            $timestamps,        # (optional) undef | scalar | array
            $key_schema,        # (optional) undef | JSON-string
            $value_schema       # (optional) undef | JSON-string
      );

    Both $key_schema and $value_schema are optional. When provided, each
    must be a JSON string representing the Avro schema used to validate and
    serialize the key(s) and value(s), respectively.

    These schemas are validated against $schema_registry and, if compliant,
    they are added to the registry under the "$topic+'key'" or
    "$topic+'value'" Schema Registry subject.

    If a schema isn't provided, the latest version registered in Schema
    Registry for the corresponding (topic + key/value) subject is used.
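
    Since both schemas must be valid JSON strings, it can help to check
    them locally before calling "send()". The sketch below uses the core
    JSON::PP module; the helper "parse_schema_json" is hypothetical and not
    part of this module. It only checks JSON syntax and does not validate
    Avro semantics or contact the Schema Registry.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper (an assumption, not part of Kafka::Producer::Avro):
# returns the decoded schema when $schema_json is syntactically valid JSON,
# or undef when it is missing or malformed.
sub parse_schema_json {
    my ($schema_json) = @_;
    return undef unless defined $schema_json;
    my $schema;
    # allow_nonref accepts primitive shorthand such as "long" (a bare string).
    my $ok = eval {
        $schema = JSON::PP->new->allow_nonref->decode($schema_json);
        1;
    };
    return $ok ? $schema : undef;
}

my $value_schema = <<'VALUE_SCHEMA';
{
    "type": "record",
    "name": "myrecord",
    "fields": [ { "name": "f1", "type": "string" } ]
}
VALUE_SCHEMA

my $schema = parse_schema_json($value_schema)
    or die "value_schema is not valid JSON\n";
print "record name: $schema->{name}\n";
```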

    Alternatively, for ease of use, the "send()" method may also be called
    with named parameters:

      $producer->send(
            topic             => $topic,             # scalar 
            partition         => $partition,         # scalar
            messages          => $messages,          # scalar | array
            keys              => $keys,              # (optional) undef | scalar | array
            compression_codec => $compression_codec, # (optional) undef | scalar
            timestamps        => $timestamps,        # (optional) undef | scalar | array
            key_schema        => $key_schema,        # (optional) undef | JSON-string
            value_schema      => $value_schema       # (optional) undef | JSON-string
      );
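
    The two calling conventions carry the same information. As a rough
    illustration, the sketch below maps named parameters onto the
    documented positional order. The helper "named_to_positional" is
    hypothetical; how the module converts between the two forms internally
    is not described here.

```perl
use strict;
use warnings;

# Positional order of the send() arguments, as documented above.
my @SEND_ORDER = qw(
    topic partition messages keys
    compression_codec timestamps key_schema value_schema
);

# Hypothetical helper (an assumption, not part of the module): turn named
# parameters into the positional list; omitted parameters become undef.
sub named_to_positional {
    my (%named) = @_;
    return map { $named{$_} } @SEND_ORDER;
}

my @args = named_to_positional(
    topic        => 'mytopic',
    partition    => 0,
    messages     => ['The first message'],
    value_schema => '{"type":"string"}',
);
printf "%d positional arguments\n", scalar @args;
```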

   "bulk_send( %params )"
    Similar to "send", but sends the messages in bulks to avoid memory
    leaks.

    The following extra named parameters are accepted:

    "size => $size"
       The size of each bulk.

    "on_before_send_bulk => sub {...} " (optional)
       A code block that will be executed before the sending of each bulk.

       The block will receive the following positional parameters:

       $bulk_num       the number of the bulk
       $bulk_messages  the number of messages in the bulk
       $bulk_keys      the number of keys in the bulk
       $index_from     the absolute index of the first message in the bulk
       $index_to       the absolute index of the last message in the bulk

    "on_after_send_bulk => sub {...} " (optional)
       A code block that will be executed after the sending of each bulk.

       The block will receive the following positional parameters:

       $sent           the number of sent messages in the bulk
       $total_sent     the total number of messages sent

    "on_init => sub {...} " (optional)
       A code block that will be executed only once, at the beginning of
       the cycle.

       The block will receive the following positional parameters:

       $to_send        the total number of messages to send
       $bulk_size      the size of the bulk

    "on_complete => sub {...} " (optional)
       A code block that will be executed only once after the end of the
       cycle.

       The block will receive the following positional parameters:

       $to_send        the total number of messages to send
       $total_sent     the total number of messages sent
       $errors         the number of bulks sent with errors

    "on_send_error => sub {...} " (optional)
       A code block that will be executed when a bulk registers an error.
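
    To make the callback contract above more concrete, the following
    stand-alone sketch does not talk to Kafka at all. A hypothetical
    "simulate_bulk_send" splits a list of messages into bulks of "size" and
    invokes the hooks with the positional parameters documented above.
    Zero-based indexes, bulk numbering starting at 1, and the assumption
    that every message is sent successfully are choices made for this
    illustration only; the real "bulk_send" may behave differently, and
    "on_send_error" is not simulated.

```perl
use strict;
use warnings;

# Hypothetical stand-in for bulk_send() (an assumption, not the module's
# implementation): it only shows which parameters each hook receives.
sub simulate_bulk_send {
    my (%params) = @_;
    my @messages = @{ $params{messages} };
    my @keys     = @{ $params{keys} || [] };
    my $size     = $params{size};
    my $noop     = sub { };
    my ( $total_sent, $errors, $bulk_num ) = ( 0, 0, 0 );

    ( $params{on_init} || $noop )->( scalar @messages, $size );

    for ( my $from = 0 ; $from < @messages ; $from += $size ) {
        my $to = $from + $size - 1;
        $to = $#messages if $to > $#messages;
        my $bulk_messages = $to - $from + 1;
        my $bulk_keys     = grep { defined } @keys[ $from .. $to ];

        $bulk_num++;
        ( $params{on_before_send_bulk} || $noop )
            ->( $bulk_num, $bulk_messages, $bulk_keys, $from, $to );

        # Pretend that every message in the bulk was sent successfully.
        my $sent = $bulk_messages;
        $total_sent += $sent;

        ( $params{on_after_send_bulk} || $noop )->( $sent, $total_sent );
    }

    ( $params{on_complete} || $noop )
        ->( scalar @messages, $total_sent, $errors );
    return $total_sent;
}

my @log;
simulate_bulk_send(
    messages            => [ map { "message $_" } 1 .. 5 ],
    size                => 2,
    on_before_send_bulk => sub {
        my ( $bulk_num, $bulk_messages, $bulk_keys, $index_from, $index_to ) = @_;
        push @log, "bulk $bulk_num: $bulk_messages message(s), "
                 . "indexes $index_from..$index_to";
    },
    on_complete => sub {
        my ( $to_send, $total_sent, $errors ) = @_;
        push @log, "sent $total_sent of $to_send, $errors bulk(s) with errors";
    },
);
print "$_\n" for @log;
```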

AUTHOR
    Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE
    <https://github.com/alivraghi/Kafka-Producer-Avro>

BUGS
    Please use the GitHub project link above to report problems or to
    contact the author.

COPYRIGHT AND LICENSE
    Copyright 2018 by Alvaro Livraghi

    This program is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.