package MogileFS::Worker;
use strict;
use fields ('psock',              # socket for parent/child communications
            'last_bcast_state',   # "{device|host}-$id" => [$time, $state] of the last
                                  # ":state_change" we sent to the parent for that key;
                                  # $state is one of the strings passed by the broadcast_*
                                  # methods (writeable/readable/reachable/unreachable)
            'readbuf',            # unparsed data from parent
            'monitor_has_run',    # true once we've heard of the monitor job being alive
            'last_ping',          # time we last said we're alive
            'woken_up',           # bool: if we've been woken up
            'last_wake'           # hashref: { $class -> time() } when we last woke up a certain job class
            );

use MogileFS::Util qw(error);
use MogileFS::Server;

use vars (
          '$got_live_vs_die',    # local'ized scalarref flag for whether we've
                                 # gotten a live-vs-die instruction from parent
          );

# Constructor.  Stores the socket used to talk to the parent process, resets
# all bookkeeping fields and switches the socket to non-blocking mode.
#
#   $self  - class name (a fresh fields-based object is allocated) or an
#            existing reference, which is (re)initialized in place
#   $psock - handle connected to the parent process
#
# Returns the initialized object.
sub new {
    my ($self, $psock) = @_;
    $self = fields::new($self) unless ref $self;

    $self->{psock}            = $psock;
    $self->{readbuf}          = '';
    $self->{last_bcast_state} = {};
    $self->{monitor_has_run}  = 0;
    $self->{last_ping}        = 0;
    $self->{woken_up}         = 0;   # defined-but-false, same value forget_woken_up leaves behind
    $self->{last_wake}        = {};

    # IO::Handle is not loaded anywhere in this file; require it here so the
    # call below does not depend on some other module having pulled it in.
    require IO::Handle;
    IO::Handle::blocking($psock, 0);
    return $self;
}

# Returns the file descriptor number of the socket connected to the parent.
sub psock_fd {
    my ($self) = @_;
    my $fd = fileno($self->{psock});
    return $fd;
}

# Delegates to Mgd::validate_dbh (called without arguments) and hands back
# whatever that returns.
sub validate_dbh { return Mgd::validate_dbh() }

# Delegates to Mgd::get_dbh (called without arguments) and hands back
# whatever that returns.
sub get_dbh { return Mgd::get_dbh() }

# Returns 1 once the ":monitor_has_run" notice has been recorded on this
# object, 0 otherwise (always a plain 0/1, never the raw field value).
sub monitor_has_run {
    my ($self) = @_;
    return 1 if $self->{monitor_has_run};
    return 0;
}

# Clears the monitor_has_run flag so that wait_for_monitor will block again
# until the next ":monitor_has_run" notice arrives.  Returns 0.
sub forget_that_monitor_has_run {
    my ($self) = @_;
    return $self->{monitor_has_run} = 0;
}

# Blocks until monitor_has_run reports true.  While waiting it keeps
# draining messages from the parent (one of which is what flips the flag),
# keeps pinging the parent via still_alive, and sleeps a second per round.
sub wait_for_monitor {
    my ($self) = @_;
    until ($self->monitor_has_run) {
        $self->read_from_parent;
        $self->still_alive;
        sleep 1;
    }
}

# Heartbeat for the parent's watchdog: workers call this during idle or slow
# stretches so they don't get killed.  A ":still_alive" line is sent only
# when the clock has advanced past the last ping, i.e. at most once per second.
# Returns the current time, so the caller's loop can skip its own time() call.
sub still_alive {
    my ($self) = @_;
    my $now = time();
    return $now unless $now > $self->{last_ping};

    $self->send_to_parent(":still_alive");  # a no-op, just for the watchdog
    $self->{last_ping} = $now;
    return $now;
}

# Writes one line (the first argument plus "\r\n") to the parent process.
#
# May be called on an object, or as a package method
# (MogileFS::Worker->send_to_parent(...)), in which case the current child
# object is looked up through MogileFS::ProcManager->is_child; if there is
# none, nothing is sent and the call returns empty.
#
# The socket is non-blocking, so a write may be short or fail with EAGAIN;
# the remainder is then written in a loop, waiting up to 30 seconds for
# writability before each attempt.
#
# Returns 1 once the whole line has been written.
# Dies on any write error other than EAGAIN, or if the parent does not
# become writable within 30 seconds.
sub send_to_parent {
    my $self = shift;

    # can be called as package method:  MogileFS::Worker->send_to_parent...
    unless (ref $self) {
        $self = MogileFS::ProcManager->is_child
            or return;
    }

    my $psock    = $self->{psock};
    my $write    = "$_[0]\r\n";
    my $totallen = length $write;

    my $rv = syswrite($psock, $write);
    return 1 if defined $rv && $rv == $totallen;

    # $! is only meaningful when syswrite itself failed (returned undef).
    # After a successful short write it may still hold a stale value from
    # some earlier, unrelated call, and must not be treated as a write error.
    die "Error writing to parent process: $!" if !defined $rv && !$!{EAGAIN};

    $rv ||= 0;  # could've been undef, if EAGAIN immediately.
    my $remain = $totallen - $rv;
    my $offset = $rv;
    while ($remain > 0) {
        MogileFS::Util::wait_for_writeability(fileno($psock), 30)
            or die "Parent not writable in 30 seconds";

        $rv = syswrite($psock, $write, $remain, $offset);
        # same rule as above: consult errno only if this write failed
        die "Error writing to parent process (in loop): $!" if !defined $rv && !$!{EAGAIN};
        if ($rv) {
            $remain -= $rv;
            $offset += $rv;
        }
    }
    die "remain is negative:  $remain" if $remain < 0;
    return 1;
}

