package Catmandu::Iterable;

use Catmandu::Sane;

our $VERSION = '1.2013';

use Catmandu::Util qw(
    is_number
    is_value
    is_string
    is_array_ref
    is_hash_ref
    is_regex_ref
    is_same
    check_positive
);
use Time::HiRes qw(gettimeofday tv_interval);
use Hash::Util::FieldHash qw(fieldhash);
use Role::Tiny;
use namespace::clean;

# delay loading these because of circular dependency
require Catmandu::Iterator;
require Catmandu::ArrayIterator;

requires 'generator';

{
    # can't use Moo attribute because of circular dependency
    fieldhash my %_generators;

    sub next {
        my ($self) = @_;
        ($_generators{$self} ||= $self->generator)->();
    }

    sub rewind {
        my ($self) = @_;
        $_generators{$self} = $self->generator;
    }
}

sub to_array {
    my ($self) = @_;
    my $next = $self->generator;
    my @a;
    my $data;
    while (defined($data = $next->())) {
        push @a, $data;
    }
    \@a;
}

sub count {
    my ($self) = @_;
    my $next   = $self->generator;
    my $n      = 0;
    while ($next->()) {
        $n++;
    }
    $n;
}

sub slice {
    my ($self, $start, $total) = @_;
    $start //= 0;
    Catmandu::Iterator->new(
        sub {
            sub {
                if (defined $total) {
                    $total || return;
                }
                state $next = $self->generator;
                state $data;
                while (defined($data = $next->())) {
                    if ($start > 0) {
                        $start--;
                        next;
                    }
                    if (defined $total) {
                        $total--;
                    }
                    return $data;
                }
                return;
            }
        }
    );
}

sub each {
    my ($self, $sub) = @_;
    my $next = $self->generator;
    my $n    = 0;
    my $data;
    while (defined($data = $next->())) {
        $sub->($data);
        $n++;
    }
    $n;
}

sub each_until {
    my ($self, $sub) = @_;
    my $next = $self->generator;
    my $n    = 0;
    my $data;
    while (defined($data = $next->())) {
        $sub->($data) || last;
        $n++;
    }
    $n;
}

sub tap {
    my ($self, $sub) = @_;
    Catmandu::Iterator->new(
        sub {
            sub {
                state $next = $self->generator;
                state $data;
                if (defined($data = $next->())) {
                    $sub->($data);
                    return $data;
                }
                return;
            }
        }
    );
}

sub every {
    my ($self, $n, $sub) = @_;
    check_positive($n);
    Catmandu::Iterator->new(
        sub {
            sub {
                state $next = $self->generator;
                state $data;
                state $i = 0;
                if (defined($data = $next->())) {
                    if (++$i == $n) {
                        $sub->($data);
                        $i = 0;
                    }
                    return $data;
                }
                return;
            }
        }
    );
}

sub any {
    my ($self, $sub) = @_;
    my $next = $self->generator;
    my $data;
    while (defined($data = $next->())) {
        $sub->($data) && return 1;
    }
    return 0;
}

sub many {
    my ($self, $sub) = @_;
    my $next = $self->generator;
    my $n    = 0;
    my $data;
    while (defined($data = $next->())) {
        $sub->($data) && ++$n > 1 && return 1;
    }
    return 0;
}

sub all {
    my ($self, $sub) = @_;
    my $next = $self->generator;
    my $data;
    while (defined($data = $next->())) {
        $sub->($data) || return 0;
    }
    return 1;
}

