#!/usr/bin/perl

use strict;
use warnings;

use IO::Async::Test;

use Test::More;
use Test::Fatal;
use Test::Metrics::Any;
use Test::Refcount;

use IO::File;
use POSIX qw( ECONNRESET );

use IO::Async::Loop;

use IO::Async::OS;

use IO::Async::Stream;

my $loop = IO::Async::Loop->new_builtin;

testing_loop( $loop );

sub mkhandles
{
   my ( $rd, $wr ) = IO::Async::OS->pipepair or die "Cannot pipe() - $!";
   # Need handles in nonblocking mode
   $rd->blocking( 0 );
   $wr->blocking( 0 );

   return ( $rd, $wr );
}

{
   my ( $rd, $wr ) = mkhandles;

   my @lines;

   my $stream = IO::Async::Stream->new( 
      read_handle => $rd,
      on_read => sub {
         my $self = shift;
         my ( $buffref, $eof ) = @_;

         push @lines, $1 while $$buffref =~ s/^(.*\n)//;
         return 0;
      },
   );

   ok( defined $stream, 'reading $stream defined' );
   isa_ok( $stream, "IO::Async::Stream", 'reading $stream isa IO::Async::Stream' );

   is_oneref( $stream, 'reading $stream has refcount 1 initially' );

   $loop->add( $stream );

   is_refcount( $stream, 2, 'reading $stream has refcount 2 after adding to Loop' );

   $wr->syswrite( "message\n" );

   is_deeply( \@lines, [], '@lines before wait' );

   wait_for { scalar @lines };

   is_deeply( \@lines, [ "message\n" ], '@lines after wait' );

   undef @lines;

   $wr->syswrite( "return" );

   $loop->loop_once( 0.1 ); # nothing happens

   is_deeply( \@lines, [], '@lines partial still empty' );

   $wr->syswrite( "\n" );

   wait_for { scalar @lines };

   is_deeply( \@lines, [ "return\n" ], '@lines partial completed now received' );

   undef @lines;

   $wr->syswrite( "hello\nworld\n" );
   wait_for { scalar @lines };

   is_deeply( \@lines, [ "hello\n", "world\n" ], '@lines two at once' );

   undef @lines;
   my @new_lines;
   $stream->configure( 
      on_read => sub {
         my $self = shift;
         my ( $buffref, $eof ) = @_;

         push @new_lines, $1 while $$buffref =~ s/^(.*\n)//;
         return 0;
      },
   );

   $wr->syswrite( "new\nlines\n" );

   wait_for { scalar @new_lines };

   is( scalar @lines, 0, '@lines still empty after on_read replace' );
   is_deeply( \@new_lines, [ "new\n", "lines\n" ], '@new_lines after on_read replace' );

   is_refcount( $stream, 2, 'reading $stream has refcount 2 before removing from Loop' );

   $loop->remove( $stream );

   is_oneref( $stream, 'reading $stream refcount 1 finally' );
}

# Abstract reading with reader function
{
   my ( $rd, $wr ) = mkhandles;
   my $buffer = "Here is the contents\n";

   my @lines;
   my $stream = IO::Async::Stream->new(
      read_handle => $rd,
      reader => sub {
         my $self = shift;
         my $more = substr( $buffer, 0, $_[2], "" );
         $_[1] .= $more;
         return length $more;
      },
      on_read => sub {
         my $self = shift;
         my ( $buffref, $eof ) = @_;

         push @lines, $1 while $$buffref =~ s/^(.*\n)//;
         return 0;
      },
   );

   $loop->add( $stream );

   # make it readready
   $wr->syswrite( "1" );

   wait_for { scalar @lines };

   is_deeply( \@lines, [ "Here is the contents\n" ], '@lines from stream with abstract reader' );

   $loop->remove( $stream );
}

# ->want_readready_for_write
{
   my ( $rd, $wr ) = mkhandles;

   my $reader_called;
   my $writer_called;
   my $stream = IO::Async::Stream->new(
      handle => $rd,
      on_read => sub { return 0; }, # ignore reading
      reader => sub { $reader_called++; sysread( $rd, $_[2], $_[3] ) },
      writer => sub { $writer_called++; return 1 },
   );

   $loop->add( $stream );

   # Hacky hack - make the stream want to write, but don't mark the stream write-ready
   $stream->write( "A" );
   $stream->want_writeready_for_write( 0 );
   # End hack

   # make it readready
   $wr->syswrite( "1" );

   wait_for { $reader_called };

   ok( !$writer_called, 'writer not yet called before ->want_readready_for_write' );

   $stream->want_readready_for_write( 1 );

   undef $reader_called;
   $wr->syswrite( "2" );
   wait_for { $reader_called && $writer_called };

   ok( $writer_called, 'writer now invoked with ->want_readready_for_write' );

   $loop->remove( $stream );
}

