package MogileFS::Worker::Fsck;

use strict;
use base 'MogileFS::Worker';
use fields (
            'last_stop_check',     # unixtime 'should_stop_running' last called
            'last_maxcheck_write', # unixtime maxcheck written
            'size_checker',        # subref which, given a DevFID, returns size of file
            'opt_nostat',          # bool: do we trust mogstoreds? skipping size stats?
            );
use MogileFS::Util qw(every error debug);
use List::Util ();
use Time::HiRes ();

use constant SUCCESS => 0;
use constant TEMPORARY => 1;
use constant PERMANENT => 2;
use constant REPLICATE => 3;

use constant EV_NO_PATHS         => "NOPA";
use constant EV_POLICY_VIOLATION => "POVI";
use constant EV_FILE_MISSING     => "MISS";
use constant EV_BAD_LENGTH       => "BLEN";
use constant EV_CANT_FIX         => "GONE";
use constant EV_START_SEARCH     => "SRCH";
use constant EV_FOUND_FID        => "FOND";
use constant EV_RE_REPLICATE     => "REPL";

use POSIX ();

my $nowish;  # approximate unixtime, updated once per loop.

sub watchdog_timeout { 30 }

sub work {
    my $self = shift;

    my $run_count = 0;

    # this can be CPU-intensive.  let's nice ourselves down.
    POSIX::nice(10);

    # <debug crap>
    my $running = 0; # start time
    my $n_check = 0; # items checked
    my $start = sub {
        return if $running;
        $running = $nowish = time();
    };
    my $stats = sub {
        return unless $running;
        my $elap = $nowish - $running;
        debug(sprintf("In %d secs, %d fids, %0.02f fids/sec\n", $elap, $n_check, ($n_check / ($elap || 1))));
    };
    my $last_beat = 0;
    my $beat = sub {
        return unless $nowish >= $last_beat + 5;
        $stats->();
        $last_beat = $nowish;
    };
    my $stop = sub {
        return unless $running;
        $stats->();
        debug("done.");
        $running = 0;
    };
    # </debug crap>

    my $sto         = Mgd::get_store();
    my $max_checked = 0;

    every(5.0, sub {
        my $sleep_set = shift;
        $self->parent_ping;
        $nowish = time();
        local $Mgd::nowish = $nowish;

        # see if we're even enabled for this host.
        unless ($self->should_be_running) {
            $max_checked = 0;  # uncache this
            return;
        }

        # checking doesn't go well if the monitor job hasn't actively started
        # marking things as being available
        unless ($self->monitor_has_run) {
            # only warn on runs after the first.  gives the monitor job some time to work
            # before we throw a message.
            debug("waiting for monitor job to complete a cycle before beginning")
                if $run_count++ > 0;
            return;
        }

        $max_checked ||= MogileFS::Config->server_setting('fsck_highest_fid_checked') || 0;
        $self->{opt_nostat} = MogileFS::Config->server_setting('fsck_opt_policy_only')     || 0;
        my @fids       = $sto->get_fids_above_id($max_checked, 5000);

        unless (@fids) {
            $sto->set_server_setting("fsck_host", undef);
            $sto->set_server_setting("fsck_stop_time", $sto->get_db_unixtime);
            $self->set_max_checked($max_checked) if $max_checked;
            $stop->();
            return;
        }
        $start->();

        MogileFS::FID->mass_load_devids(@fids);

        $self->init_size_checker(\@fids);

        # don't sleep in loop, next round, since we found stuff to work on
        # this round...
        $sleep_set->(0);

        my $new_max;
        my $hit_problem = 0;

        foreach my $fid (@fids) {
            last if $self->should_stop_running;
            if (!$self->check_fid($fid)) {
                # some connectivity problem... abort checking more
                # for now.
                $hit_problem = 1;
                last;
            }
            $max_checked = $fid->id;
            $self->set_max_checked_lazy($max_checked);
            $n_check++;
            $beat->();
        }

        # if we had connectivity problems, let's sleep a bit
        if ($hit_problem) {
            error("connectivity problems; stalling 5s");
            sleep 5;
        }
    });
}

# only write to server_settings table our position every 5 seconds
sub set_max_checked_lazy {
    my ($self, $nmax) = @_;
    return 0 if $nowish < ($self->{last_maxcheck_write} || 0) + 5;
    $self->{last_maxcheck_write} = $nowish;
    $self->set_max_checked($nmax);
}

sub set_max_checked {
    my ($self, $nmax) = @_;
    MogileFS::Config->set_server_setting('fsck_highest_fid_checked', $nmax);
}

# this version is accurate,
sub should_be_running {
    my $self = shift;
    my $fhost = MogileFS::Config->server_setting('fsck_host')
        or return;
    return $fhost eq MogileFS::Config->hostname;
}

# this version is sloppy, optimized for speed.  only checks db every 5 seconds.
sub should_stop_running {
    my $self = shift;
    return 0 if $nowish < ($self->{last_stop_check} || 0) + 5;
    $self->{last_stop_check} = $nowish;
    return ! $self->should_be_running;
}

# given a $fid (MogileFS::FID, with pre-populated ->devids data)
# return 0 if reachability problems.
# return 1 if fid was checked (regardless of there being problems or not)
#   if no problems, no action.
#   if problems, log & enqueue fixes
use constant STALLED => 0;
use constant HANDLED => 1;
sub check_fid {
    my ($self, $fid) = @_;

    my $fix = sub {
        my $fixed = eval { $self->fix_fid($fid) };
        if (! defined $fixed) {
            error("Fsck stalled: $@");
            return STALLED;
        }
        $fid->fsck_log(EV_CANT_FIX) if ! $fixed;

        # that might've all taken awhile, let's update our approximate time
        $nowish = $self->still_alive;
        return HANDLED;
    };

    # first obvious fucked-up case:  no devids even presumed to exist.
    unless ($fid->devids) {
        # first, log this weird condition.
        $fid->fsck_log(EV_NO_PATHS);

        # weird, schedule a fix (which will do a search over all
        # devices as a last-ditch effort to locate it)
        return $fix->();
    }

    # first, see if the assumed devids meet the replication policy for
    # the fid's class.
    unless ($fid->devids_meet_policy) {
        # log a policy violation
        $fid->fsck_log(EV_POLICY_VIOLATION);
        return $fix->();
    }

    # in the fast case, do nothing else (don't check if assumed file
    # locations are actually there).  in the fast case, all we do is
    # check the replication policy, which is already done, so finish.
    return HANDLED if $self->{opt_nostat};

    # stat each device to see if it's still there.  on first problem,
    # stop and go into the slow(er) fix function.
    my $err;
    my $rv = $self->parallel_check_sizes([ $fid->devfids ], sub {
        my ($dfid, $disk_size) = @_;
        if (! defined $disk_size) {
            my $dev  = $dfid->device;
            error("Connectivity problem reaching device " . $dev->id . " on host " . $dev->host->ip . "\n");
            $err = "stalled";
            return 0;
        }
        return 1 if $disk_size == $fid->length;
        $err = "needfix";
        # Note: not doing fsck_log, as fix_fid will log status for each device.
        return 0;
    });

    if ($rv) {
        return HANDLED;
    } elsif ($err eq "stalled") {
        return STALLED;
    } elsif ($err eq "needfix") {
        return $fix->();
    } else {
        die "Unknown error checking fid sizes in parallel.\n";
    }
}

sub parallel_check_sizes {
    my ($self, $dflist, $cb) = @_;
    # serial, for now: (just prepping for future parallel future,
    # getting interface right)
    foreach my $df (@$dflist) {
        my $size = $self->size_on_disk($df);
        return 0 unless $cb->($df, $size);
    }
    return 1;
}

