package Test2::Harness::UI::Controller::Stream;
use strict;
use warnings;

our $VERSION = '0.000127';

use Data::GUID;
use List::Util qw/max/;
use Test2::Harness::UI::Util qw/find_job/;
use Test2::Harness::UI::Response qw/resp error/;
use Test2::Harness::Util::JSON qw/encode_json/;
use JSON::PP();
use Carp qw/croak/;

use parent 'Test2::Harness::UI::Controller';
use Test2::Harness::UI::Util::HashBase qw{
    <run
    <job
};

use constant RUN_LIMIT => 100;

sub title { 'Stream' }

sub handle {
    my $self = shift;
    my ($route) = @_;

    my $req = $self->{+REQUEST};
    my $res = resp(200);

    my @sets = (
        $self->stream_runs($req, $route),
        $self->stream_jobs($req, $route),
        $self->stream_events($req, $route),
    );

    my $cache = 1;
    for my $it ($self->{+RUN}, $self->{+JOB}) {
        next unless $it;
        next if $it->complete;
        $cache = 0;
        last;
    }

    $res->stream(
        env          => $req->env,
        content_type => 'application/x-jsonl; charset=utf-8',
        cache => $cache,

        done => sub {
            my @keep;
            while (my $set = shift @sets) {
                my ($check) = @$set;
                next if $check->(); # Next if done

                # Not done, keep it
                push @keep => $set;
            }

            @sets = @keep;

            return @sets ? 0 : 1;
        },

        fetch => sub { map { $_->[1]->() } @sets },
    );

    return $res;
}

sub stream_runs {
    my $self = shift;
    my ($req, $route) = @_;

    my $schema = $self->{+CONFIG}->schema;

    my $opts = {
        collapse       => 1,
        remove_columns => [qw/log_data run_fields.data parameters/],

        join       => [qw/user_join project run_fields/],
        '+columns' => {
            'prefetched_fields'       => \'1',
            'run_fields.run_field_id' => 'run_fields.run_field_id',
            'run_fields.name'         => 'run_fields.name',
            'run_fields.details'      => 'run_fields.details',
            'run_fields.raw'          => 'run_fields.raw',
            'run_fields.link'         => 'run_fields.link',
            'run_fields.data',        => \"run_fields.data IS NOT NULL",
            'user'                    => \'user_join.username',
            'project'                 => \'project.name',
        },
    };

    my %params = (
        type => 'run',

        req => $req,

        track_status  => 1,
        id_field      => 'run_id',
        ord_field     => 'run_ord',
        sort_field    => 'run_ord',
        search_base   => $schema->resultset('Run'),
        initial_limit => RUN_LIMIT,

        custom_opts => $opts,

        timeout => 60 * 30,    # 30 min.
    );

    my $id     = $route->{id};
    my $run_id = $route->{run_id};
    my ($project, $user);

    if ($id) {
        my $p_rs = $schema->resultset('Project');
        $project //= eval { $p_rs->search({project_id => $id})->first };
        $project //= eval { $p_rs->search({name => $id})->first };

        if ($project) {
            $params{search_base} = $params{search_base}->search_rs({project_id => $project->project_id});
        }
        else {
            my $u_rs = $schema->resultset('User');
            $user //= eval { $u_rs->search({user_id  => $id})->first };
            $user //= eval { $u_rs->search({username => $id})->first };

            if ($user) {
                $params{search_base} = $params{search_base}->search_rs({'me.user_id' => $user->user_id});
            }
            else {
                $run_id //= $id;
            }
        }
    }

    if($run_id) {
        return $self->stream_single(%params, id => $run_id);
    }

    return $self->stream_set(%params);
}

sub stream_jobs {
    my $self = shift;
    my ($req, $route) = @_;

    my $run = $self->{+RUN} // return;

    my $opts = {
        join => 'test_file',
        remove_columns => [qw/stdout stderr parameters/],
        '+select' => [
            'test_file.filename AS file',
        ],
        '+as' => [
            'file',
        ],
    };

    my %params = (
        type   => 'job',
        parent => $run,

        req => $req,

        track_status => 1,
        id_field     => 'job_key',
        ord_field    => 'job_ord',
        method       => 'glance_data',
        search_base  => scalar($run->jobs),
        custom_opts  => $opts,

        order_by => [{'-desc' => 'status'}, {'-desc' => [qw/job_try job_ord name/]}],
    );

    if (my $job_uuid = $route->{job}) {
        my $schema = $self->{+CONFIG}->schema;
        return $self->stream_single(%params, item => find_job($schema, $job_uuid, $route->{try}));
    }

    return $self->stream_set(%params);
}