{
   my ( $rd, $wr ) = mkhandles;

   my @chunks;

   my $stream = IO::Async::Stream->new(
      read_handle => $rd,
      read_len => 2,
      on_read => sub {
         my ( $self, $buffref, $eof ) = @_;
         push @chunks, $$buffref;
         $$buffref = "";
      },
   );

   $loop->add( $stream );

   $wr->syswrite( "partial" );

   wait_for { scalar @chunks };

   is_deeply( \@chunks, [ "pa" ], '@lines with read_len=2 without read_all' );

   wait_for { @chunks == 4 };

   is_deeply( \@chunks, [ "pa", "rt", "ia", "l" ], '@lines finally with read_len=2 without read_all' );

   undef @chunks;
   $stream->configure( read_all => 1 );

   $wr->syswrite( "partial" );

   wait_for { scalar @chunks };

   is_deeply( \@chunks, [ "pa", "rt", "ia", "l" ], '@lines with read_len=2 with read_all' );

   $loop->remove( $stream );
}

{
   my ( $rd, $wr ) = mkhandles;

   my $no_on_read_stream;
   ok( !exception { $no_on_read_stream = IO::Async::Stream->new( read_handle => $rd ) },
       'Allowed to construct a Stream without an on_read handler' );
   ok( exception { $loop->add( $no_on_read_stream ) },
       'Not allowed to add an on_read-less Stream to a Loop' );
}

# Subclass
my @sub_lines;

{
   my ( $rd, $wr ) = mkhandles;

   my $stream = TestStream->new(
      read_handle => $rd,
   );

   ok( defined $stream, 'reading subclass $stream defined' );
   isa_ok( $stream, "IO::Async::Stream", 'reading $stream isa IO::Async::Stream' );

   is_oneref( $stream, 'subclass $stream has refcount 1 initially' );

   $loop->add( $stream );

   is_refcount( $stream, 2, 'subclass $stream has refcount 2 after adding to Loop' );

   $wr->syswrite( "message\n" );

   is_deeply( \@sub_lines, [], '@sub_lines before wait' );

   wait_for { scalar @sub_lines };

   is_deeply( \@sub_lines, [ "message\n" ], '@sub_lines after wait' );

   $loop->remove( $stream );
}

# Dynamic on_read chaining
{
   my ( $rd, $wr ) = mkhandles;

   my $outer_count = 0;
   my $inner_count = 0;

   my $record;

   my $stream = IO::Async::Stream->new(
      read_handle => $rd,
      on_read => sub {
         my ( $self, $buffref, $eof ) = @_;
         $outer_count++;

         return 0 unless $$buffref =~ s/^(.*\n)//;

         my $length = $1;

         return sub {
            my ( $self, $buffref, $eof ) = @_;
            $inner_count++;

            return 0 unless length $$buffref >= $length;

            $record = substr( $$buffref, 0, $length, "" );

            return undef;
         }
      },
   );

   is_oneref( $stream, 'dynamic reading $stream has refcount 1 initially' );

   $loop->add( $stream );

   $wr->syswrite( "11" ); # No linefeed yet
   wait_for { $outer_count > 0 };
   is( $outer_count, 1, '$outer_count after idle' );
   is( $inner_count, 0, '$inner_count after idle' );

   $wr->syswrite( "\n" );
   wait_for { $inner_count > 0 };
   is( $outer_count, 2, '$outer_count after received length' );
   is( $inner_count, 1, '$inner_count after received length' );

   $wr->syswrite( "Hello " );
   wait_for { $inner_count > 1 };
   is( $outer_count, 2, '$outer_count after partial body' );
   is( $inner_count, 2, '$inner_count after partial body' );

   $wr->syswrite( "world" );
   wait_for { $inner_count > 2 };
   is( $outer_count, 3, '$outer_count after complete body' );
   is( $inner_count, 3, '$inner_count after complete body' );
   is( $record, "Hello world", '$record after complete body' );

   $loop->remove( $stream );

   is_oneref( $stream, 'dynamic reading $stream has refcount 1 finally' );
}