sub map {
    my ($self, $sub) = @_;
    Catmandu::Iterator->new(
        sub {
            sub {
                state $next = $self->generator;
                state @buff;
                @buff = $sub->($next->() // return) unless @buff;
                shift @buff;
            }
        }
    );
}

sub reduce {
    my $self     = shift;
    my $memo_set = @_ > 1;
    my $sub      = pop;
    my $memo     = shift;
    my $next     = $self->generator;
    my $data;
    while (defined($data = $next->())) {
        if ($memo_set) {
            $memo = $sub->($memo, $data);
        }
        else {
            $memo     = $data;
            $memo_set = 1;
        }
    }
    $memo;
}

sub first {
    $_[0]->generator->();
}

sub rest {
    $_[0]->slice(1);
}

sub take {
    $_[0]->slice(0, $_[1]);
}

{
    my $to_sub = sub {
        my ($arg1, $arg2) = @_;

        if (is_string($arg1)) {
            if (is_regex_ref($arg2)) {
                return sub {
                    is_hash_ref($_[0]) || return 0;
                    my $val = $_[0]->{$arg1};
                    is_value($val) && $val =~ $arg2;
                };
            }
            if (is_array_ref($arg2)) {
                return sub {
                    is_hash_ref($_[0])                 || return 0;
                    is_value(my $val = $_[0]->{$arg1}) || return 0;
                    for my $v (@$arg2) {
                        return 1 if $val eq $v;
                    }
                    0;
                };
            }
            return sub {
                is_hash_ref($_[0]) || return 0;
                my $val = $_[0]->{$arg1};
                is_value($val) && $val eq $arg2;
            };
        }

        if (is_regex_ref($arg1)) {
            return sub {
                my $val = $_[0];
                is_value($val) && $val =~ $arg1;
            };
        }

        $arg1;
    };

    sub detect {
        my $self = shift;
        my $sub  = $to_sub->(@_);
        my $next = $self->generator;
        my $data;
        while (defined($data = $next->())) {
            $sub->($data) && return $data;
        }
        return;
    }

    sub select {
        my $self = shift;
        my $sub  = $to_sub->(@_);
        Catmandu::Iterator->new(
            sub {
                sub {
                    state $next = $self->generator;
                    state $data;
                    while (defined($data = $next->())) {
                        $sub->($data) && return $data;
                    }
                    return;
                }
            }
        );
    }

    sub grep {goto &select}

    sub reject {
        my $self = shift;
        my $sub  = $to_sub->(@_);
        Catmandu::Iterator->new(
            sub {
                sub {
                    state $next = $self->generator;
                    state $data;
                    while (defined($data = $next->())) {
                        $sub->($data) || return $data;
                    }
                    return;
                }
            }
        );
    }
};

sub sorted {
    my ($self, $cmp) = @_;
    if (!defined $cmp) {
        Catmandu::ArrayIterator->new([sort @{$self->to_array}]);
    }
    elsif (ref $cmp) {
        Catmandu::ArrayIterator->new(
            [sort {$cmp->($a, $b)} @{$self->to_array}]);
    }
    else {
        # TODO: use Schwartzian transform for more complex key
        Catmandu::ArrayIterator->new(
            [sort {$a->{$cmp} cmp $b->{$cmp}} @{$self->to_array}]);
    }
}

sub pluck {
    my ($self, $key) = @_;
    $self->map(
        sub {
            $_[0]->{$key};
        }
    );
}

sub invoke {
    my ($self, $method, @args) = @_;
    $self->map(
        sub {
            $_[0]->$method(@args);
        }
    );
}

sub contains {goto &includes}

sub includes {
    my ($self, $data) = @_;
    $self->any(
        sub {
            is_same($data, $_[0]);
        }
    );
}

sub group {
    my ($self, $size) = @_;
    Catmandu::Iterator->new(
        sub {
            sub {
                state $next = $self->generator;

                my $group = [];

                for (my $i = 0; $i < $size; $i++) {
                    push @$group, $next->() // last;
                }

                unless (@$group) {
                    return;
                }

                Catmandu::ArrayIterator->new($group);
            }
        }
    );
}

sub interleave {
    my @iterators = @_;
    Catmandu::Iterator->new(
        sub {
            sub {
                state @generators;
                state $n = @iterators;
                state $i = 0;
                while ($n) {
                    $i = 0 if $i == $n;
                    my $next = $generators[$i] ||= $iterators[$i]->generator;
                    if (defined(my $data = $next->())) {
                        $i++;
                        return $data;
                    }
                    else {
                        splice @generators, $i, 1;
                        $n--;
                    }
                }
                return;
            }
        }
    );
}

sub max {
    my ($self, $sub) = @_;
    $self->reduce(
        undef,
        sub {
            my ($memo, $data) = @_;
            my $val = defined $sub ? $sub->($data) : $data;
            return $val > $memo ? $val : $memo
                if is_number($memo) && is_number($val);
            return $memo if is_number($memo);
            return $val  if is_number($val);
            return;
        }
    );
}

sub min {
    my ($self, $sub) = @_;
    $_[0]->reduce(
        undef,
        sub {
            my ($memo, $data) = @_;
            my $val = defined $sub ? $sub->($data) : $data;
            return $val < $memo ? $val : $memo
                if is_number($memo) && is_number($val);
            return $memo if is_number($memo);
            return $val  if is_number($val);
            return;
        }
    );
}

sub benchmark {
    my ($self) = @_;
    $self->tap(
        sub {
            state $n = 0;
            state $t = [gettimeofday];
            if (++$n % 100 == 0) {
                printf STDERR "added %9d (%d/sec)\n", $n,
                    $n / tv_interval($t);
            }
        }
    );
}

sub format {
    my ($self, %opts) = @_;
    $opts{header}  //= 1;
    $opts{col_sep} //= " | ";
    my @cols        = $opts{cols} ? @{$opts{cols}} : ();
    my @col_lengths = map length, @cols;

    my $rows = $self->map(
        sub {
            my $data = $_[0];
            my $row  = [];
            for (my $i = 0; $i < @cols; $i++) {
                my $col = $data->{$cols[$i]} // "";
                my $len = length $col;
                $col_lengths[$i] = $len if $len > $col_lengths[$i];
                push @$row, $col;
            }
            $row;
        }
    )->to_array;

    my @indices = 0 .. @cols - 1;
    my $pattern
        = join($opts{col_sep}, map {"%-$col_lengths[$_]s"} @indices) . "\n";

    if ($opts{header}) {
        printf $pattern, @cols;
        my $sep = $opts{col_sep};
        $sep =~ s/[^|]/-/g;
        print join($sep, map {'-' x $col_lengths[$_]} @indices) . "\n";
    }
    for my $row (@$rows) {
        printf $pattern, @$row;
    }
}

sub stop_if {
    my ($self, $sub) = @_;
    Catmandu::Iterator->new(
        sub {
            sub {
                state $next = $self->generator;
                my $data = $next->() // return;
                $sub->($data) && return;
                $data;
            }
        }
    );
}

sub run {
    my ($self) = @_;
    my $next   = $self->generator;
    my $run    = 0;
    while (defined($next->())) {
        $run ||= 1;
    }
    $run;
}

1;

__END__

=pod

=head1 NAME

Catmandu::Iterable - Base role for all iterable Catmandu classes

=head1 SYNOPSIS

    # Create an example Iterable using the Catmandu::Importer::Mock class
    my $it = Catmandu::Importer::Mock->new(size => 10);

    my $array_ref = $it->to_array;
    my $num       = $it->count;

    # Loop functions
    $it->each(sub { print shift->{n} });

    my $item = $it->first;

    $it->rest
       ->each(sub { print shift->{n} });

    $it->slice(3,2)
       ->each(sub { print shift->{n} });

    $it->take(5)
       ->each(sub { print shift->{n} });

    $it->group(5)
       ->each(sub { printf "group of %d items\n" , shift->count});

    $it->tap(\&logme)->tap(\&printme)->tap(\&mailme)
       ->each(sub { print shift->{n} });

    my $titles = $it->pluck('title')->to_array;

    # Select and loop
    my $item = $it->detect(sub { shift->{n} > 5 });

    $it->select(sub { shift->{n} > 5})
       ->each(sub { print shift->{n} });

    $it->reject(sub { shift->{n} > 5})
       ->each(sub { print shift->{n} });

    # Boolean
    if ($it->any(sub { shift->{n} > 5}) {
     .. at least one n > 5 ..
    }

    if ($it->many(sub { shift->{n} > 5}) {
     .. at least two n > 5 ..
    }

    if ($it->all(sub { shift->{n} > 5}) {
     .. all n > 5 ..
    }

    # Modify and summary
    my $it2 = $it->map(sub { shift->{n} * 2 });

    my $sum = $it2->reduce(0,sub {
        my ($prev,$this) = @_;
        $prev + $this;
        });

    my $it3 = $it->group(2)->invoke('to_array');

    # Calculate maximum of 'n' field
    my $max = $it->max(sub {
            shift->{n};
    });

    # Calculate minimum of 'n' field
    my $in = $it->min(sub {
            shift->{n};
    });

=head1 DESCRIPTION

The Catmandu::Iterable class provides many list methods to Iterators such as Importers and
Exporters. Most of the methods are lazy if the underlying datastream supports it. Beware of
idempotence: many iterators contain state information and calls will give different results on
a second invocation.

=head1 METHODS

=head2 to_array

Return all the items in the iterator as an array ref.

=head2 count

Return the count of all the items in the iterator.

=head3 LOOPING

=head2 each(\&callback)

For each item in the iterator execute the callback function with the item as
first argument. Returns the number of items in the iterator.

=head2 first

Return the first item from the iterator.

=head2 rest

Returns an iterator containing everything except the first item.

=head2 slice($index,$length)

Returns an new iterator starting at the item at C<$index> returning at most <$length> items.

=head2 take($num)

Returns an iterator with the first C<$num> items.

=head2 group($num)

Splits the iterator into new iterators each containing C<$num> items.

    $it->group(500)->each(sub {
        my $group_it = $_[0];
        $group_it->each(sub {
            my $item = $_[0];
            # ...
        });
    });

Note that the group iterators load their items in memory. The last group
iterator will contain less than C<$num> item unless the item count is divisible
by C<$num>.

=head2 interleave(@iterators)

Returns an iterator which returns the first item of each iterator then the
second of each and so on.

=head2 contains($data)

Alias for C<includes>.

=head2 includes($data)

return true if any item in the collection is deeply equal to C<$data>.

=head2 tap(\&callback)

Returns a copy of the iterator and executing callback on each item. This method works
like the Unix C<tee> command. Use this command to peek into an iterable while it is
processing results. E.g. you are writing code to process an iterable and wrote
something like:

   $it->each(sub {
      # Very complicated routine
      ...
   });

Now you would like to benchmark this piece of code (how fast are we processing).
This can be done by tapping into the iterator and calling a 'benchmark' subroutine
in your program that for instance counts the number of items divided by the
execution time.

   $it->tap(\&benchmark)->each(sub {
      # Very complicated routine
      ...
   });

   sub benchmark {
       my $item = shift;
       $start ||= time;
       $count++;

       printf "%d recs/sec\n" , $count/(time - $start + 1) if $count % 100 == 0;
   }

Note that the C<benchmark> method already implements this common case.

=head2 every($num, \&callback)

Similar to C<tap>, but only calls the callback every C<$num> times. Useful for
benchmarking and sampling.

=head2 detect(\&callback)

Returns the first item for which callback returns a true value.

=head2 detect(qr/..../)

If the iterator contains STRING values, then return the first item which matches the
regex.

=head2 detect($key => $val)

If the iterator contains HASH values, then return the first item where the value of
C<$key> is equal to C<$val>.

=head2 detect($key => qr/..../)

If the iterator contains HASH values, then return the first item where the value of
C<$key> matches the regex.

=head2 detect($key => [$val, ...])

If the iterator contains HASH values, then return the first item where the value of
C<$key> is equal to any of the values given.

=head2 pluck($key)

Return an iterator that only contains the values of the given C<$key>.

=head2 select(\&callback)

Returns an iterator containing only items item for which the callback returns a true value.

=head2 select(qr/..../)

If the iterator contains STRING values, then return each item which matches the regex.

=head2 select($key => $val)

If the iterator contains HASH values, then return each item where the value of
C<$key> is equal to C<$val>.

=head2 select($key => qr/..../)

If the iterator contains HASH values, then return each item where the value of C<$key>
matches the regex.

=head2 select($key => [$val, ...])

If the iterator contains HASH values, then return each item where the value of
C<$key> is equal to any of the vals given.

=head2 grep( ... )

Alias for C<select( ... )>.

=head2 reject(\&callback)

Returns an iterator containing each item for which callback returns a false value.

=head2 reject(qr/..../)

If the iterator contains STRING values, then reject every item except those
matching the regex.

=head2 reject($key => qr/..../)

If the iterator contains HASH values, then reject every item for where the value of C<$key>
DOESN'T match the regex.

=head2 reject($key => $val)

If the iterator contains HASH values, then return each item where the value of
C<$key> is NOT equal to C<$val>.

=head2 reject($key => [$val, ...])

If the iterator contains HASH values, then return each item where the value of
C<$key> is NOT equal to any of the values given.

=head2 sorted

Returns an iterator with items sorted lexically. Note that sorting requires
memory because all items are buffered in a L<Catmandu::ArrayIterator>.

=head2 sorted(\&callback)

Returns an iterator with items sorted by a callback. The callback is expected to
returns an integer less than, equal to, or greater than C<0>. The following code
snippets result in equal arrays:

    $iterator->sorted(\&callback)->to_array
    [ sort \&callback @{ $iterator->to_array } ] 

=head2 sorted($key) 

Returns an iterator with items lexically sorted by a key. This is equivalent to
sorting with the following callback:

    $iterator->sorted(sub { $_[0]->{$key} cmp $_[1]->{$key} })

=head3 EXTERNAL ITERATOR

L<Catmandu::Iterable> behaves like an internal iterator. C<next> and C<rewind>
allow you to use it like an external iterator.

=head2 next

Each call to C<next> will return the next item until the iterator is exhausted,
then it will keep returning C<undef>.

    while (my $data = $it->next) {
      # do stuff
    }

    $it->next; # returns undef

=head2 rewind

Rewind the external iterator to the first item.

    $it->next; # => {n => 1}
    $it->next; # => {n => 2}
    $it->next; # => {n => 3}
    $it->rewind
    $it->next; # => {n => 1}

Note the the iterator must support this behavior. Many importers are not
rewindable.

=head3 BOOLEAN FUNCTIONS

=head2 any(\&callback)

Returns true if at least one item generates a true value when executing callback.

=head2 many(\&callback)

Alias for C<many>.

=head2 many(\&callback)

Returns true if at least two items generate a true value when executing callback.

=head2 all(\&callback)

Returns true if all the items generate a true value when executing callback.

=head3 MAP & REDUCE

=head2 map(\&callback)

Returns a new iterator containing for each item the result of the callback. If
the callback returns multiple or no items, the resulting iterator will grow or
shrink.

=head2 reduce([$start],\&callback)

For each item in the iterator execute C<&callback($prev,$item)> where C<$prev> is the
optional C<$start> value or the result of the previous call to callback. Returns the
final result of the callback function.

=head2 invoke($name)

Returns an interator were the method C<$name> is called on every object in the iterable.
This is a shortcut for C<$it->map(sub { $_[0]->$name })>.

=head2 max()

Returns the maximum of an iterator containing only numbers.

=head2 max(\&callback)

Returns the maximum of the numbers returned by executing callback.

=head2 min()

Returns the minimum of an iterator containing only numbers.

=head2 min(\&callback)

Returns the minimum of the numbers returned by executing callback.

=head2 benchmark()

Prints the number of records processed per second to STDERR.

=head2 format(cols => ['key', ...], col_sep => '  |  ', header => 1|0)

Print the iterator data formatted as a spreadsheet like table. Note that this
method will load the whole dataset in memory to calculate column widths. See
also L<Catmandu::Exporter::Table> for a more elaborated method of printing
iterators in tabular form.

=head2 stop_if(\&callback)

Returns a new iterator thats stops processing if the callback returns false.

    # stop after encountering 3 frobnitzes
    my $frobnitzes = 0;
    $iterator->stop_if(sub {
        my $rec = shift;
        $frobnitzes++ if $rec->{title} =~ /frobnitz/;
        $frobnitzes > 3;
    })->each(sub {
        my $rec = shift;
        ...
    });

=head2 run

Simply invokes the iterator and returns 1 if any records were processed, 0 otherwise.

    $it = $it->tap(sub {
        # do something
    });
    $it = $it->tap(sub {
        # do another thing
    });
    $it->run

    print 'not empty' if $it->run;

=head1 SEE ALSO

L<Catmandu::Iterator>.

=cut