# Watchdog timeout for this worker; subclasses override this to change it.
# The base class answers 10 (presumably seconds -- confirm against the
# parent's watchdog code).
sub watchdog_timeout {
    my $default_timeout = 10;
    return $default_timeout;
}

# Hook for worker-specific directives from the parent process; subclasses
# override it.  Receives a reference to the line and must return 1 if the
# command was recognized, 0 otherwise.  The base class recognizes nothing.
sub process_line {
    my $self    = shift;
    my $lineref = shift;
    return 0;
}

# Drains whatever the parent has sent so far, without blocking, and
# dispatches every complete line: first to process_generic_command, then
# (if that declines) to process_line; a line neither accepts is reported
# through error().  An incomplete trailing line stays in {readbuf} until
# the rest of it arrives.
# Dies if the pipe to the parent hits EOF or a read error.
sub read_from_parent {
    my ($self) = @_;
    my $parent = $self->{psock};

    # keep going for as long as data is immediately available
    while (MogileFS::Util::wait_for_readability(fileno($parent), 0)) {
        my $chunk;
        my $nread = sysread($parent, $chunk, 1024);

        die "Error reading pipe from parent: $!\n"
            unless defined $nread;
        die "While reading pipe from parent, got EOF.  Parent's gone.  Quitting.\n"
            unless $nread;

        if ($Mgd::POST_SLEEP_DEBUG) {
            (my $shown = $chunk) =~ s/\s+$//;
            warn "proc ${self}[$$] read: [$shown]\n"
        }
        $self->{readbuf} .= $chunk;

        # peel complete lines off the front of the buffer, one at a time
        while ($self->{readbuf} =~ s/^(.+?)\r?\n//) {
            my $line = $1;

            next if $self->process_generic_command(\$line);
            next if $self->process_line(\$line);
            error("Unrecognized command from parent: $line");
        }
    }
}

# Sends ":ping" to the parent and waits for its live-vs-die answer.
#
# While waiting, $got_live_vs_die points at a local flag which
# process_generic_command sets when ":stay_alive" or ":shutdown" arrives.
# The parent is polled every 0.2 seconds; from the sixth unanswered poll on
# a warning is printed each round, and after more than 20 rounds we die.
# Also dies if called while another parent_ping is already waiting.
sub parent_ping {
    my $self = shift;
    $self->send_to_parent(':ping');

    die "recursive parent_ping!" if $got_live_vs_die;
    my $answered = 0;
    local $got_live_vs_die = \$answered;

    my $tries = 0;
    until ($answered) {
        $self->read_from_parent;
        return if $answered;

        $tries++;
        select undef, undef, undef, 0.20;
        next unless $tries > 5;

        warn "No simple reply from parent to child $self [$$] in $tries 0.2second loops.\n";
        die "No answer in 4 seconds from parent to child $self [$$], dying" if $tries > 20;
    }
}

# Records and broadcasts that device $devid was observed "writeable".
sub broadcast_device_writeable {
    my ($self, $devid) = @_;
    return $self->_broadcast_state("device", $devid, "writeable");
}
# Records and broadcasts that device $devid was observed "readable".
sub broadcast_device_readable {
    my ($self, $devid) = @_;
    return $self->_broadcast_state("device", $devid, "readable");
}
# Records and broadcasts that device $devid was observed "unreachable".
sub broadcast_device_unreachable {
    my ($self, $devid) = @_;
    return $self->_broadcast_state("device", $devid, "unreachable");
}
# Records and broadcasts that host $hostid was observed "reachable".
sub broadcast_host_reachable {
    my ($self, $hostid) = @_;
    return $self->_broadcast_state("host", $hostid, "reachable");
}
# Records and broadcasts that host $hostid was observed "unreachable".
sub broadcast_host_unreachable {
    my ($self, $hostid) = @_;
    return $self->_broadcast_state("host", $hostid, "unreachable");
}

# Shared implementation behind the broadcast_* methods.
#
#   $what   - "host" or "device"
#   $whatid - id of that host/device
#   $state  - observed state string
#
# Always stores the observed state on the local host/device object.  The
# parent is told (":state_change ...") only when it matters: the first time
# a host/device is seen, whenever its state differs from the last one sent,
# and otherwise once the previous broadcast is more than 10 seconds old.
sub _broadcast_state {
    my ($self, $what, $whatid, $state) = @_;

    # update our own view first; any other $what is only broadcast
    MogileFS::Host->of_hostid($whatid)->set_observed_state($state)
        if $what eq "host";
    MogileFS::Device->of_devid($whatid)->set_observed_state($state)
        if $what eq "device";

    my $now      = time();
    my $key      = "$what-$whatid";
    my $previous = $self->{last_bcast_state}{$key};   # [$time, $state] or undef

    # broadcast on initial discovery, state change, and every 10 seconds
    if (!$previous || $previous->[1] ne $state || $previous->[0] < $now - 10) {
        $self->send_to_parent(":state_change $what $whatid $state");
        $self->{last_bcast_state}{$key} = [$now, $state];
    }
}