# ->push_on_read
{
   my ( $rd, $wr ) = mkhandles;

   my $base;
   my $stream = IO::Async::Stream->new( read_handle => $rd,
      on_read => sub {
         my ( $self, $buffref ) = @_;
         $base = $$buffref; $$buffref = "";
         return 0;
      },
   );

   $loop->add( $stream );

   my $firstline;
   $stream->push_on_read(
      sub {
         my ( $stream, $buffref, $eof ) = @_;
         return 0 unless $$buffref =~ s/(.*)\n//;
         $firstline = $1;
         return undef;
      }
   );

   my $eightbytes;
   $stream->push_on_read(
      sub {
         my ( $stream, $buffref, $eof ) = @_;
         return 0 unless length $$buffref >= 8;
         $eightbytes = substr( $$buffref, 0, 8, "" );
         return undef;
      }
   );

   $wr->syswrite( "The first line\nABCDEFGHIJK" );

   wait_for { defined $firstline and defined $eightbytes };

   is( $firstline,  "The first line", '$firstline from ->push_on_read CODE' );
   is( $eightbytes, "ABCDEFGH",       '$eightbytes from ->push_on_read CODE' );
   is( $base,       "IJK",            '$base from ->push_on_read CODE' );

   $loop->remove( $stream );
}

# EOF
{
   my ( $rd, $wr ) = mkhandles;

   my $eof = 0;
   my $partial;

   my $stream = IO::Async::Stream->new( read_handle => $rd,
      on_read => sub {
         my ( undef, $buffref, $eof ) = @_;
         $partial = $$buffref if $eof;
         return 0;
      },
      on_read_eof => sub { $eof++ },
   );

   $loop->add( $stream );

   $wr->syswrite( "Incomplete" );

   $wr->close;

   ok( !$stream->is_read_eof, '$stream ->is_read_eof before wait' );
   is( $eof, 0, 'EOF indication before wait' );

   wait_for { $eof };

   ok( $stream->is_read_eof, '$stream ->is_read_eof after wait' );
   is( $eof, 1, 'EOF indication after wait' );
   is( $partial, "Incomplete", 'EOF stream retains partial input' );

   ok( !defined $stream->loop, 'EOF stream no longer member of Loop' );
   ok( !defined $stream->read_handle, 'Stream no longer has a read_handle' );
}

# Disabled close_on_read_eof
{
   my ( $rd, $wr ) = mkhandles;

   my $eof = 0;
   my $partial;

   my $stream = IO::Async::Stream->new( read_handle => $rd,
      on_read => sub {
         my ( undef, $buffref, $eof ) = @_;
         $partial = $$buffref if $eof;
         return 0;
      },
      on_read_eof => sub { $eof++ },
      close_on_read_eof => 0,
   );

   $loop->add( $stream );

   $wr->syswrite( "Incomplete" );

   $wr->close;

   is( $eof, 0, 'EOF indication before wait' );

   wait_for { $eof };

   is( $eof, 1, 'EOF indication after wait' );
   is( $partial, "Incomplete", 'EOF stream retains partial input' );

   ok( defined $stream->loop, 'EOF stream still member of Loop' );
   ok( defined $stream->read_handle, 'Stream still has a read_handle' );
}

# Close
{
   my ( $rd, $wr ) = mkhandles;

   my $closed = 0;
   my $loop_during_closed;

   my $stream = IO::Async::Stream->new( read_handle => $rd,
      on_read   => sub { },
      on_closed => sub {
         my ( $self ) = @_;
         $closed = 1;
         $loop_during_closed = $self->loop;
      },
   );

   is_oneref( $stream, 'closing $stream has refcount 1 initially' );

   $loop->add( $stream );

   is_refcount( $stream, 2, 'closing $stream has refcount 2 after adding to Loop' );

   is( $closed, 0, 'closed before close' );

   $stream->close;

   is( $closed, 1, 'closed after close' );
   is( $loop_during_closed, $loop, 'loop during closed' );

   ok( !defined $stream->loop, 'Stream no longer member of Loop' );

   is_oneref( $stream, 'closing $stream refcount 1 finally' );
}

# ->read Futures
{
   my ( $rd, $wr ) = mkhandles;

   my $stream = IO::Async::Stream->new( read_handle => $rd,
      on_read => sub {
         my ( $self, $buffref ) = @_;
         die "Base on_read invoked with data in the buffer" if length $$buffref;
         return 0;
      },
   );

   $loop->add( $stream );

   my $f_atmost = $stream->read_atmost( 256 );

   $wr->syswrite( "Some data\n" );
   wait_for { $f_atmost->is_ready };

   is( scalar $f_atmost->get, "Some data\n", '->read_atmost' );

   my $f_exactly   = $stream->read_exactly( 4 );
   my $f_until_qr  = $stream->read_until( qr/[A-Z][a-z]*/ );
   my $f_until_str = $stream->read_until( "\n" );

   $wr->syswrite( "Here is the First line of input\n" );

   wait_for { $f_exactly->is_ready and $f_until_qr->is_ready and $f_until_str->is_ready };

   is( scalar $f_exactly->get,   "Here", '->read_exactly' );
   is( scalar $f_until_qr->get,  " is the First", '->read_until regexp' );
   is( scalar $f_until_str->get, " line of input\n", '->read_until str' );

   my $f_first = $stream->read_until( "\n" );
   my $f_second = $stream->read_until( "\n" );
   $f_first->cancel;

   $wr->syswrite( "For the second\n" );

   wait_for { $f_second->is_ready };

   is( scalar $f_second->get, "For the second\n", 'Second ->read_until recieves data after first is ->cancelled' );

   my $f_until_eof = $stream->read_until_eof;

   $wr->syswrite( "And the rest of it" );
   $wr->close;

   wait_for { $f_until_eof->is_ready };

   is( scalar $f_until_eof->get, "And the rest of it", '->read_until_eof' );

   # No need to remove as ->close did it
}

# RT101774
{
   my ( $rd, $wr ) = mkhandles;

   my $stream = IO::Async::Stream->new( read_handle => $rd,
      on_read => sub { 0 },
   );

   $loop->add( $stream );

   $wr->syswrite( "lalaLALA" );

   my $f = wait_for_future $stream->read_exactly( 4 )->then( sub {
      $stream->read_exactly( 4 );
   });

   is( scalar $f->get, "LALA", 'chained ->read_exactly' );

   $loop->remove( $stream );
}

# watermarks
{
   my ( $rd, $wr ) = mkhandles;

   my $high_hit = 0;
   my $low_hit  = 0;

   my $stream = IO::Async::Stream->new(
      read_handle => $rd,
      on_read => sub { 0 }, # we'll work by Futures
      read_high_watermark => 8,
      read_low_watermark  => 4,
      on_read_high_watermark => sub { $high_hit++ },
      on_read_low_watermark  => sub { $low_hit++ },
   );

   $loop->add( $stream );

   $wr->syswrite( "1234567890" );

   wait_for { $high_hit };
   ok( 1, "Reading too much hits high watermark" );

   is( $stream->read_exactly( 8 )->get, "12345678", 'Stream->read_exactly yields bytes' );

   is( $low_hit, 1, 'Low watermark hit after ->read' );
}

# Errors
{
   my ( $rd, $wr ) = mkhandles;
   $wr->syswrite( "X" ); # ensuring $rd is read-ready

   no warnings 'redefine';
   local *IO::Handle::sysread = sub {
      $! = ECONNRESET;
      return undef;
   };

   my $read_errno;

   my $stream = IO::Async::Stream->new(
      read_handle => $rd,
      on_read => sub {},
      on_read_error  => sub { ( undef, $read_errno ) = @_ },
   );

   $loop->add( $stream );

   wait_for { defined $read_errno };

   cmp_ok( $read_errno, "==", ECONNRESET, 'errno after failed read' );

   my $f = wait_for_future $stream->read_atmost( 256 );

   cmp_ok( ( $f->failure )[-1], "==", ECONNRESET, 'failure from ->read_atmost after failed read' );

   $loop->remove( $stream );
}

{
   binmode STDIN; # Avoid harmless warning in case -CS is in effect
   my $stream = IO::Async::Stream->new_for_stdin;
   is( $stream->read_handle, \*STDIN, 'Stream->new_for_stdin->read_handle is STDIN' );
}

# Metrics
SKIP: {
   skip "Metrics are unavailable" unless $IO::Async::Metrics::METRICS;
   my ( $rd, $wr ) = mkhandles;

   my $done;
   my $stream = IO::Async::Stream->new(
      read_handle => $rd,
      on_read => sub {
         my ( $self, $bufref ) = @_;
         $done = 1 if length $$bufref == 100;
         return 0;
      },
   );

   $loop->add( $stream );

   $wr->syswrite( "X"x100 );

   is_metrics_from(
      sub { wait_for { $done } },
      { io_async_stream_read => 100 },
      'Stream reading increments metric'
   );

   $loop->remove( $stream );
}

done_testing;

package TestStream;
use base qw( IO::Async::Stream );

sub on_read
{
   my $self = shift;
   my ( $buffref, $eof ) = @_;

   return 0 unless $$buffref =~ s/^(.*\n)//;

   push @sub_lines, $1;
   return 1;
}