package Message::Passing::Output::ZeroMQ;
use Moo;
use MooX::Types::MooseLike::Base qw/ :all /;
use namespace::clean -except => 'meta';

use ZMQ::FFI::Constants qw/ :all /;
use Time::HiRes;

with qw/

has '+_socket' => (
    handles => {
        '_zmq_send' => 'send',

sub _socket_type { 'PUB' }

has socket_hwm => (
    is      => 'rw',
    default => 10000,

has subscribe_delay => (
    is      => 'ro',
    isa     => Num,
    default => 0.2,

# socket_(probably)_subscribed, but who has the bytes for that
has socket_subscribed => (
    is  => 'rw',
    isa => Bool,
has socket_connect_time => (
    is  => 'rw',
    isa => Num,

sub BUILD {
    my $self = shift;
    # Force a socket to be built, so that there's more chance the first message will be sent
    if ($self->_should_connect){
        my $socket = $self->_socket;


sub consume {
    my ($self, $data) = @_;

    # See the slow joiner problem for PUB/SUB, outlined in
    if (!$self->socket_subscribed && $self->socket_connect_time){
        my $time = Time::HiRes::time;
        my $alive_time = $time - $self->socket_connect_time;
        my $sleep_time = sprintf "%.4f", ($self->subscribe_delay - $alive_time);
        # warn "Alive $alive_time, so sleep time $sleep_time";
        if ($sleep_time > 0){
            Time::HiRes::sleep $sleep_time;

    return $self->_zmq_send($data);

sub setsockopt {
    my ($self, $socket) = @_;

    if ($self->zmq_major_version >= 3){
        $socket->set(ZMQ_SNDHWM, 'int', $self->socket_hwm);
    else {
        $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);


after _build_socket => sub {
    my $self = shift;
    $self->socket_connect_time( Time::HiRes::time );


=head1 NAME

Message::Passing::Output::ZeroMQ - output messages to ZeroMQ.


    use Message::Passing::Output::ZeroMQ;

    my $logger = Message::Passing::Output::ZeroMQ->new;
    $logger->consume({data => { some => 'data'}, '@metadata' => 'value' });

    # Or see Log::Dispatch::Message::Passing for a more 'normal' interface to
    # simple logging.

    # Or use directly on command line:
    message-passing --input STDIN --output ZeroMQ --output_options \


A L<Message::Passing> ZeroMQ output class.

Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
a logger in normal perl applications.


See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>.

=head2 subscribe_delay

Time in floating seconds to sleep to ensure the receiving socket has subscribed.
This is the longest the sleep might take.

See the slow-joiner problem: L<>.

DEFAULT: 0.2 seconds

=head1 METHODS

=head2 consume ($msg)

Sends a message, as-is. This means that you must have encoded the message to a string before
sending it. The C<message-pass> utility will do this for you into JSON, or you can
do it manually as shown in the example in L<Message::Passing::ZeroMQ>.

=head1 SEE ALSO


=item L<Message::Passing::ZeroMQ>

=item L<Message::Passing::Input::ZeroMQ>

=item L<Message::Passing>

=item L<ZeroMQ>

=item L<>



This module exists due to the wonderful people at Suretec Systems Ltd.
<> who sponsored its development for its
VoIP division called SureVoIP <> for use with
the SureVoIP API - 


See L<Message::Passing>.