package Mojo::MySQL5::Database;
use Mojo::Base 'Mojo::EventEmitter';

use Mojo::MySQL5::Connection;
use Mojo::MySQL5::Results;
use Mojo::MySQL5::Transaction;
use Encode '_utf8_off';
use Scalar::Util 'weaken';
use Carp 'croak';

has ['mysql', 'connection'];

sub DESTROY {
  my $self = shift;
  return unless my $c = $self->connection;
  return unless my $mysql = $self->mysql;
  $mysql->_enqueue($c);
}

sub backlog { scalar @{shift->{waiting} || []} }

sub begin {
  my $self = shift;
  croak 'Already in a transaction' if ($self->connection->{status_flags} & 0x0001);
  $self->query('START TRANSACTION');
  $self->query('SET autocommit=0');
  my $tx = Mojo::MySQL5::Transaction->new(db => $self);
  weaken $tx->{db};
  return $tx;
}

sub connect {
  my $self = shift;
  my $c = Mojo::MySQL5::Connection->new(url => $self->mysql->url->clone);
  $c->on(error => sub {
    my ($c, $err) = @_;
    warn 'Unable to connect to "', $self->mysql->url, '" ', $err, "\n";
  });
  $c->connect();
  $c->unsubscribe('error');
  return $self->connection($c);
}

sub disconnect { shift->connection->disconnect }

sub pid { shift->connection->{connection_id} }

sub ping { shift->connection->ping }

sub query {
  my $self = shift;
  my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
  my $sql = shift;
  my $expand_sql = '';

  _utf8_off $sql;

  while (length($sql) > 0) {
    my $token;
    if ($sql =~ /^(\s+)/s                                   # whitespace
      or $sql =~ /^(\w+)/) {                                # general name
      $token = $1;
    }
    elsif ($sql =~ /^--.*(?:\n|\z)/p                        # double-dash comment
      or $sql =~ /^\#.*(?:\n|\z)/p                          # hash comment
      or $sql =~ /^\/\*(?:[^\*]|\*[^\/])*(?:\*\/|\*\z|\z)/p # C-style comment
      or $sql =~ /^'(?:[^'\\]*|\\(?:.|\n)|'')*(?:'|\z)/p    # single-quoted literal text
      or $sql =~ /^"(?:[^"\\]*|\\(?:.|\n)|"")*(?:"|\z)/p    # double-quoted literal text
      or $sql =~ /^`(?:[^`]*|``)*(?:`|\z)/p) {              # schema-quoted literal text
      $token = ${^MATCH};
    }
    else {
      $token = substr($sql, 0, 1);
    }
    $expand_sql .= $token eq '?' ? $self->quote(shift) : $token;
    substr($sql, 0, length($token), '');
  }

  croak 'async query in flight' if $self->backlog and !$cb;
  $self->_subscribe unless $self->backlog;

  push @{$self->{waiting}}, { cb => $cb, sql => $expand_sql, count => 0, started => 0,
    results => Mojo::MySQL5::Results->new };

  # Blocking
  unless ($cb) {
    $self->connection->query($expand_sql);
    $self->_unsubscribe;
    my $current = shift @{$self->{waiting}};
    croak $self->connection->{error_message} if $self->connection->{error_code};
    return $current->{results};
  }

  # Non-blocking
  $self->_next;
}

sub quote {
  my ($self, $string) = @_;
  return 'NULL' unless defined $string;

  for ($string) {
    s/\\/\\\\/g;
    s/\0/\\0/g;
    s/\n/\\n/g;
    s/\r/\\r/g;
    s/'/\\'/g;
    # s/"/\\"/g;
    s/\x1a/\\Z/g;
  }

  return "'$string'";
}

sub quote_id {
  my ($self, $id) = @_;
  return 'NULL' unless defined $id;
  $id =~ s/`/``/g;
  return "`$id`";
}


