#!/usr/bin/env perl

=head1 SYNOPSIS

 $0 [--src src-range(default `hostname`)] --dst dst-range --sp src-path [--dp dst-path] \
    [--timeout seconds(default 300)]
    [--max number(default 128)]
    [--retry number(default 2)]
    [--gave number(default 3)]

    [--user username(default `id -un`)] 
    [--sudo user1 ] 
    [--chown root]
    [--chmod 777]
    [--cc]
    [--delete]

     -1      Forces grsync to try protocol version 1
     -2      Forces grsync to try protocol version 2
     -3      Forces grsync to try protocol version 3
     -4      Forces grsync to try protocol version 4

     --sp /path/file --dp /path/foo/newfile
     --sp /path/file --dp /path/foo/
     --sp /path/     --dp /path/foo/
     --sp /path/foo* --dp /path/foo/
     --sp '/path/file1;/path/file2' --dp /path/foo/

     [--immediately]
     [--addr 10.10.10.10]
     [--listen 9999]

=cut
use strict;
use warnings;
use threads;
use threads::shared;
use Sys::Hostname;

use MYDan::Node;
use MYDan::Agent::GrsyncM;
use MYDan::Util::OptConf;

use AE;
use AnyEvent;  
use AnyEvent::Socket;  
use AnyEvent::Handle;  
use Data::UUID;

use MYDan::Util::Progress;
use MYDan::Util::Percent;

$| ++;

@MYDan::Util::OptConf::CONF = qw( pass_through no_ignore_case );

my $option = MYDan::Util::OptConf->load();
my %o = $option->set( retry => 2, timeout => 300, gave => 3, addr => 0 )
    ->get( qw( src=s dst=s sp=s dp=s timeout=i max=i retry=i nice=i user=s sudo=s gave=i chown=s chmod=s cc delete 1 2 3 4 immediately addr=s listen=i ) )
    ->dump();

$o{delete $o{ProtocolVersion}} = 1 unless $o{ProtocolVersion} && $o{ProtocolVersion} =~ /^\d$/ && grep{ $o{$_} }1..3;

$option->assert( qw( dst sp ) );

my ( $ptime, $pcb, $p ) = ( 0 );

my %pp:shared;

if( $o{immediately} )
{
    $p = MYDan::Util::Progress->new();
    my ( %percent, %pcb );
    $pcb = sub
    {
        my ( $info, $node, $pp ) = @_;
        return unless defined $info;
        $node ||= 'localhost';
        my $percb = sub{ $pcb{$node} = shift };

        unless( $percent{$node} )
        {
            if( $info =~ /^\d+$/ )
            {
                my $x = $pp ? "2$pp" : '';
                $percent{$node} = MYDan::Util::Percent->new( 
                    $info, "Dump$x." .hostname. " ..", $percb );
                return;
            }
            else
            {
                $percent{$node} = MYDan::Util::Percent->new( 
                    undef, 'sync ..', $percb );
            }
        }

        map{ 
            if( $_ =~ /^\d+$/ )
            {
                $percent{$node}->add( $_ )->print()
            }
            else
            {
                $pcb{$node} = $_;
            }
        }split "\n", $info;

        $pp{$node} = $pcb{$node};
        my $time = time;
        if( $time ne $ptime )
        {
            $p->load( %pp );
            $p->print();
            $ptime = $time;
        }
    };

    my $listen = $o{listen};
    unless( $listen )
    {
        my $scan = `netstat  -tun|awk '{print \$4}'|awk -F: '{print \$2}'`;
        my %open = map{ $_ => 1 }my @open = $scan =~ /(\d+)/g;
        my %port = map{ $_ => 1 }65112 .. 65535;
        ( $listen ) = grep{ ! $open{$_} }keys %port;
    }
    
    my ( $cv, $index, $uuid, %index, %head ) = ( AE::cv, 0, Data::UUID->new->create_str() );
    tcp_server undef, $listen, sub {
        my ( $fh, $ip, $port ) = @_ or die "[MYDan]tcp_server: $!";
    
        $index ++;
        warn "[MYDan]tcp connet from $ip:$port\n";
    
        my $handle; $handle = new AnyEvent::Handle(
            fh => $fh,
            rbuf_max => 10240000,
            wbuf_max => 10240000,
            autocork => 1,
            on_read => sub {
                my $self = shift;
                $self->unshift_read (
                	chunk => length $self->{rbuf},
                	sub {
                        if( $head{"$ip:$port"} )
                        {
                            map{ 
                                if( $_ =~ /^(.+) => (.+)$/ )
                                {
                                    $pp{$1} = $2;
                                    $p->load( %pp );
                                }
                                else
                                {
                                    $pp{$head{"$ip:$port"}} = $_;
                                    $p->load( %pp );
                                }
                            }split "\n", $_[1];
                            my $time = time;
                            if( $time ne $ptime )
                            {
                                $p->print();
                                $ptime = $time;
                            }
                        }
                        else
                        {
                            my $x = $_[1];

                            $x =~ s/^([^:]+):([^:]+)://;
                            if( $1 ne $uuid )
                            {
                                $handle->push_shutdown;
                                return;
                            }
                            warn "[MYDan]host:$2 ip:$ip port:$port:\n";
                            $head{"$ip:$port"} = $2;
                        }
                    },
                )
            },

            on_error => sub{
                warn "[MYDan]tcp error: $ip:$port\n";
            },
            on_eof => sub{
                warn "[MYDan]tcp close: $ip:$port\n";
            },
        );
        $index{$index}{handle} = $handle;
    };
    
    my $env = 'MYDan_rlog='.join ":", $uuid, $o{addr}, $listen;
    $o{env} = $o{env} ? join( ';', $env, $o{env} ) : $env;
}

if( $o{env} )
{
    my %env;
    map{ my @env = split /=/, $_; $env{$env[0]} = $env[1] if 2 == @env }split /;/, $o{env};
    $o{env} = \%env if %env;
}

$o{dp} ||= $o{sp};

$o{user} = `id -un` and chop $o{user} unless $o{user};

my $range = MYDan::Node->new( $option->dump( 'range' ) );

my %sync = (
    ( map { $_ => [ $range->load( delete $o{$_} || [] )->list ] } qw( src dst ) ),
    agent => +{ $option->dump( 'agent' ) }
);

if( $o{dp} eq $o{sp} )
{
    my %src = map{ $_ => 1 }@{$sync{src}};
    die "'$o{sp}' are the same file on same host.\n" unless grep{ ! $src{$_} }@{$sync{dst}};
}

$o{pcb} = $pcb;
my @failed = MYDan::Agent::GrsyncM->new( opt => \%o, sync => \%sync )->run();

if( $o{immediately} )
{
    $p->load( %pp );
    $p->print();
}

exit 0 unless  @failed;
die $range->load( \@failed )->dump . ": failed.\n\n";