#!/usr/bin/perl

eval 'exec /usr/bin/perl -w -S $0 ${1+"$@"}'
    if 0; # not running under some shell

#
# MogileFS storage node daemon
#  (perlbal front-end)
#
# (c) 2004, Brad Fitzpatrick, <brad@danga.com>
# (c) 2006-2007, Six Apart, Ltd.

use strict;
use lib 'lib';
use IO::Socket::INET;
use POSIX qw(WNOHANG);
use Perlbal 1.53;
use FindBin qw($Bin $RealScript);
use Gearman::Server 1.08;
use Gearman::Client::Async 0.93;

use Mogstored::HTTPServer;
use Mogstored::HTTPServer::Perlbal;
use Mogstored::HTTPServer::Lighttpd;
use Mogstored::HTTPServer::Apache;
use Mogstored::SideChannelListener;
use Mogstored::SideChannelClient;

my $selfexe = "$Bin/$RealScript";

# State:
my %on_death;             # pid -> subref (to run when pid dies)
my %devnum_to_device;     # mogile device number (eg. 'dev1' would be '1') -> os device path (eg. '/dev/rd0')
my %osdevnum_to_device;   # os device number (fetched via stat(file)[0]) -> os device path (ec. '/dev/rd0')
my %iostat_listeners;     # fd => SideChannel client: clients interested in iostat data.
my $iostat_available = 1; # bool: iostat working.  assume working to start.
my ($iostat_pipe_r, $iostat_pipe_w);  # pipes for talking to iostat process

# Config:
my $opt_daemonize;
my $opt_config;
my $opt_iostat = 1;  # default to on now
my $max_conns = 10000;
my $http_listen = "0.0.0.0:7500";
my $mgmt_listen = "0.0.0.0:7501";
my $docroot     = "/var/mogdata";
my $default_config = "/etc/mogilefs/mogstored.conf";
my $server      = $ENV{MOGSTORED_SERVER_TYPE} || "perlbal";
my $serverbin   = "";

# Rename binary in process list to make init scripts saner
$0 = "mogstored";

my %config_opts = (
                   'iostat'       => \$opt_iostat,
                   'daemonize|d'  => \$opt_daemonize,
                   'config=s'     => \$opt_config,
                   'httplisten=s' => \$http_listen,
                   'mgmtlisten=s' => \$mgmt_listen,
                   'docroot=s'    => \$docroot,
                   'maxconns=i'   => \$max_conns,
                   'server=s'     => \$server,
                   'serverbin=s'  => \$serverbin,
                   );
usage() unless Getopt::Long::GetOptions(%config_opts);

die "Unknown server type.  Valid options: --server={perlbal,lighttpd,apache}"
    unless $server =~ /^perlbal|lighttpd|apache$/;

$opt_config = $default_config if ! $opt_config && -e $default_config;
load_config_file($opt_config => \%config_opts) if $opt_config;

# initialize basic required Perlbal machinery, for any HTTP server
Perlbal::run_manage_commands(qq{
   CREATE SERVICE mogstored
     SET role = web_server
     SET docroot = $docroot

    # don't listen... this is just a stub service.
    CREATE SERVICE mgmt
       SET role = management
     ENABLE mgmt
}, sub { print STDERR "$_[0]\n"; });

# start HTTP server
my $httpsrv_class = "Mogstored::HTTPServer::" . ucfirst($server);
my $httpsrv       = $httpsrv_class->new(
                                        listen   => $http_listen,
                                        docroot  => $docroot,
                                        maxconns => $max_conns,
                                        bin      => $serverbin,
                                        );
$httpsrv->start;

if ($opt_daemonize) {
    $httpsrv->pre_daemonize;
    Perlbal::daemonize();
} else {
    print "Running.\n";
}

$httpsrv->post_daemonize;

# kill our children processes on exit:
my $parent_pid = $$;

$SIG{TERM} = $SIG{INT} = sub {
    return unless $$ == $parent_pid; # don't let this be inherited
    kill 'TERM', grep { $_ } keys %on_death;
    POSIX::_exit(0);
};

setup_iostat_pipes();
start_disk_usage_process();
start_iostat_process() if $opt_iostat;
harvest_dead_children();  # every 2 seconds, it reschedules itself
setup_sidechannel_listener();
start_fidsizes_worker();

# now start the main loop
Perlbal::run();

############################################################################

sub usage {
    my $note = shift;
    $note = $note ? "NOTE: $note\n\n" : "";

    die "${note}Usage: mogstored [OPTS]

OPTS:
 --daemonize  -d        Daemonize
 --config=<file>        Set config file (default is /etc/mogilefs/mogstored.conf)
 --httplisten=<ip:port> IP/Port HTTP server listens on
 --mgmtlisten=<ip:port> IP/Port management/sidechannel listens on
 --docroot=<path>       Docroot above device mount points.  Defaults to /var/mogdata
";

}

# accessor for SideChannelClient:
sub Mogstored::iostat_available {
    return $iostat_available;
}

sub load_config_file {
    my ($conffile, $opts) = @_;

    # parse the mogstored config file, which is just lines of comments and
    # "key = value" lines, where keys are just the same as commandline
    # options.
    die "Config file $opt_config doesn't exist.\n" unless -e $conffile;
    open my $fh, $conffile or die "Couldn't open config file for reading: $!";
    while (<$fh>) {
        s/\#.*//;
        next unless /\S/;
        if (/SERVER max_connect/i || /CREATE SERVICE/i) {
            usage("Your $opt_config file is the old syntax.  The new format is simply lines of <key> = <value> where keys are the same as mogstored's command line options.");

        }
        die "Unknown config syntax: $_\n" unless /^\s*(\w+)\s*=\s*(.+?)\s*$/;
        my ($key, $val) = ($1, $2);
        my $dest;
        foreach my $ck (keys %$opts) {
            next unless $ck =~ /^$key\b/;
            $dest = $opts->{$ck};
        }
        die "Unknown config setting: $key\n" unless $dest;
        $$dest = $val;
    }
}

sub harvest_dead_children {
    my $dead = waitpid(-1, WNOHANG);
    if ($dead > 0) {
        my $code = delete $on_death{$dead};
        $code->() if $code;
    }
    Danga::Socket->AddTimer(2, \&harvest_dead_children);
}

sub start_fidsizes_worker {

    # Note: in this case, this load is *before* the fork (which happens
    # in Gearman::Server's start_worker), so be careful nothing
    # heavy/gross is added to the FIDSizes worker.
    require Mogstored::ChildProcess::FIDSizes;
    my $class = "Mogstored::ChildProcess::FIDSizes";
    $class->pre_exec_init;

    my $pid = Mogstored->gearman_server->start_worker(sub {
        $class->exec;
    });

    # old Gearman::Servers didn't return pid integegers
    if ($pid =~ /^\d+$/) {
        Mogstored->on_pid_death($pid, \&start_fidsizes_worker);
    }
}

sub Mogstored::on_pid_death {
    my ($class, $pid, $code) = @_;
    $on_death{$pid} = $code;
}

# returns $pid of child, if parent, else runs child.
sub start_disk_usage_process {
    my $child = fork;
    unless (defined $child) {
        Perlbal::log('crit', "Fork error creating disk usage tracking process");
        return undef;
    }

    # if we're the parent.
    if ($child) {
        $on_death{$child} = sub {
            start_disk_usage_process();  # start a new one
        };
        return $child;
    }

    require Mogstored::ChildProcess::DiskUsage;
    my $class = "Mogstored::ChildProcess::DiskUsage";
    $class->pre_exec_init;
    $class->exec;
}

sub Mogstored::iostat_subscribe {
    my ($class, $sock) = @_;
    $iostat_listeners{fileno($sock->sock)} = $sock;
}

sub Mogstored::iostat_unsubscribe {
    my ($class, $sock) = @_;
    my $fdno = fileno($sock->sock);
    return unless defined $fdno;
    delete $iostat_listeners{$fdno};
}

# to be honest, I have no clue why this exists.  I just had to move it
# around for multi-server refactoring, and I felt better not
# understanding it but preserving than killing it.  in particular, why
# is this "graceful"?  (gets called from SideChannelClient's
# die_gracefully)
sub Mogstored::on_sidechannel_die_gracefully {
    if ($$ == $parent_pid) {
        kill 'TERM', grep { $_ } keys %on_death;
    }
}

sub setup_sidechannel_listener {
    Mogstored::SideChannelListener->new($mgmt_listen);
}

my $iostat_read_buf = "";
sub setup_iostat_pipes {
    pipe ($iostat_pipe_r, $iostat_pipe_w);
    IO::Handle::blocking($iostat_pipe_r, 0);
    IO::Handle::blocking($iostat_pipe_w, 0);

    Danga::Socket->AddOtherFds(fileno($iostat_pipe_r), sub {
        read_from_iostat_child();
    });
}

sub start_iostat_process {
    my $pid = fork;
    unless (defined $pid) {
        warn "Fork for iostat failed: $!";
        return;
    }

    if ($pid) {
        # Parent
        $on_death{$pid} = sub {
            start_iostat_process();
        };
        return;
    }

    require Mogstored::ChildProcess::IOStat;
    my $class = "Mogstored::ChildProcess::IOStat";
    $class->pre_exec_init;
    $class->exec;
}

sub Mogstored::get_iostat_writer_pipe { $iostat_pipe_w }

# (runs in parent event-loop process)
sub read_from_iostat_child {
    my $data;
    my $rv = sysread($iostat_pipe_r, $data, 10240);
    return unless $rv && $rv > 0;

    $iostat_read_buf .= $data;

    # only write complete lines to sockets (in case for some reason we get
    # a partial read and child process dies...)
    while ($iostat_read_buf =~ s/(.+)\r?\n//) {
        my $line = $1;
        foreach my $out_sock (values %iostat_listeners) {
            # where $line will be like "dev53\t53.23" or a "." to signal end of a group of devices.
            $out_sock->write("$line\n");
        }
    }
}

my $gearman_server;
sub Mogstored::gearman_server {
    return $gearman_server ||= Gearman::Server->new;
}

my $gearman_client;
sub Mogstored::gearman_client {
    return $gearman_client ||=
        Gearman::Client::Async->new(job_servers => [ Mogstored->gearman_server ]);
}

# Local Variables:
# mode: perl
# c-basic-indent: 4
# indent-tabs-mode: nil
# End:

__END__

=head1 NAME

mogstored -- MogileFS storage daemon

=head1 USAGE

This is the MogileFS storage daemon, which is just an HTTP server that
supports PUT, DELETE, etc.  It's actually a wrapper around L<Perlbal>,
doing all the proper Perlbal config for you.

In addition, it monitors disk usage, I/O activity, etc, which are
checked from the L<MogileFS tracker|mogilefsd>.

=head1 AUTHORS

Brad Fitzpatrick E<lt>brad@danga.comE<gt>

Mark Smith E<lt>junior@danga.comE<gt>

Jonathan Steinert E<lt>jsteinert@sixapart.comE<gt>

=head1 ENVIRONMENT

=over 4

=item PERLBAL_XS_HEADERS

If defined and 0, Perlbal::XS::HTTPHeaders will not be used, if
present.  Otherwise, it will be enabled by default, if installed and
loadable.

=back

=head1 COPYRIGHT

 Copyright 2004, Danga Interactive
 Copyright 2005-2006, Six Apart Ltd.

=head1 LICENSE

Same terms as Perl itself.  Artistic/GPLv2, at your choosing.

=head1 SEE ALSO

L<MogileFS::Overview> -- high level overview of MogileFS

L<mogilefsd> -- MogileFS daemon

L<http://danga.com/mogilefs/>