sub _next {
  my $self = shift;

  return unless my $next = $self->{waiting}[0];
  return if $next->{started}++;

  $self->connection->query($next->{sql}, sub { 
    my $c = shift;
    my $current = shift @{$self->{waiting}};
    my $error = $c->{error_message};

    $self->backlog ? $self->_next : $self->_unsubscribe;

    my $cb = $current->{cb};
    $self->$cb($error, $current->{results});
  });

}

sub _subscribe {
  my $self = shift;

  $self->connection->on(fields => sub {
    my ($c, $fields) = @_;
    return unless my $res = $self->{waiting}->[0]->{results};
    push @{ $res->{_columns} }, $fields;
    $self->{waiting}->[0]->{count}++;
  });

  $self->connection->on(result => sub {
    my ($c, $row) = @_;
    return unless my $res = $self->{waiting}->[0]->{results};
    push @{ $res->{_results}->[$self->{waiting}->[0]->{count} - 1] //= [] }, $row;
  });

  $self->connection->on(end => sub {
    my $c = shift;
    return unless my $res = $self->{waiting}->[0]->{results};
    $res->{$_} = $c->{$_} for qw(affected_rows last_insert_id warnings_count);
  });

  $self->connection->on(error => sub {
    my $c = shift;
    return unless my $res = $self->{waiting}->[0]->{results};
    $res->{$_} = $c->{$_} for qw(error_code sql_state error_message);
  });
}

sub _unsubscribe {
  my $self = shift;
  $self->connection->unsubscribe($_) for qw(fields result end error);
}

1;

=encoding utf8

=head1 NAME

Mojo::MySQL5::Database - Database

=head1 SYNOPSIS

  use Mojo::MySQL5::Database;

  my $db = Mojo::MySQL5::Database->new(
    mysql => $mysql,
    connection => Mojo::MySQL5::Connection->new);

=head1 DESCRIPTION

L<Mojo::MySQL5::Database> is a container for L<Mojo::MySQL5::Connection> handles used by L<Mojo::MySQL5>.

=head1 ATTRIBUTES

L<Mojo::MySQL5::Database> implements the following attributes.

=head2 connection

  my $c = $db->connection;
  $db   = $db->connection(Mojo::MySQL5::Connection->new);

Database connection used for all queries.

=head2 mysql

L<Mojo::MySQL5> object this database belongs to.

=head1 METHODS

L<Mojo::MySQL5::Database> inherits all methods from L<Mojo::EventEmitter> and
implements the following ones.

=head2 backlog

  my $num = $db->backlog;

Number of waiting non-blocking queries.

=head2 begin

  my $tx = $db->begin;

Begin transaction and return L<Mojo::MySQL5::Transaction> object, which will
automatically roll back the transaction unless
L<Mojo::MySQL5::Transaction/"commit"> bas been called before it is destroyed.

  # Insert rows in a transaction
  eval {
    my $tx = $db->begin;
    $db->query('insert into frameworks values (?)', 'Catalyst');
    $db->query('insert into frameworks values (?)', 'Mojolicious');
    $tx->commit;
  };
  say $@ if $@;

=head2 connect

  $db->connect;

Connect to MySQL server.

=head2 disconnect

  $db->disconnect;

Disconnect database connection and prevent it from getting cached again.

=head2 pid

  my $pid = $db->pid;

Return the connection id of the backend server process.

=head2 ping

  my $bool = $db->ping;

Check database connection.

=head2 query

  my $results = $db->query('select * from foo');
  my $results = $db->query('insert into foo values (?, ?, ?)', @values);

Execute a blocking statement and return a L<Mojo::MySQL5::Results> object with the
results. You can also append a callback to perform operation non-blocking.

  $db->query('select * from foo' => sub {
    my ($db, $err, $results) = @_;
    ...
  });
  Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

=head2 quote
 
  my $escaped = $db->quote($str);
 
Quote string value for passing to SQL query.
 
=head2 quote_id
 
  my $escaped = $db->quote_id($id);
 
Quote identifier for passing to SQL query.

=head1 SEE ALSO

L<Mojo::MySQL5>.

=cut