package MogileFS::Worker::Delete;
# deletes files

use strict;
use base 'MogileFS::Worker';
use MogileFS::Util qw(error);

# we select 1000 but only do a random 100 of them, to allow
# for stateless parallelism
use constant LIMIT => 1000;
use constant PER_BATCH => 100;

# TODO: use LWP and persistent connections to do deletes.  less local ports used.

# Constructor.  Allocates the fields-based object for $class, then lets the
# base class (MogileFS::Worker, via SUPER) initialise it with $psock
# (presumably the socket to the parent process).  The base constructor's
# return value is ignored; the object built here is what gets returned.
sub new {
    my $class = shift;
    my $psock = shift;

    my $worker = fields::new($class);
    $worker->SUPER::new($psock);

    return $worker;
}

# Watchdog timeout for this worker type: always 60 (presumably seconds; the
# unit is defined by the caller, which is not visible in this file).
sub watchdog_timeout {
    return 60;
}

# Main loop of the delete worker.  Never returns normally.
#
# After waiting for one pass of the monitor it repeats forever:
#   * ping the parent and validate the database handle;
#   * for up to 5 seconds, keep running the three jobs in order
#     (process_tempfiles, reenqueue_delayed_deletes, process_deletes);
#   * when neither process_tempfiles nor process_deletes reports work,
#     back off: sleep 1, 2, ... up to 5 seconds on consecutive idle rounds
#     and go straight back to the parent check.  Any round that does work
#     resets the back-off.
sub work {
    my $self = shift;

    my $idle_nap_max = 5;   # longest sleep when there's nothing to do
    my $idle_nap     = 0;   # current sleep; grows by one per idle round

    # wait for one pass of the monitor
    $self->wait_for_monitor;

    while (1) {
        $self->parent_ping;
        $self->validate_dbh;

        # report in to the parent again after at most 5 seconds of work
        my $deadline = time() + 5;

        while (time() < $deadline) {
            # each job returns true when it did something and may have more,
            # false when it thinks it is done for now
            my $purged_tempfiles = $self->process_tempfiles;
            $self->reenqueue_delayed_deletes;
            my $ran_deletes = $self->process_deletes;

            if ($purged_tempfiles || $ran_deletes) {
                $idle_nap = 0;
                next;
            }

            # nobody did any work: sleep a little longer than last time,
            # then start a fresh pass
            $idle_nap++ if $idle_nap < $idle_nap_max;
            sleep $idle_nap;
            last;
        }
    }

}

# Turn stale tempfile rows into regular delete work.
#
# Returns 1 if any tempfile rows were handled (there may be more), 0 if
# there was nothing to do.  Dies, via the store's condthrow, if removing
# the handled rows from tempfile fails.
sub process_tempfiles {
    my $self = shift;
    # also clean the tempfile table
    #mysql> select * from tempfile where createtime < unix_timestamp() - 86400 limit 50;
    #+--------+------------+---------+------+---------+--------+
    #| fid    | createtime | classid | dmid | dkey    | devids |
    #+--------+------------+---------+------+---------+--------+
    #|   3253 | 1149451058 |       1 |    1 | file574 | 1,2    |
    #|   4559 | 1149451156 |       1 |    1 | file83  | 1,2    |
    #|  11024 | 1149451697 |       1 |    1 | file836 | 2,1    |
    #|  19885 | 1149454542 |       1 |    1 | file531 | 1,2    |

    # BUT NOTE:
    #    the fids might exist on one of the devices in the devids column if
    #    we assigned them those, the client wrote some data to one of them,
    #    and then died or for whatever reason never did create_close.  So we
    #    shouldn't delete from tempfile before going on a hunt for the
    #    missing fid.  Instead we add file_on rows for all listed devids and
    #    let the regular delete process discover via 404 that they're not
    #    there.
    # so we:
    #    select the old rows (fid, devids) from tempfile,
    #    add file_on rows for each listed devid,
    #    add the fid to the delete queue,
    #    delete from tempfile where fid=?

    # dig up some temporary files to purge.  The cutoff handed to the store
    # defaults to 3600 (presumably seconds) and can be overridden through
    # the T_TEMPFILE_TOO_OLD environment variable.
    my $sto = Mgd::get_store();
    my $too_old = int($ENV{T_TEMPFILE_TOO_OLD} || 3600);
    my $tempfiles = $sto->old_tempfiles($too_old);
    return 0 unless $tempfiles && @$tempfiles;

    # insert the right rows into file_on and file_to_delete and remove the
    # now expunged (or soon to be) rows from tempfile
    my (@devfids, @fidids);
    foreach my $row (@$tempfiles) {
        my ($fidid, $devids) = ($row->[0], $row->[1]);
        push @fidids, $fidid;

        # sanity check the string column: anything that is missing or is not
        # a plain comma-separated list of integers counts as "no devices".
        unless (defined $devids && $devids =~ /^(\d+)(,\d+)*$/) {
            $devids = "";
        }

        foreach my $devid (split /,/, $devids) {
            push @devfids, MogileFS::DevFID->new($devid, $fidid);
        }
    }

    $sto->mass_insert_file_on(@devfids);
    $sto->enqueue_fids_to_delete(@fidids);
    $sto->dbh->do("DELETE FROM tempfile WHERE fid IN (" . join(',', @fidids) . ")");

    # Check the delete like every other delete in this worker does.  If the
    # rows are still there, claiming success would make the next pass
    # re-insert and re-enqueue the very same fids over and over.
    $sto->condthrow("Failure to delete from tempfile");
    return 1;
}

