package AnyEvent::RabbitMQ::LocalQueue;

use strict;
use warnings;

our $VERSION = '1.22'; # VERSION

sub new {
    my $class = shift;
    return bless {
        _message_queue    => [],
        _drain_code_queue => [],
    }, $class;
}

sub push {
    my $self = shift;

    CORE::push @{$self->{_message_queue}}, @_;
    return $self->_drain_queue();
}

sub get {
    my $self = shift;

    CORE::push @{$self->{_drain_code_queue}}, @_;
    return $self->_drain_queue();
}

sub _drain_queue {
    my $self = shift;

    my $message_count = scalar @{$self->{_message_queue}};
    my $drain_code_count = scalar @{$self->{_drain_code_queue}};

    my $count = $message_count < $drain_code_count
              ? $message_count : $drain_code_count;

    for (1 .. $count) {
        &{shift @{$self->{_drain_code_queue}}}(
            shift @{$self->{_message_queue}}
        );
    }

    return $self;
}

sub _flush {
    my ($self, $frame) = @_;

    $self->_drain_queue;

    while (my $cb = shift @{$self->{_drain_code_queue}}) {
        local $@; # Flush frames immediately, throwing away errors for on-close
        eval { $cb->($frame) };
    }
}

1;