package AI::Evolve::Befunge::Migrator;
use strict;
use warnings;
use Carp;
use IO::Select;
use IO::Socket::INET;
use Perl6::Export::Attrs;
use POSIX qw(sysconf _SC_OPEN_MAX);
use AI::Evolve::Befunge::Util;
=head1 NAME
AI::Evolve::Befunge::Migrator - connection to migration server
=head1 SYNOPSIS
my $migrator = AI::Evolve::Befunge::Migrator->new(Local => $socket);
$migrator->spin() while $migrator->alive();
=head1 DESCRIPTION
Maintains a connection to the migration server, migrationd. This
module is meant to run in a child process, which will die when the
Local socket is closed.
It provides a non-blocking, fault tolerant adaptation layer between
migrationd (which may be somewhere across the internet) and the
AI::Evolve::Befunge population object (which spends most of its time
evolving critters, and only occasionally polls us).
=head1 CONSTRUCTOR
=head2 new
my $migrator = AI::Evolve::Befunge::Migrator->new(Local => $socket);
Construct a new Migrator object.
The Local parameter is mandatory, it is the socket (typically a UNIX
domain socket) used to pass critters to and from the parent process.
Note that you probably don't want to call this directly... in most
cases you should call spawn_migrator, see below.
=cut
sub new {
my ($package, %args) = @_;
croak("The 'Local' parameter is required!") unless exists $args{Local};
my $host = global_config('migrationd_host', 'quack.glines.org');
my $port = global_config('migrationd_port', 29522);
my $self = {
host => $host,
port => $port,
dead => 0,
loc => $args{Local},
rxbuf => '',
txbuf => '',
lastc => 0,
};
return bless($self, $package);
}
=head2 spawn_migrator
my $socket = spawn_migrator($config);
Spawn off an external migration child process. This process will live
as long as the returned socket lives; it will die when the socket is
closed. See AI::Evolve::Befunge::Migrator for implementation details.
=cut
sub spawn_migrator :Export(:DEFAULT) {
my ($sock1, $sock2) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
my $pid = fork();
if($pid) {
close($sock2);
return $sock1;
}
close($sock1);
for my $fd (0..sysconf(_SC_OPEN_MAX)-1) {
next if $fd == $sock2->fileno();
next if $fd == STDERR->fileno();
POSIX::close($fd);
}
$sock2->blocking(0);
my $migrator = AI::Evolve::Befunge::Migrator->new(Local => $sock2);
$migrator->spin() while $migrator->alive();
exit(0);
}
=head1 METHODS
=head2 spin
$migrator->spin();
This is the main control component of this module. It looks for
incoming events and responds to them.
=cut
sub spin {
my $self = shift;
$self->spin_reads();
$self->spin_writes();
$self->spin_exceptions();
}
=head2 spin_reads
$migrator->spin_reads();
Handle read-related events. This method will delay for up to 2
seconds if no reading is necessary.
=cut
sub spin_reads {
my $self = shift;
$self->try_connect() unless defined $$self{sock};
my $select = IO::Select->new($$self{loc});
$select->add($$self{sock}) if defined $$self{sock};
my @sockets = $select->can_read(2);
foreach my $socket (@sockets) {
if($socket == $$self{loc}) {
my $rv = $socket->sysread($$self{txbuf}, 4096, length($$self{txbuf}));
$$self{dead} = 1 unless $rv;
} else {
my $rv = $socket->sysread($$self{rxbuf}, 4096, length($$self{rxbuf}));
if(!defined($rv) || $rv < 0) {
debug("Migrator: closing socket due to read error: $!\n");
undef $$self{sock};
next;
}
if(!$rv) {
debug("Migrator: closing socket due to EOF\n");
undef $$self{sock};
}
}
}
}
=head2 spin_writes
$migrator->spin_writes();
Handle write-related events. This method will not block.
=cut
sub spin_writes {
my $self = shift;
$self->try_connect() unless defined $$self{sock};
return unless length($$self{txbuf} . $$self{rxbuf});
my $select = IO::Select->new();
$select->add($$self{loc}) if length $$self{rxbuf};
$select->add($$self{sock}) if(length $$self{txbuf} && defined($$self{sock}));
my @sockets = $select->can_write(0);
foreach my $socket (@sockets) {
if($socket == $$self{loc}) {
my $rv = $socket->syswrite($$self{rxbuf}, length($$self{rxbuf}));
if($rv > 0) {
substr($$self{rxbuf}, 0, $rv, '');
}
debug("Migrator: write on loc socket reported error $!\n") if($rv < 0);
}
if($socket == $$self{sock}) {
my $rv = $socket->syswrite($$self{txbuf}, length($$self{txbuf}));
if(!defined($rv)) {
debug("Migrator: closing socket due to undefined syswrite retval\n");
undef $$self{sock};
next;
}
if($rv > 0) {
substr($$self{txbuf}, 0, $rv, '');
}
if($rv < 0) {
debug("Migrator: closing socket due to write error $!\n");
undef $$self{sock};
}
}
}
}
=head2 spin_exceptions
$migrator->spin_exceptions();
Handle exception-related events. This method will not block.
=cut
sub spin_exceptions {
my $self = shift;
my $select = IO::Select->new();
$select->add($$self{loc});
$select->add($$self{sock}) if defined($$self{sock});
my @sockets = $select->has_exception(0);
foreach my $socket (@sockets) {
if($socket == $$self{loc}) {
debug("Migrator: dying: select exception on loc socket\n");
$$self{dead} = 1;
}
if($socket == $$self{sock}) {
debug("Migrator: closing socket due to select exception\n");
undef $$self{sock};
}
}
}
=head2 alive
exit unless $migrator->alive();
Returns true while migrator still wants to live.
=cut
sub alive {
my $self = shift;
return !$$self{dead};
}
=head2 try_connect
$migrator->try_connect();
Try to establish a new connection to migrationd.
=cut
sub try_connect {
my $self = shift;
my $host = $$self{host};
my $port = $$self{port};
my $last = $$self{lastc};
return if $last > (time() - 2);
return if $$self{dead};
debug("Migrator: attempting to connect to $host:$port\n");
$$self{lastc} = time();
$$self{sock} = IO::Socket::INET->new(
Proto => 'tcp',
PeerAddr => $host,
PeerPort => $port,
Blocking => 0,
);
}
1;