# Run one batch of physical deletes.
#
# Reads up to LIMIT (fid, devid) pairs for fids queued in file_to_delete.
# The query LEFT JOINs file_on, so a queued fid with no remaining file_on
# row comes back with an undefined devid.  The rows are shuffled and at most
# PER_BATCH of them are handled.  Per row, exactly one of these happens:
#   * devid undefined            -> fid is removed from file_to_delete
#   * device unknown / perm dead -> the file_on row is removed
#   * device not writeable / not deletable / unreachable / HTTP error
#                                -> fid is moved to file_to_delete_later
#   * HTTP 2xx or 404            -> the file_on row is removed
#   * no URL or unparsable reply -> logged, nothing else changes
#
# Returns 0 when the queue query produced no rows, 1 otherwise (even when
# every row in the batch was skipped or rescheduled).
# The inner closures die/throw on database errors.
sub process_deletes {
    my $self = shift;

    my $sto = Mgd::get_store();
    my $dbh = $sto->dbh;

    # one row per (queued fid, device still holding it); devid is NULL/undef
    # once no file_on rows are left for the fid
    my $delmap = $dbh->selectall_arrayref("SELECT fd.fid, fo.devid ".
                                          "FROM file_to_delete fd ".
                                          "LEFT JOIN file_on fo ON fd.fid=fo.fid ".
                                          "LIMIT " . LIMIT);
    my $count = $delmap ? scalar @$delmap : 0;
    return 0 unless $count;

    # random order + small batch, so several workers selecting the same rows
    # mostly end up working on different ones
    my $done = 0;
    foreach my $dm (List::Util::shuffle(@$delmap)) {
        last if ++$done > PER_BATCH;

        # NOTE(review): presumably these check in with the watchdog/parent
        # between rows -- confirm against MogileFS::Worker
        $self->still_alive;
        $self->read_from_parent;
        my ($fid, $devid) = @$dm;
        error("deleting fid $fid, on devid ".($devid || 'NULL')."...") if $Mgd::DEBUG >= 2;

        # Drop the fid from the delete queue.  $reason is informational only
        # and is not used.
        my $done_with_fid = sub {
            my $reason = shift;
            $dbh->do("DELETE FROM file_to_delete WHERE fid=?", undef, $fid);
            $sto->condthrow("Failure to delete from file_to_delete for fid=$fid");
        };

        # Forget that this device holds the fid.  The fid itself stays in
        # file_to_delete.  $reason is informational only and is not used.
        my $done_with_devid = sub {
            my $reason = shift;
            $dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                     undef, $fid, $devid);
            $sto->condthrow("Failure to delete from file_on for $fid/$devid");
            die "Failed to delete from file_on: " . $dbh->errstr if $dbh->err;
        };

        # Move the fid to file_to_delete_later with delafter = now + $secs,
        # then remove it from file_to_delete.  reenqueue_delayed_deletes
        # moves it back later.
        my $reschedule_fid = sub {
            my ($secs, $reason) = (int(shift), shift);
            $sto->insert_ignore("INTO file_to_delete_later (fid, delafter) ".
                "VALUES (?,".$sto->unix_timestamp."+$secs)", undef,
                $fid);
            error("delete of fid $fid rescheduled: $reason") if $Mgd::DEBUG >= 2;
            $done_with_fid->("rescheduled");
        };

        # Cases:
        #   devid is null:  doesn't exist anywhere anymore, we're done with this fid.
        #   devid is observed down/readonly: delay for 10 minutes
        #   devid is marked readonly: delay for 2 hours
        #   devid is marked dead or doesn't exist: consider it deleted on this devid.

        # CASE: devid is null, which means we're done deleting all instances.
        unless (defined $devid) {
            $done_with_fid->("no_more_locations");
            next;
        }

        # CASE: devid is marked dead or doesn't exist: consider it deleted on this devid.
        # (Note: we're tolerant of '0' as a devid, due to old buggy version which
        # would sometimes put that in there)
        my $dev = $devid ? MogileFS::Device->of_devid($devid) : undef;
        unless ($dev && $dev->exists) {
            $done_with_devid->("devid_doesnt_exist");
            next;
        }
        if ($dev->dstate->is_perm_dead) {
            $done_with_devid->("devid_marked_dead");
            next;
        }

        # CASE: devid is observed down/readonly: delay for 10 minutes
        unless ($dev->observed_writeable) {
            $reschedule_fid->(60 * 10, "not_observed_writeable");
            next;
        }

        # CASE: devid is marked readonly/down/etc: delay for 2 hours
        unless ($dev->can_delete_from) {
            $reschedule_fid->(60 * 60 * 2, "devid_marked_not_alive");
            next;
        }

        my $dfid = MogileFS::DevFID->new($dev, $fid);
        my $path = $dfid->url;

        # dormando: "There are cases where url can return undefined,
        # Mogile appears to try to replicate to bogus devices
        # sometimes?"
        # NOTE(review): nothing is removed or rescheduled here, so this row
        # stays queued and can be selected again on a later pass.
        unless ($path) {
            error("in deleter, url(devid=$devid, fid=$fid) returned nothing");
            next;
        }

        # the elements are used below as: [0] peer address, [1] peer port,
        # [2] request path
        my $urlparts = MogileFS::Util::url_parts($path);

        # hit up the server and delete it
        # TODO: (optimization) use MogileFS->get_observed_state and don't try to delete things known to be down/etc
        # NOTE(review): Timeout here looks like it only bounds the connect;
        # the response read below has no explicit timeout -- confirm.
        my $sock = IO::Socket::INET->new(PeerAddr => $urlparts->[0],
                                         PeerPort => $urlparts->[1],
                                         Timeout => 2);
        unless ($sock) {
            # timeout or something: tell the others the host is unreachable
            # and retry this fid in 2 hours
            $self->broadcast_host_unreachable($dev->hostid);
            $reschedule_fid->(60 * 60 * 2, "no_sock_to_hostid");
            next;
        }

        # send delete request
        error("Sending delete for $path") if $Mgd::DEBUG >= 2;

        # bare HTTP/1.0 request; only the status line of the reply is read
        $sock->write("DELETE $urlparts->[2] HTTP/1.0\r\n\r\n");
        my $response = <$sock>;
        if ($response =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            # $1 is the HTTP status code; it stays valid through this branch
            # because no other match runs in between
            if (($1 >= 200 && $1 <= 299) || $1 == 404) {
                # effectively means all went well (404: it was already gone)
                $done_with_devid->("deleted");
            } else {
                # remote file system error?  log it and retry this fid in
                # 30 minutes
                my $httpcode = $1;
                error("Error: unlink failure: $path: HTTP code $httpcode");
                $reschedule_fid->(60 * 30, "http_code_$httpcode");
                next;
            }
        } else {
            # NOTE(review): this also covers an undefined $response (no line
            # could be read).  The row is neither removed nor rescheduled, so
            # it can be selected again on a later pass.
            error("Error: unknown response line deleting $path: $response");
        }
    }

    # as far as we know, we have more work to do
    return 1;
}

# Move postponed deletes back onto the live delete queue.
#
# Asks the store for the fids that should be tried again.  If there are
# none, returns nothing.  Otherwise it enqueues them for deletion, removes
# them from file_to_delete_later, and returns the result of the store's
# condthrow check on that DELETE.
sub reenqueue_delayed_deletes {
    my $self = shift;

    my $store = Mgd::get_store();
    my $dbh   = $store->dbh;

    my @due = $store->fids_to_delete_again;
    return unless @due;

    $store->enqueue_fids_to_delete(@due);

    my $id_list = join(",", @due);
    $dbh->do("DELETE FROM file_to_delete_later WHERE fid IN ($id_list)");
    return $store->condthrow("reenqueue file_to_delete_later delete");
}

1;

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End: