package Mojo::Pg::Che;

use Mojo::Base 'Mojo::EventEmitter';#'Mojo::Pg';

=pod

=encoding utf-8

Доброго всем

=head1 Mojo::Pg::Che

¡ ¡ ¡ ALL GLORY TO GLORIA ! ! !

=head1 NAME

Mojo::Pg::Che - mix of parent Mojo::Pg and DBI.pm

=head1 DESCRIPTION

See L<Mojo::Pg>

=head1 VERSION

Version 0.853

=cut

our $VERSION = '0.853';


=head1 SYNOPSIS

    use Mojo::Pg::Che;

    my $pg = Mojo::Pg::Che->connect("dbname=test;", "postgres", 'pg-pwd', \%attrs, max_connections=>10);
    # or
    my $pg = Mojo::Pg::Che->new
      ->dsn("DBI:Pg:dbname=test;")
      ->username("postgres")
      ->password('pg--pw')
      ->options(\%attrs)
      ->connect();
    
    # or
    my $pg = Mojo::Pg::Che->new('pg://postgres@/test');
    # or
    my $pg = Mojo::Pg::Che->new()->connect(...);

    # Bloking query
    my $result = $pg->query('select ...', undef, @bind);
    
    # Non-blocking query
    my $result = $pg->query('select ...', {Async => 1, ...}, @bind);
    
    # Cached query
    my $result = $pg->query('select ...', {Cached => 1, ...}, @bind);
    
    # prepare sth
    my $sth = $pg->prepare('select ...');
    
    # cached async sth
    my $sth = $pg->prepare_cached('select ...', {Async => 1,},);
    
    # Non-blocking query for async sth
    $pg->query($sth, undef, @bind, sub {my ($db, $err, $result) = @_; ...});
    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
    
    
    # Result non-blocking query for async sth
    my $ref_cb = $pg->query($sth, {Async => 1,}, @bind,);
    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
    # Mojo::Pg::Results style
    my res = $$ref_cb->()->hash;
    # same DBI style
    my $res  = $$ref_cb->()->fetchrow_hashref;
    
    # Mojo::Pg style
    my $now = $pg->db->query('select now() as now')->hash->{now};
    $pg->db->query('select pg_sleep(?::int), now() as now', undef, 2, $cb);
    
    # DBI style
    my $now = $pg->selectrow_hashref('select now() as now')->{now};
    my $now = $pg->db->selectrow_hashref('select now() as now')->{now};
    
    my $now = $pg->selectrow_array('select now() as now');

=head2 Transaction syntax

  eval {
    my $tx = $pg->begin;
    $tx->query('insert into foo (name) values (?)', 'bar');
    $tx->do('insert into foo (name) values (?)', 'baz');
    $tx->commit;
  };
  die $@ if $@;
  
  my $db = $pg->db;
  $db->begin;
  $db->do('insert into foo (name) values (?)', 'bazzzz');
  $db->rollback;
  $db->begin;
  $db->query('insert into foo (name) values (?)', 'barrr');
  $db->commit;

=head1 Non-blocking query

  my @results;
  my $cb = sub {
    my ($db, $err, $results) = @_;
    die $err if $err;
    push @results, $results;
  };
  $pg->query('select ?::date as d, pg_sleep(?::int)', undef, ("2016-06-$_", 1), $cb)
    for 17..23;
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
  like($_->hash->{d}, qr/2016-06-\d+/, 'correct async query')
    for @results;



=head1 METHODS

All methods from parent module L<Mojo::Pg> are inherits and implements the following new ones.

=head2 connect

DBI-style of new object instance. See L<DBI#connect>

=head2 db

Overriden method of L<Mojo::Pg#db>. Because can first input param - DBI database handler (when prepared statement used).

=head2 prepare

Prepare and return DBI statement handler for query string.

=head2 prepare_cached

Prepare and return DBI cached statement handler for query string.

=head2 query

Like L<Mojo::Pg::Database#query> but input params - L<Mojo::Pg::Che#Params-for-quering-methods>

Blocking query without attr B<Async> or callback.

Non-blocking query with attr B<Async> or callback.

=head2 select

Same method C<query>.

=head2 selectrow_array

DBI style quering. See L<DBI#selectrow_array>. Blocking | non-blocking. Input params - L<Mojo::Pg::Che#Params-for-quering-methods>.

=head2 selectrow_arrayref

DBI style quering. See L<DBI#selectrow_arrayref>. Blocking | non-blocking. Input params - L<Mojo::Pg::Che#Params-for-quering-methods>.

=head2 selectrow_hashref

DBI style quering. See L<DBI#selectrow_hashref>. Blocking | non-blocking. Input params - L<Mojo::Pg::Che#Params-for-quering-methods>.

=head2 selectall_arrayref

DBI style quering. See L<DBI#selectall_arrayref>. Blocking | non-blocking. Input params - L<Mojo::Pg::Che#Params-for-quering-methods>.

=head2 selectall_hashref

DBI style quering. See L<DBI#selectall_hashref>. Blocking | non-blocking. Input params - L<Mojo::Pg::Che#Params-for-quering-methods>.

=head2 selectcol_arrayref

DBI style quering. See L<DBI#selectcol_arrayref>. Blocking | non-blocking. Input params - L<Mojo::Pg::Che#Params-for-quering-methods>.

=head2 do

DBI style quering. See L<DBI#do>. Blocking | non-blocking. Input params - L<Mojo::Pg::Che#Params-for-quering-methods>.

=head2 begin

Start transaction and return new L<Mojo::Pg::Che::Database> object which attr C< {tx} > is a L<Mojo::Pg::Transaction> object. Sinonyms are: C<< ->tx >> and C<< ->begin_work >>.

=head1 Params for quering methods

The methods C<query>, C<select...>, C<do> has next ordered input params:

=over 4

=item * String query | statement handler object

=item * Hashref attrs (optional)

=item * Array of bind values (optional)

=item * Last param - callback/coderef for non-blocking (optional)

=back

=head1 SEE ALSO

L<Mojo::Pg>

L<DBI>

=head1 AUTHOR

Михаил Че (Mikhail Che), C<< <mche[-at-]cpan.org> >>

=head1 BUGS / CONTRIBUTING

Please report any bugs or feature requests at L<https://github.com/mche/Mojo-Pg-Che/issues>. Pull requests also welcome.

=head1 COPYRIGHT

Copyright 2016+ Mikhail Che.

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut

use Mojo::Pg;
use DBI;
use Carp qw(croak);
use Mojo::Pg::Che::Database;
#~ use Mojo::URL;
#~ use Scalar::Util 'blessed';

has 'pg';# => undef, weak => 1;
has database_class => 'Mojo::Pg::Che::Database';
has dsn             => 'dbi:Pg:';
has max_connections => 5;
has [qw(password username)] => '';
has [qw(parent search_path)];
has options => sub {
  {
    AutoCommit => 1,
    AutoInactiveDestroy => 1,
    PrintError => 0,
    RaiseError => 1,
    ShowErrorStatement => 1,
    pg_enable_utf8 => 1,
  };
};

has debug => $ENV{DEBUG_Mojo_Pg_Che} || 0;
my $PKG = __PACKAGE__;

sub new {
  my $class = shift;
  my $from_string = @_ == 1;
  my $pg = $from_string
    ? Mojo::Pg->new->from_string(shift)
    : Mojo::Pg->new();

  my $self = $class->SUPER::new(@_)->pg($pg);

  if ($from_string) {
    map $self->$_($self->pg->$_),
    qw(dsn username password options search_path);
  } else {
    map $self->pg->$_($self->$_),
    qw(dsn username password options search_path max_connections);#database_class pubsub
  }

  $self->dsn('dbi:Pg:'.$self->dsn)
    unless !$self->dsn || $self->dsn =~ /^dbi:Pg:/;
  
  return $self;
}

#DBI
sub connect {
  my $self = ref $_[0] ? shift : shift->SUPER::new;
  map { my $has = shift; $has && $self->$_($has)} qw(dsn username password);

  if (ref $_[0]) {
    my $attrs =  shift;
    my $options = $self->options;
    @$options{ keys %$attrs } = values %$attrs;
  } elsif (@_) {
    my $attrs = {@_};
    map $self->$_($attrs->{$_}), keys %$attrs;
  }
  
  $self->dsn('dbi:Pg:'.$self->dsn)
    unless !$self->dsn || $self->dsn =~ /^dbi:Pg:/;
  say STDERR sprintf("[$PKG->connect] prepare connection data for [%s]", $self->dsn, )
    if $self->debug;
  
  $self->pg(Mojo::Pg->new)
    unless $self->pg;
  
  map $self->pg->$_($self->$_),
    qw(dsn username password options search_path max_connections);#database_class  pubsub
  
  return $self;
}

sub db {
  my ($self, $dbh) = (shift, shift);

  # Fork-safety
  delete @$self{qw(pid queue)} unless ($self->{pid} //= $$) eq $$;
  
  $dbh ||= $self->_dequeue;

  return $self->database_class->new(dbh => $dbh, pg => $self);
}

sub prepare { shift->db->prepare(@_); }
sub prepare_cached { shift->db->prepare_cached(@_); }

# если уже sth и он не в асинхроне - взять его dbh
sub _db {
  my $self = shift;
  my $sth = ref $_[0] ? shift : return $self->db;
  return $self->db
    if !!$sth->{pg_async_status};
  #~ my $db = $sth->{private_mojo_db};
  #~ return $db
    #~ if $db && !$db->dbh->{pg_async_status};

  $self->db($sth->{Database});
}

# если уже sth и он не в асинхроне - взять в запрос его
# или просто у него взять строку запроса для нового dbh
sub _sth {
  my $self = shift;
  my $sth = ref $_[0] ? shift : return @_;
  #~ warn "_sth: $sth->{pg_async_status}, ";
  return ($sth, @_)
    if $sth->{pg_async_status} != 1;
  return ($sth->{Statement}, @_);
}

sub query { my $self = shift; $self->_db(@_)->select($self->_sth(@_)) }
sub select { my $self = shift; $self->_db(@_)->select($self->_sth(@_)) }
sub selectrow_array { my $self = shift; $self->_db(@_)->selectrow_array($self->_sth(@_)) }
sub selectrow_arrayref { my $self = shift; $self->_db(@_)->selectrow_arrayref($self->_sth(@_)) }
sub selectrow_hashref { my $self = shift; $self->_db(@_)->selectrow_hashref($self->_sth(@_)) }
sub selectall_arrayref { my $self = shift; $self->_db(@_)->selectall_arrayref($self->_sth(@_)) }
sub selectall_hashref { my $self = shift; $self->_db(@_)->selectall_hashref($self->_sth(@_)) }
sub selectcol_arrayref { my $self = shift; $self->_db(@_)->selectcol_arrayref($self->_sth(@_)) }
sub do { my $self = shift; $self->_db(@_)->do($self->_sth(@_)) }

#~ sub begin_work {croak 'Use $pg->db->tx | $pg->db->begin';}
sub tx {shift->begin}
sub begin_work {shift->begin}
sub begin {
  my $self = shift;
  my $db = $self->db;
  $db->begin;
  return $db;
}

sub commit  {croak 'Instead use: $tx = $pg->begin; $tx->do(...); $tx->commit;';}
sub rollback {croak 'Instead use: $tx = $pg->begin; $tx->do(...); $tx->rollback;';}

# Patch parent Mojo::Pg::_dequeue
sub _dequeue {
  my $self = shift;
  
  my $queue = $self->{queue} ||= [];
  for my $i (0..$#$queue) {
    
    my $dbh = $queue->[$i];

    next
      if $dbh->{pg_async_status} && $dbh->{pg_async_status} > 0;
    
    splice(@$queue, $i, 1);    #~ delete $queue->[$i]
    
    ($self->debug
      && (say STDERR sprintf("[$PKG->_dequeue] [$dbh] does dequeued, pool count:[%s]", scalar @$queue))
      && 0)
      or return $dbh
      if $dbh->ping;
    
  }
  
  my $dbh = DBI->connect(map { $self->$_ } qw(dsn username password options));
  $self->debug
    && say STDERR sprintf("[$PKG->_dequeue] new DBI connection [$dbh]", );
  

  $self->emit(connection => $dbh);

  return $dbh;
}

sub _enqueue {
  my ($self, $dbh) = @_;
  my $queue = $self->{queue} ||= [];
  #~ warn "queue++ $dbh:", scalar @$queue and
  
  if ($dbh->{Active} && ($dbh->{pg_async_status} && $dbh->{pg_async_status} > 0) || @$queue < $self->max_connections) {
    unshift @$queue, $dbh;
    $self->debug
      && say STDERR sprintf("[$PKG->_enqueue] [$dbh] does enqueued, pool count:[%s], pg_async_status=[%s]", scalar @$queue, $dbh->{pg_async_status});
    return;
  }
  #~ shift @$queue while @$queue > $self->max_connections;
  $self->debug
    && say STDERR sprintf("[$PKG->_enqueue] [$dbh] does not enqueued, pool count:[%s]", scalar @$queue);
}

1;

__END__

has pubsub => sub {
  require Mojo::Pg::PubSub;
  my $pubsub = Mojo::Pg::PubSub->new(pg => shift);
  #~ weaken $pubsub->{pg};#???
#Mojo::Reactor::EV: Timer failed: Can't call method "db" on an undefined value at t/06-pubsub.t line 21.
#EV: error in callback (ignoring): Can't call method "db" on an undefined value at Mojo/Pg/PubSub.pm line 44.
  return $pubsub;
};