# this is the slow path.  if something above in check_fid finds
# something amiss in any way, we went the slow path on a fid and try
# really hard to fix the situation.
#
# return true if situation handled, 0 if nothing could be done.
# die on errors (like connectivity problems).
use constant CANT_FIX => 0;
sub fix_fid {
    my ($self, $fid) = @_;
    error(sprintf("Fixing FID %d\n", $fid->id));

    # make devfid objects from the devids that this fid is on,
    my @dfids = map { MogileFS::DevFID->new($_, $fid) } $fid->devids;

    # track all known good copies (dev objects), as well as all bad
    # copies (places it should've been, but isn't)
    my @good_devs;
    my @bad_devs;
    my %already_checked;  # devid -> 1.

    my $check_dfids = sub {
        my $is_desperate_mode = shift;

        # stat all devices.
        foreach my $dfid (@dfids) {
            my $dev = $dfid->device;
            next if $already_checked{$dev->id}++;

            my $disk_size = $self->size_on_disk($dfid);
            die "dev unreachable" unless defined $disk_size;

            if ($disk_size == $fid->length) {
                push @good_devs, $dfid->device;
                # if we were doing a desperate search, one is enough, we can stop now!
                return if $is_desperate_mode;
                next;
            }

            # don't log in desperate mode, as we'd have "file missing!" log entries
            # for every device in the normal case, which is expected.
            unless ($is_desperate_mode) {
                if (! $disk_size) {
                    $fid->fsck_log(EV_FILE_MISSING, $dev);
                } else {
                    $fid->fsck_log(EV_BAD_LENGTH, $dev);
                }
            }

            push @bad_devs, $dfid->device;
        }
    };

    $check_dfids->();

    # if we didn't find it anywhere, let's go do an exhaustive search over
    # all devices, looking for it...
    unless (@good_devs) {
        # replace @dfids with list of all (alive) devices.  dups will be ignored by
        # check_dfids
        $fid->fsck_log(EV_START_SEARCH);
        @dfids = List::Util::shuffle(
                                     map  { MogileFS::DevFID->new($_, $fid)  }
                                     grep { $_->dstate->should_fsck_search_on }
                                     MogileFS::Device->devices
                                     );
        $check_dfids->("desperate");

        # still can't fix it?
        return CANT_FIX unless @good_devs;

        # wow, we actually found it!
        $fid->fsck_log(EV_FOUND_FID);
        $fid->note_on_device($good_devs[0]); # at least one good one.

        # fall through to check policy (which will most likely be
        # wrong, with only one file_on record...) and re-replicate
    }

    # remove the file_on mappings for devices that were bogus/missing.
    foreach my $bdev (@bad_devs) {
        error("removing file_on mapping for fid=" . $fid->id . ", dev=" . $bdev->id);
        $fid->forget_about_device($bdev);
    }

    # Note: this will reload devids, if they called 'note_on_device'
    # or 'forget_about_device'
    unless ($fid->devids_meet_policy) {
        $fid->enqueue_for_replication;
        $fid->fsck_log(EV_RE_REPLICATE);
        return HANDLED;
    }

    return HANDLED;
}

sub init_size_checker {
    my ($self, $fidlist) = @_;

    $self->still_alive;

    my $lo_fid = $fidlist->[0]->id;
    my $hi_fid = $fidlist->[-1]->id;

    my %size;           # $devid -> { $fid -> $size }
    my %tried_bulkstat; # $devid -> 1

    $self->{size_checker} = sub {
        my $dfid  = shift;
        my $devid = $dfid->devid;

        if (my $map = $size{$devid}) {
            return $map->{$dfid->fidid} || 0;
        }

        unless ($tried_bulkstat{$devid}++) {
            my $mogconn = $dfid->device->host->mogstored_conn;
            my $sock    = $mogconn->sock(5);
            my $good = 0;
            my $unknown_cmd = 0;
            if ($sock) {
                my $cmd = "fid_sizes $lo_fid-$hi_fid $devid\n";
                print $sock $cmd;
                my $map = {};
                while (my $line = <$sock>) {
                    if ($line =~ /^\./) {
                        $good = 1;
                        last;
                    } elsif ($line =~ /^(\d+)\s+(\d+)\s+(\d+)/) {
                        my ($res_devid, $res_fid, $size) = ($1, $2, $3);
                        last unless $res_devid == $devid;
                        $map->{$res_fid} = $size;
                    } elsif ($line =~ /^ERR/) {
                        $unknown_cmd = 1;
                        last;
                    } else {
                        last;
                    }
                }

                # we only update our $nowish (approximate time) lazily, when we
                # know time might've advanced (like during potentially slow RPC call)
                $nowish = $self->still_alive;

                if ($good) {
                    $size{$devid} = $map;
                    return $map->{$dfid->fidid} || 0;
                } elsif (!$unknown_cmd) {
                    # mogstored connection is unknown state... can't
                    # trust it, so close it.
                    $mogconn->mark_dead;
                }
            }
            error("fid_sizes mogstored cmd unavailable for dev $devid; using slower method");
        }

        # slow case (not using new command)
        $nowish = $self->still_alive;
        return $dfid->size_on_disk;
    };
}

# returns 0 on missing,
# undef on connectivity error,
# else size of file on disk (after HTTP HEAD or mogstored stat)
sub size_on_disk {
    my ($self, $dfid) = @_;
    return $self->{size_checker}->($dfid);
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: