package Mojolicious::Plugin::Minion::Workers; use Mojo::Base 'Mojolicious::Plugin::Minion'; our $VERSION = '0.090771';# as to Minion/100+0.000 has minion => undef, weak=>1; has qw(conf); sub register { my ($self, $app, $conf) = @_; my $workers = delete $conf->{workers}; my $manage = delete $conf->{manage}; my $tasks = delete $conf->{tasks} || {}; $conf->{Pg} = $conf->{Pg}->($app) if ref($conf->{Pg}) eq 'CODE'; $self->SUPER::register($app, $conf) unless $app->renderer->get_helper('minion'); $self->minion($app->minion); $self->conf({ %$conf, workers => $workers, is_manage => !$ARGV[0] || $ARGV[0] eq 'daemon' || $ARGV[0] eq 'prefork', is_prefork => $ENV{HYPNOTOAD_APP} || ($ARGV[0] && $ARGV[0] eq 'prefork'), }); $app->minion->attr('workers'=> sub { $self }, weak=>1); while (my ($name, $sub) = each %$tasks) { $app->log->debug(sprintf("Applied task [%s] in [%s] from config", $name, $app->minion->add_task($name => $sub))); } $self->manage() and $self->conf->{is_manage} = 0 if $manage; return $self; } sub manage { my ($self, $workers) = @_; my $conf = $self->conf; return unless $conf->{is_manage}; $workers ||= $conf->{workers} or return; my $minion = $self->minion; #~ $minion->app->log->info("Minion tasks @{[ keys %{$minion->tasks} ]}"); if ($conf->{is_prefork}) { $self->prefork; } else { $self->subprocess; } return $self; } # Cases: hypnotoad script/app.pl | perl script/app.pl prefork sub prefork { my ($self) = @_; # case hot deploy and kill -USR2 return #~ if $hypnotoad_pid && !$ENV{HYPNOTOAD_STOP}; if $ENV{HYPNOTOAD_PID} && !$ENV{HYPNOTOAD_STOP}; $self->kill_workers; return if $ENV{HYPNOTOAD_STOP}; my $workers = $self->conf->{workers}; while ($workers--) { defined(my $pid = fork()) || die "Can't fork: $!"; next if $pid; $self->worker_run; CORE::exit(0); } } # Cases: morbo script/app.pl | perl script/app.pl daemon sub subprocess { my ($self) = @_; $self->kill_workers; # subprocess allow run/restart worker later inside app worker my $subprocess = Mojo::IOLoop::Subprocess->new(); $subprocess->run( sub { my $subprocess = shift; $self->worker_run; return $$; }, sub {1} ); # Dont $subprocess->ioloop->start here! } sub worker_run { my ($self) = @_; my $minion = $self->minion; $ENV{MINION_PID} = $$; $0 = "$0 minion worker"; $minion->app->log->info("Minion worker (pid $$) was started"); $minion->worker->run; } sub kill_workers { my ($self, $workers) = @_; my $minion = $self->minion; $workers ||= $minion->backend->list_workers->{workers}; kill 'QUIT', $_->{pid} and $minion->app->log->info("Minion worker (pid $_->{pid}) was stoped") for @$workers; } 1; __END__ use Mojo::File 'path'; # check process sub check_pid { my ($pid_path) = @_; return undef unless -r $pid_path; my $pid = $pid_path->slurp; chomp $pid; # Running return $pid if $pid && kill 0, $pid; # Not running return undef; } # case hot deploy #~ my $hypnotoad_pid_file = $minion->app->config->{hypnotoad}{pid_file}; #~ my $hypnotoad_pid = check_pid( #~ ($hypnotoad_pid_file && path($hypnotoad_pid_file)) #~ || path($ENV{HYPNOTOAD_APP})->sibling('hypnotoad.pid') #~ ); # Minion job here would be better for graceful restart worker # when hot deploy hypnotoad (TODO) #~ # all workers sub list_workers { my ($minion) = @_; $minion->backend->pg->db->query(q{ select *, extract(epoch from now()-started) as "time_work", count(*) over() as total from minion_workers })->expand->hashes->to_array; } sub killer_task { my ($job, $worker, $log) = @_; return unless $worker->{pid} eq $ENV{MINION_PID}; $log && $log->info("Minion worker (pid $worker->{pid}) was stoped"); kill 'QUIT', $worker->{pid}; $job->finish($worker->{pid}); }