package AnyEvent::Task::Server;

use common::sense;

use AnyEvent;
use AnyEvent::Util;
use AnyEvent::Socket;

use AnyEvent::Task::Util;
use AnyEvent::Task::Server::Worker;


sub new {
  my ($class, %arg) = @_;
  my $self = {};
  bless $self, $class;


  $self->{all_done_cv} = AE::cv;
  $self->{children} = {};
  $self->{curr_worker_id} = 0;


  $self->{name} = $arg{name};
  $self->{setup} = $arg{setup} || sub {};
  $self->{checkout_done} = $arg{checkout_done} || sub {};
  $self->{hung_worker_timeout} = exists $arg{hung_worker_timeout} ? $arg{hung_worker_timeout} : (60*5);


  if (defined $self->{name}) {
    $0 = "AET-Server:$self->{name}";
  }


  if ($arg{listen}) {
    $self->{listen} = $arg{listen};

    my $host = $self->{listen}->[0];
    my $service = $self->{listen}->[1];

    $self->{server_guard} = tcp_server $host, $service, sub {
      my ($fh) = @_;
      $self->handle_new_connection($fh);
    };
  } else {
    die "unspecified listen path";
  }


  if (exists $arg{interface}) {
    my $interface = $arg{interface};

    if (ref $interface eq 'CODE') {
      $self->{interface} = $interface;
    } elsif (ref $interface eq 'HASH') {
      $self->{interface} = sub {
        my $method = shift;
        $interface->{$method}->(@_);
      };
    } else {
      die "interface must be a sub or a hash";
    }
  } else {
    die "unspecified interface";
  }


  return $self;
}


sub fork_task_server {
  my (@args) = @_;

  if (wantarray) {
    return AnyEvent::Task::Util::fork_anyevent_subprocess(sub {
             AnyEvent::Task::Server->new(@args)->run;
           });
  } else {
    AnyEvent::Task::Util::fork_anyevent_subprocess(sub {
      AnyEvent::Task::Server->new(@args)->run;
      return undef;
    });

    return undef;
  }
}




sub handle_new_connection {
  my ($self, $fh) = @_;

  my ($monitor_fh1, $monitor_fh2) = AnyEvent::Util::portable_socketpair;

  $self->{curr_worker_id}++;

  my $rv = fork;

  if ($rv) {
    close($fh);
    close($monitor_fh2);

    $self->{children}->{$rv} = {
      monitor_fh => $monitor_fh1,
    };
  } elsif ($rv == 0) {
    close($monitor_fh1);

    ## Don't want keep-alive pipes of other workers open in this worker
    foreach my $child (keys %{$self->{children}}) {
      close($self->{children}->{$child}->{monitor_fh});
    }

    if (defined $self->{name}) {
      $0 = "AET-Worker:$self->{name}($self->{curr_worker_id})";
    }

    AnyEvent::Task::Server::Worker::handle_worker($self, $fh, $monitor_fh2);
    die "handle_worker should never return";
  } else {
    close($fh);
    close($monitor_fh1);
    close($monitor_fh2);
    die "fork failed: $!";
  }
}


sub run {
  my ($self) = @_;

  $self->{all_done_cv}->recv;
}


1;