### simple package for handling the stream request port
package Mogstored::SideChannelClient;

use strict;
use base qw{Perlbal::Socket};
use fields (
            'count',      # how many requests we've serviced
            'read_buf',   # unprocessed read buffer
            'mogsvc',     # the mogstored Perlbal::Service object
            );

# needed since we're pretending to be a Perlbal::Socket... never idle out
sub max_idle_time { return 0; }

sub new {
    my Mogstored::SideChannelClient $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new(@_);
    $self->{count} = 0;
    $self->{read_buf} = '';
    $self->{mogsvc} = Perlbal->service('mogstored');
    return $self;
}

sub event_read {
    my Mogstored::SideChannelClient $self = shift;

    my $bref = $self->read(1024);
    return $self->close unless defined $bref;
    $self->{read_buf} .= $$bref;

    my $path = $self->{mogsvc}->{docroot};

    while ($self->{read_buf} =~ s/^(.+?)\r?\n//) {
        my $cmd = $1;
        if ($cmd =~ /^size (\S+)$/) {
            # increase our count
            $self->{count}++;

            # validate uri
            my $uri = $1;
            if ($uri =~ /\.\./) {
                $self->write("ERROR: uri invalid (contains ..)\r\n");
                return;
            }

            # now stat the file to get the size and such
            Perlbal::AIO::aio_stat("$path$uri", sub {
                return if $self->{closed};
                my $size = -e _ ? -s _ : -1;
                $self->write("$uri $size\r\n");
            });
        } elsif ($cmd =~ /^watch$/i) {
            unless (Mogstored->iostat_available) {
                $self->write("ERR iostat unavailable\r\n");
                next;
            }
            $self->watch_read(0);
            Mogstored->iostat_subscribe($self);
        } elsif (my ($start, $end, $raw_devs) = 
                 $cmd =~ /^fid_sizes \s+ (\d+)-(\d+) \s+ ([\d\s]+)$/ix) {
            my @devs = split /\s+/, $raw_devs;
            my $args = Storable::nfreeze([$start, $end, \@devs]);
            my $task = Gearman::Task->new(
                "fid_sizes" => \$args,
                {
                    on_complete => sub {
                        my $res = shift;
                        my $devices = Storable::thaw($$res);
                        foreach my $device_ent (@$devices) {
                            my ($device, $entries) = @$device_ent;
                            foreach my $entry (@$entries) {
                                my ($fid, $size) = @$entry;
                                $self->write("$device\t$fid\t$size\n");
                            }
                        }
                        $self->write(".\n");
                        $self->watch_read(1);
                    },
                    on_fail => sub {
                        $self->write("ERR error processing fid_sizes request\n");
                        $self->watch_read(1);
                    },
                });
            Mogstored->gearman_client->add_task($task);
            $self->watch_read(0);
        } else {
            # we don't understand this so pass it on to manage command interface
            my @out;
            Perlbal::run_manage_command($cmd, sub { push @out, $_[0]; });
            $self->write(join("\r\n", @out) . "\r\n");
        }
    }
}

# stop watching writability if we've nothing else to
# write to them.  else just kick off more writes.
sub event_write {
    my $self = shift;
    $self->watch_write(0) if $self->write(undef);
}

# override Danga::Socket's event handlers which die
sub event_err { $_[0]->close; }
sub event_hup { $_[0]->close; }

# as_string handler
sub as_string {
    my Mogstored::SideChannelClient $self = shift;

    my $ret = $self->SUPER::as_string;
    $ret .= "; size_requests=$self->{count}";

    return $ret;
}

sub close {
    my Mogstored::SideChannelClient $self = shift;
    Mogstored->iostat_unsubscribe($self);
    $self->SUPER::close;
}

sub die_gracefully {
    Mogstored->on_sidechannel_die_gracefully;
}

1;