sub stream_events {
    my $self = shift;
    my ($req, $route) = @_;

    my $job = $self->{+JOB} // return;

    # we only stream nested events when the job is still running
    my $query = $job->complete ? {nested => 0} : undef;

    my $opts = {
        remove_columns => ['orphan'],
        '+select' => [
            'facets IS NOT NULL AS has_facets',
            'orphan IS NOT NULL AS has_orphan',
        ],
        '+as' => [
            'has_facets',
            'has_orphan',
        ],
    };

    return $self->stream_set(
        type   => 'event',
        parent => $job,

        req => $req,

        track_status => 0,
        id_field     => 'event_id',
        ord_field    => 'insert_ord',
        sort_field   => 'event_ord',
        sort_dir     => '-asc',
        method       => 'line_data',
        custom_query => $query,
        custom_opts  => $opts,
        search_base  => scalar($job->events),
    );
}

sub stream_single {
    my $self = shift;
    my %params = @_;

    my $id_field    = $params{id_field};
    my $method      = $params{method};
    my $search_base = $params{search_base};
    my $type        = $params{type};
    my $id          = $params{id};
    my $custom_opts  = $params{custom_opts} // {};
    my $custom_query = $params{custom_query} // {};

    my $it;
    if (exists $params{item}) {
        $it = $params{item} or die error(404 => "Invalid Item");
    }
    else {
        $it = $search_base->search({%$custom_query, "me.$id_field" => $id}, $custom_opts)->first or die error(404 => "Invalid $type");
    }
    $self->{$type} = $it;

    my $sig;
    return [
        sub { $sig && $it->complete ? 1 : 0 },
        sub {
            my $update = JSON::PP::false;
            if ($sig) {
                $it->discard_changes;
                $update = JSON::PP::true;
            }

            my $new_sig   = $it->sig;
            my $unchanged = $sig && $sig eq $new_sig;
            $sig = $new_sig;

            return if $unchanged;

            my $data = $method ? $it->$method : $it->TO_JSON;
            return encode_json({type => $type, update => $update, data => $data}) . "\n";
        },
    ];
}

sub stream_set {
    my $self = shift;
    my (%params) = @_;

    my $custom_opts  = $params{custom_opts} // {};
    my $custom_query = $params{custom_query} // undef;
    my $id_field     = $params{id_field};
    my $limit        = $params{initial_limit};
    my $method       = $params{method};
    my $ord_field    = $params{ord_field};
    my $parent       = $params{parent};
    my $search_base  = $params{search_base};
    my $sort_field   = $params{sort_field};
    my $sort_dir     = $params{sort_dir} // '-desc';
    my $timeout      = $params{timeout};
    my $track        = $params{track_status};
    my $type         = $params{type};

    my $order_by = $params{order_by} // $sort_field ? {$sort_dir => $sort_field} : croak "Must specify either 'order_by' or 'sort_field'";

    my $items = $search_base->search($custom_query, {%$custom_opts, order_by => $order_by, $limit ? (rows => $limit) : ()});

    my $start = time;
    my $ord = 0;
    my $incomplete = {};

    return [
        sub {
            return 0 if $items;
            return 0 if $track && keys %$incomplete;

            return 1 if $parent && $parent->complete;

            my $running = time - $start;
            return 1 if $timeout && $running > $timeout; # Stop if they have been camping the page for 30 min.

            return 0;
        },
        sub {
            unless ($items) {
                my $query = {
                    ($custom_query ? %$custom_query : ()),
                    $ord_field => {'>' => $ord},
                };

                my @ids = $track ? keys %$incomplete : ();
                $query = [$query, {"me.$id_field" => {'IN' => \@ids}}] if @ids;

                $items = $search_base->search(
                    $query,
                    {%$custom_opts, order_by => $order_by}
                );
            }

            while (my $item = $items->next()) {
                $ord = max($ord, $item->$ord_field);

                my $update = JSON::PP::false;
                if ($track) {
                    my $id = $item->$id_field;
                    if (my $old = $incomplete->{$id}) {
                        $update = JSON::PP::true;
                        # Nothing has changed, no need to send it.
                        next if $old->sig eq $item->sig;
                    }

                    if ($item->complete) {
                        delete $incomplete->{$id};
                    }
                    else {
                        $incomplete->{$id} = $item;
                    }
                }

                my $data = $method ? $item->$method : $item->TO_JSON;
                return encode_json({type => $type, update => $update, data => $data}) . "\n";
            }

            $items = undef;
            return;
        },
    ];
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Test2::Harness::UI::Controller::Stream

=head1 DESCRIPTION

=head1 SYNOPSIS

TODO

=head1 SOURCE

The source code repository for Test2-Harness-UI can be found at
F<http://github.com/Test-More/Test2-Harness-UI/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright 2019 Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See F<http://dev.perl.org/licenses/>

=cut