package Job::Async::Worker::Redis;

use strict;
use warnings;

use parent qw(Job::Async::Worker);

our $VERSION = '0.004'; # VERSION

=head1 NAME

Job::Async::Worker::Redis - L<Net::Async::Redis> worker implementation for L<Job::Async::Worker>

=head1 SYNOPSIS

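=head1 DESCRIPTION

Waits for job IDs on a Redis list using C<BRPOPLPUSH>, moving each ID from
the pending queue to the processing queue. The job details are then loaded
from the C<job::$id> hash and emitted as a job for processing.

When a job completes, the result is written back to the same hash, the job ID
is published to the C<client::> channel named by the job's C<_reply_to> field,
and the ID is removed from the processing queue.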

=cut

use Time::HiRes ();

use curry::weak;
use Syntax::Keyword::Try;
use Future::AsyncAwait;

use Job::Async::Utils;
use Future::Utils qw(repeat);
use JSON::MaybeUTF8 qw(:v1);
use Log::Any qw($log);

use Net::Async::Redis;

=head2 incoming_job

Source for jobs received from the C<< BRPOP(LPUSH) >> queue wait.

=cut

sub incoming_job {
    my ($self) = @_;

    # Built once; later calls hand back the same source.
    return $self->{incoming_job} if defined $self->{incoming_job};

    die 'needs to be part of a loop' unless $self->loop;

    my $source = $self->ryu->source;

    # Every emitted item goes through on_job_received; ->retain is then
    # called on each mapped result, and on the mapped source itself.
    my $handler = $self->curry::weak::on_job_received;
    $source->map($handler)->map('retain')->retain;

    return $self->{incoming_job} = $source;
}

=head2 on_job_received

Called for each job that's received.

=cut

async sub on_job_received {
    # $self comes from the argument list; the job ID is the first element of
    # the arrayref in $_ (trigger emits [ $id, $queue ] items).
    # NOTE(review): relies on the caller setting $_ to the emitted item - confirm.
    my ($self, $id) = (shift, @$_);

    # Set once this call has claimed a slot in pending_jobs, so that the
    # error path only ever releases a slot it owns (and never the slot of an
    # earlier job with the same ID).
    my $reserved = 0;
    try {
        $log->debugf('Received job %s', $id);
        if(exists $self->{pending_jobs}{$id}) {
            $log->errorf("Already have job %s", $id);
            die 'Duplicate job ID';
        } else {
            # Placeholder entry: counts towards the concurrency limit while
            # the job details are still being loaded.
            undef $self->{pending_jobs}{$id};
            $reserved = 1;
        }

        my $job_count = 0 + keys %{$self->{pending_jobs}};
        $log->debugf("Current job count is %d", $job_count);
        # Start waiting for the next job straight away if there is capacity.
        $self->trigger;
        my ($items) = await $self->redis->hgetall('job::' . $id);
        $self->redis->hmset(
            'job::' . $id,
            _started => Time::HiRes::time()
        )->retain;
        my %data = @$items;
        # Any previously-stored result is not part of the job input.
        delete $data{result};
        $log->debugf('Original job data is %s', \%data);
        $self->{pending_jobs}{$id} = my $job = Job::Async::Job->new(
            data   => Job::Async::Job->structured_data(\%data),
            id     => $id,
            future => my $f = $self->loop->new_future,
        );

        $log->debugf('Job content is %s', { map { $_ => $job->{$_} } qw(data id) });
        $f->on_done(sub {
            my ($rslt) = @_;
            $log->debugf("Result was %s", $rslt);
            # Issues the completion updates against whichever handle it is
            # given: the MULTI transaction, or the plain connection.
            my $code = sub {
                my $tx = shift;
                try {
                    delete $self->{pending_jobs}{$id};
                    $log->tracef('Removing job from processing queue');
                    return Future->needs_all(
                        map {
                            $_->on_ready(sub {
                                my $f = shift;
                                $log->tracef('ready for %s - %s', $f->label, $f->state);
                            });
                        }
                        # Results are stored with a one-character type prefix:
                        # 'J' for JSON-encoded references, 'T' for plain text.
                        $tx->hmset(
                            'job::' . $id,
                            _processed => Time::HiRes::time(),
                            result => ref($rslt) ? 'J' . encode_json_utf8($rslt) : 'T' . $rslt
                        ),
                        $tx->publish('client::' . $data{_reply_to}, $id),
                        $tx->lrem(
                            $self->prefixed_queue($self->processing_queue) => 1,
                            $id
                        ),
                    )
                } catch {
                    $log->errorf("Failed due to %s", $@);
                    return Future->fail($@, redis => $self->id, $id);
                }
            };
            (
                $self->use_multi
                ? $self->redis->multi($code)
                : $code->($self->redis)
            )->on_ready($self->curry::weak::trigger)
              ->on_fail(sub { $log->errorf('Failed to update Redis status for job %s - %s', $id, shift); })
              ->retain;
        });
        # A failed or cancelled job never reaches the on_done handler above,
        # so release its slot here; otherwise it would count against
        # max_concurrent_jobs forever. Registered before the trigger callback
        # so that trigger sees the updated count. Redis state is left as-is.
        $f->on_ready(sub {
            my ($job_future) = @_;
            delete $self->{pending_jobs}{$id} unless $job_future->is_done;
        });
        $f->on_ready($self->curry::weak::trigger);
        if(my $timeout = $self->timeout) {
            Future->needs_any(
                $f,
                $self->loop->timeout_future(after => $timeout)->on_fail(sub {
                    local @{$log->{context}}{qw(worker_id job_id)} = ($self->id, $id);
                    # Failing a future that is already ready would croak, so
                    # only report that case.
                    if($f->is_ready) {
                        $log->errorf("Timeout but already completed with %s", $f->state);
                    } else {
                        $f->fail('timeout');
                    }
                })
            )->retain;
        }
        $self->jobs->emit($job);
        return $f;
    } catch {
        $log->errorf("Unable to process received job %s - %s", $id, $@);
        if($reserved) {
            # Give the slot back and re-arm the queue wait, since trigger may
            # have declined to wait while this slot was held.
            delete $self->{pending_jobs}{$id};
            $self->trigger;
        }
    }
}

=head2 use_multi

Returns the C<use_multi> setting. When true, the Redis updates made on job
completion are issued through C<< $redis->multi >> rather than directly on
the connection.

=cut

sub use_multi {
    my ($self) = @_;
    return $self->{use_multi};
}

=head2 prefix

Returns the prefix applied to queue names by C<prefixed_queue>. Defaults to
C<jobs>; the default is stored on first access.

=cut

sub prefix {
    my ($self) = @_;
    $self->{prefix} = 'jobs' unless defined $self->{prefix};
    return $self->{prefix};
}

=head2 pending_queues

Note that L<reliable mode|Job::Async::Redis/reliable> only
supports a single queue, and will fail if you attempt to start with multiple
queues defined.

=cut

sub pending_queues {
    my ($self) = @_;
    # Falls back to (and stores) a single queue named 'pending'.
    my $queues = ($self->{pending_queues} ||= ['pending']);
    # A list of names in list context, the number of queues in scalar context.
    return @$queues;
}

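=head2 processing_queue

Name of the queue that holds the IDs of jobs which have been taken from the
pending queue and not yet completed. Defaults to C<processing>.

=cut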

sub processing_queue {
    my ($self) = @_;
    # Store the default on first use so later reads see the same value.
    $self->{processing_queue} = 'processing' unless defined $self->{processing_queue};
    return $self->{processing_queue};
}

=head2 start

Starts processing jobs by calling C<trigger>.

=cut

sub start {
    my $self = shift;
    # All of the work happens in trigger; its result is passed straight back.
    return $self->trigger;
}

=head2 stop

Requests to stop processing.

Returns a future which will complete when all currently-processing jobs have
finished.

=cut

sub stop {
    my ($self) = @_;
    my $in_flight = scalar keys %{$self->{pending_jobs}};

    # With no jobs in progress and a queue wait outstanding, the stopping
    # future can be completed immediately.
    my $idle = ($in_flight == 0) && $self->{awaiting_job};

    # Otherwise a job is being processed or the worker is not currently
    # waiting on the queue, and the future is returned as-is.
    # NOTE(review): nothing visible in this file completes the future in that
    # case - confirm where that is expected to happen.
    return $idle
        ? $self->stopping_future->done
        : $self->stopping_future;
}

=head2 stopping_future

Returns the future used to signal shutdown, creating it on first use.

=cut

sub stopping_future {
    my ($self) = @_;
    return $self->{stopping_future} if $self->{stopping_future};

    my $future = $self->loop->new_future;
    return $self->{stopping_future} = $future->set_label('Job::Async::Worker::Redis shutdown');
}

=head2 queue_redis

Returns the L<Net::Async::Redis> connection used for waiting on the job
queue, creating and connecting it on first use.

=cut

sub queue_redis {
    my ($self) = @_;
    return $self->{queue_redis} if $self->{queue_redis};

    my $redis = Net::Async::Redis->new(
        uri => $self->uri,
    );
    # Cache before attaching, then register as a child and start connecting.
    $self->{queue_redis} = $redis;
    $self->add_child($redis);
    $redis->connect;

    return $self->{queue_redis};
}

=head2 redis

Returns the L<Net::Async::Redis> connection used for reading and updating
job data, creating and connecting it on first use.

=cut

sub redis {
    my ($self) = @_;
    return $self->{redis} if $self->{redis};

    my $redis = Net::Async::Redis->new(
        uri => $self->uri,
    );
    # Cache before attaching, then register as a child and start connecting.
    $self->{redis} = $redis;
    $self->add_child($redis);
    $redis->connect;

    return $self->{redis};
}

=head2 prefixed_queue

Takes a queue name and returns it with the L</prefix> prepended, using C<::>
as the separator. If the prefix is empty, the name is returned unchanged.

=cut

sub prefixed_queue {
    my ($self, $q) = @_;
    # Fetch the prefix once and reuse it, rather than calling the accessor
    # again when building the name.
    my $prefix = $self->prefix;
    return $q unless length $prefix;
    return join '::', $prefix, $q;
}

# Starts waiting for the next job on the first pending queue, unless the
# worker is already at its concurrency limit or a wait is already in progress.
# Returns the outstanding wait future when one exists or is created, and
# nothing when at the limit or when setting up the wait fails.
sub trigger {
    my ($self) = @_;
    # Adds the worker ID and queue name to the logging context for the
    # duration of this call; $queue is the first entry from pending_queues.
    local @{$log->{context}}{qw(worker_id queue)} = ($self->id, my ($queue) = $self->pending_queues);
    try {
        my $pending = 0 + keys %{$self->{pending_jobs}};
        $log->tracef('Trigger called with %d pending tasks, %d max', $pending, $self->max_concurrent_jobs);
        # At capacity: do not take another job off the queue.
        return if $pending >= $self->max_concurrent_jobs;

        # Only one queue wait is outstanding at a time; an existing one is reused.
        return $self->{awaiting_job} //= do {
            $log->debugf('Awaiting job on %s', $queue);
            # Completes when either the queue wait finishes or the stopping
            # future becomes ready, whichever happens first.
            Future->wait_any(
                # If this is cancelled, we don't retrigger. Failure or success should retrigger as usual.
                # Moves one ID from the prefixed pending queue to the prefixed
                # processing queue, using job_poll_interval as the timeout.
                $self->queue_redis->brpoplpush(
                    $self->prefixed_queue($queue) => $self->prefixed_queue($self->processing_queue),
                    $self->job_poll_interval
                )->on_done(sub {
                    # This $queue shadows the outer one.
                    # NOTE(review): BRPOPLPUSH presumably replies with a single
                    # element, which would leave $queue undef here - confirm.
                    my ($id, $queue, @details) = @_;
                    try {
                        $log->tracef('And we have an event on %s', $queue);
                        if($id) {
                            $log->tracef('Had task from queue, pending now %d', 0 + keys %{$self->{pending_jobs}});
                            # Hand the job over to on_job_received via the incoming_job source.
                            $self->incoming_job->emit([ $id, $queue ]);
                        } else {
                            $log->tracef('No ID, full details were %s - maybe timeout?', join ' ', $id // (), $queue // (), @details);
                        }
                    } catch {
                        $log->errorf("Failed to retrieve and process job: %s", $@);
                    }
                    # Schedule the next wait via the loop rather than calling
                    # trigger directly, unless the stopping future is ready.
                    $self->loop->later($self->curry::weak::trigger) unless $self->stopping_future->is_ready;
                })->on_fail(sub {
                    my $failure = shift;
                    $log->errorf("Failed to retrieve job from redis: %s", $failure);
                    # Keep polling after a Redis failure, unless the stopping future is ready.
                    $self->loop->later($self->curry::weak::trigger) unless $self->stopping_future->is_ready;
                }),
                # NOTE(review): nothing in this sub marks the stopping future
                # as done, although the comment in stop() says trigger will
                # recheck - confirm where that is meant to happen.
                $self->stopping_future->without_cancel
            )->on_ready(sub {
                # Clear the slot so the next trigger call starts a fresh wait.
                delete $self->{awaiting_job};
            });
        };
    } catch {
        $log->errorf('Failed to trigger job handling on %s - %s', $queue, $@);
    }
    # Reached after the catch block has handled an error.
    return;
}

=head2 max_concurrent_jobs

Number of jobs to process in parallel. Defaults to 1.

=cut

sub max_concurrent_jobs {
    my ($self) = @_;
    # Store the default on first use so later reads see the same value.
    $self->{max_concurrent_jobs} = 1 unless defined $self->{max_concurrent_jobs};
    return $self->{max_concurrent_jobs};
}

=head2 job_poll_interval

Polling interval (e.g. for C<BRPOPLPUSH> in C<reliable> mode), in seconds.

Defaults to 3 seconds.

=cut

sub job_poll_interval {
    my ($self) = @_;
    # Store the default on first use so later reads see the same value.
    $self->{job_poll_interval} = 3 unless defined $self->{job_poll_interval};
    return $self->{job_poll_interval};
}

=head2 uri

Returns the configured Redis URI, which is passed to L<Net::Async::Redis>
when the connections are created.

=cut

sub uri {
    my ($self) = @_;
    return $self->{uri};
}

=head2 configure

Applies configuration. Recognised keys are C<uri>, C<max_concurrent_jobs>,
C<prefix>, C<mode>, C<processing_queue>, C<use_multi>, C<job_poll_interval>
and C<pending_queues>; anything else is passed on to the superclass.

C<pending_queues> takes an arrayref of queue names, or a false value to
revert to the default. Dies if more than one queue is given in C<reliable>
mode.

=cut

sub configure {
    my ($self, %args) = @_;
    # Simple settings are stored as-is. This runs first, so a mode passed in
    # the same call is already in place for the queue check below.
    for my $k (qw(uri max_concurrent_jobs prefix mode processing_queue use_multi job_poll_interval)) {
        $self->{$k} = delete $args{$k} if exists $args{$k};
    }

    if(exists $args{pending_queues}) {
        # Remove the key from %args so that it is not forwarded to the
        # superclass, which has no reason to recognise it.
        if(my $queues = delete $args{pending_queues}) {
            # NOTE(review): mode is not defined in this file; presumably
            # provided by a parent class - confirm.
            die 'Only a single queue is supported in reliable mode' if $self->mode eq 'reliable' and @$queues > 1;
            $self->{pending_queues} = $queues;
        } else {
            # A false value reverts to the default queue list.
            delete $self->{pending_queues}
        }
    }
    return $self->next::method(%args);
}

1;

=head1 AUTHOR

Tom Molesworth <TEAM@cpan.org>

=head1 LICENSE

Copyright Tom Molesworth 2016-2019. Licensed under the same terms as Perl itself.