# Tells the parent (":invalidate_meta $what") that cached metadata of kind
# $what is stale.  Does nothing while $Mgd::INVALIDATE_NO_PROPOGATE is set,
# which process_generic_command does while it handles ":invalidate_meta_once".
sub invalidate_meta {
    my $self = shift;
    my ($what) = @_;

    # anti recursion
    return if $Mgd::INVALIDATE_NO_PROPOGATE;

    return $self->send_to_parent(":invalidate_meta $what");
}

# Tries to parse generic (not job-specific) commands sent from parent to
# child.  All generic commands start with a colon.
#
#   $lineref - reference to the line, without the trailing \r\n
#
# Returns 1 if the line was a generic command and has been handled, or 0 if
# the command given isn't generic and the child should parse it itself.
# ":shutdown" does not return at all: it exits the process.
#
# Handled here: :state_change, :shutdown, :stay_alive, :invalidate_meta_once,
# :monitor_has_run, :wake_up, :set_config_from_parent, :set_dev_utilization.
#
# Captures are copied into lexicals before anything else is called.  $1/$2
# are shared magical variables, and handing them straight to a method passes
# aliases: any regex the callee runs before copying @_ would silently change
# the arguments.
sub process_generic_command {
    my ($self, $lineref) = @_;
    return 0 unless $$lineref =~ /^:/;  # all generic commands start with colon

    if (my ($what, $whatid, $state) = $$lineref =~ /^:state_change (\w+) (\d+) (\w+)/) {
        if ($what eq "host") {
            MogileFS::Host->of_hostid($whatid)->set_observed_state($state);
        } elsif ($what eq "device") {
            MogileFS::Device->of_devid($whatid)->set_observed_state($state);
        }
        return 1;
    }

    if ($$lineref =~ /^:shutdown/) {
        # let a pending parent_ping know it got its answer, then go away
        $$got_live_vs_die = 1 if $got_live_vs_die;
        exit 0;
    }

    if ($$lineref =~ /^:stay_alive/) {
        $$got_live_vs_die = 1 if $got_live_vs_die;
        return 1;
    }

    if (my ($kind) = $$lineref =~ /^:invalidate_meta_once (\w+)/) {
        # keeps invalidate_meta from echoing this back to the parent
        local $Mgd::INVALIDATE_NO_PROPOGATE = 1;
        # where $kind is one of {"domain", "device", "host", "class"}
        my $class = "MogileFS::" . ucfirst(lc($kind));
        $class->invalidate_cache;
        return 1;
    }

    if ($$lineref =~ /^:monitor_has_run/) {
        $self->{monitor_has_run} = 1;
        return 1;
    }

    if ($$lineref =~ /^:wake_up/) {
        $self->{woken_up} = 1;
        return 1;
    }

    if (my ($key, $value) = $$lineref =~ /^:set_config_from_parent (\S+) (.+)/) {
        # the 'no_broadcast' API keeps us from looping forever.
        MogileFS::Config->set_config_no_broadcast($key, $value);
        return 1;
    }

    # :set_dev_utilization dev# 45.2 dev# 45.2 dev# 45.2 dev# 45.2 dev 45.2\n
    # (dev#, utilz%)+
    if (my ($pairlist) = $$lineref =~ /^:set_dev_utilization (.+)/) {
        my %pairs = split(/\s+/, $pairlist);
        local $MogileFS::Device::util_no_broadcast = 1;
        while (my ($devid, $util) = each %pairs) {
            # devices we can't look up are skipped
            my $dev = eval { MogileFS::Device->of_devid($devid) } or next;
            $dev->set_observed_utilization($util);
        }
        return 1;
    }

    # TODO: warn on unknown commands?

    return 0;
}

# Returns the woken_up flag: true once a ":wake_up" from the parent has been
# processed and not yet cleared with forget_woken_up.
sub was_woken_up {
    my MogileFS::Worker $self = $_[0];
    my $flag = $self->{woken_up};
    return $flag;
}

# Clears the woken_up flag again (sets it to 0, which is also returned).
sub forget_woken_up {
    my MogileFS::Worker $self = $_[0];
    return $self->{woken_up} = 0;
}

# Asks the parent (":wake_a $class") to wake up a worker of job class $class.
# Don't wake processes more than once a second... not necessary: a repeat
# request for the same class within the same clock second is dropped.
sub wake_a {
    my ($self, $class) = @_;
    my $now       = time();
    my $last_wake = $self->{last_wake}{$class} || 0;
    return if $last_wake == $now;

    $self->{last_wake}{$class} = $now;
    return $self->send_to_parent(":wake_a $class");
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: