# <@LICENSE>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# </@LICENSE>

package Mail::SpamAssassin::Util::TinyRedis;
# Implements the new unified request protocol, introduced in Redis 1.2 .

use strict;
use re 'taint';
use warnings;

use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
use IO::Socket::UNIX;
use Time::HiRes ();

our $io_socket_module_name;
BEGIN {
  if (eval { require IO::Socket::IP }) {
    $io_socket_module_name = 'IO::Socket::IP';
  } elsif (eval { require IO::Socket::INET6 }) {
    $io_socket_module_name = 'IO::Socket::INET6';
  } elsif (eval { require IO::Socket::INET }) {
    $io_socket_module_name = 'IO::Socket::INET';
  }
}

sub new {
  my($class, %args) = @_;
  my $self = bless { args => {%args} }, $class;
  my $outbuf = ''; $self->{outbuf} = \$outbuf;
  $self->{batch_size} = 0;
  $self->{server} = $args{server} || $args{sock} || '127.0.0.1:6379';
  $self->{on_connect} = $args{on_connect};
  return if !$self->connect;
  $self;
}

sub DESTROY {
  my $self = $_[0];
  local($@, $!, $_);
  undef $self->{sock};
}

sub disconnect {
  my $self = $_[0];
  local($@, $!);
  undef $self->{sock};
}

sub connect {
  my $self = $_[0];

  $self->disconnect;
  my $sock;
  my $server = $self->{server};
  if ($server =~ m{^/}) {
    $sock = IO::Socket::UNIX->new(
              Peer => $server, Type => SOCK_STREAM);
  } elsif ($server =~ /^(?: \[ ([^\]]+) \] | ([^:]+) ) : ([^:]+) \z/xs) {
    $server = defined $1 ? $1 : $2;  my $port = $3;
    $sock = $io_socket_module_name->new(
              PeerAddr => $server, PeerPort => $port, Proto => 'tcp');
  } else {
    die "Invalid 'server:port' specification: $server";
  }
  if ($sock) {
    $self->{sock} = $sock;

    $self->{sock_fd} = $sock->fileno; $self->{fd_mask} = '';
    vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;

    # an on_connect() callback must not use batched calls!
    $self->{on_connect}->($self)  if $self->{on_connect};
  }
  $sock;
}

# Receive, parse and return $cnt consecutive redis replies as a list.
#
sub _response {
  my($self, $cnt) = @_;

  my $sock = $self->{sock};
  if (!$sock) {
    $self->connect  or die "Connect failed: $!";
    $sock = $self->{sock};
  };

  my @list;

  for (1 .. $cnt) {

    my $result = <$sock>;
    if (!defined $result) {
      $self->disconnect;
      die "Error reading from Redis server: $!";
    }
    chomp $result;
    my $resp_type = substr($result, 0, 1, '');

    if ($resp_type eq '$') {  # bulk reply
      if ($result < 0) {
        push(@list, undef);  # null bulk reply
      } else {
        my $data = ''; my $ofs = 0; my $len = $result + 2;
        while ($len > 0) {
          my $nbytes = read($sock, $data, $len, $ofs);
          if (!$nbytes) {
            $self->disconnect;
            defined $nbytes  or die "Error reading from Redis server: $!";
            die "Redis server closed connection";
          }
          $ofs += $nbytes; $len -= $nbytes;
        }
        chomp $data;
        push(@list, $data);
      }

    } elsif ($resp_type eq ':') {  # integer reply
      push(@list, 0+$result);

    } elsif ($resp_type eq '+') {  # status reply
      push(@list, $result);

    } elsif ($resp_type eq '*') {  # multi-bulk reply
      push(@list, $result < 0 ? undef : $self->_response(0+$result) );

    } elsif ($resp_type eq '-') {  # error reply
      die "$result\n";

    } else {
      die "Unknown Redis reply: $resp_type ($result)";
    }
  }
  \@list;
}

sub _write_buff {
  my($self, $bufref) = @_;

  if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
  my $nwrite;
  for (my $ofs = 0; $ofs < length($$bufref); $ofs += $nwrite) {
    # to reliably detect a disconnect we need to check for an input event
    # using a select; checking status of syswrite is not sufficient
    my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
    my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
    defined $nfound && $nfound >= 0 or die "Select failed: $!";
    if (vec($rout, $self->{sock_fd}, 1) &&
        !sysread($self->{sock}, $inbuff, 1024)) {
      # eof, try reconnecting
      $self->connect  or die "Connect failed: $!";
    }
    local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
    $nwrite = syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
    next if defined $nwrite;
    $nwrite = 0;
    if ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
      Time::HiRes::sleep(0.1);  # slow down, just in case
    } else {
      $self->disconnect;
      if ($! == ENOTCONN   || $! == EPIPE ||
          $! == ECONNRESET || $! == ECONNABORTED) {
        $self->connect  or die "Connect failed: $!";
      } else {
        die "Error writing to redis socket: $!";
      }
    }
  }
  1;
}

# Send a redis command with arguments, returning a redis reply.
#
sub call {
  my $self = shift;

  my $buff = '*' . scalar(@_) . "\015\012";
  $buff .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;

  $self->_write_buff(\$buff);
  local($/) = "\015\012";
  my $arr_ref = $self->_response(1);
  $arr_ref && $arr_ref->[0];
}

# Append a redis command with arguments to a batch.
#
sub b_call {
  my $self = shift;

  my $bufref = $self->{outbuf};
  $$bufref .= '*' . scalar(@_) . "\015\012";
  $$bufref .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;
  ++ $self->{batch_size};
}

# Send a batch of commands, returning an arrayref of redis replies,
# each array element corresponding to one command in a batch.
#
sub b_results {
  my $self = $_[0];
  my $batch_size = $self->{batch_size};
  return if !$batch_size;
  my $bufref = $self->{outbuf};
  $self->_write_buff($bufref);
  $$bufref = ''; $self->{batch_size} = 0;
  local($/) = "\015\012";
  $self->_response($batch_size);
}

1;