package Minion::Backend::mysql;

use 5.010;

use Mojo::Base 'Minion::Backend';

use constant DEBUG => $ENV{MINION_BACKEND_DEBUG} || 0;
use Mojo::IOLoop;
use Mojo::JSON qw(encode_json decode_json);
use Mojo::mysql;
use Scalar::Util qw(blessed);
use Sys::Hostname 'hostname';
use Time::Piece ();

# Mojo::mysql object through which every query in this backend is issued.
has 'mysql';
# When true, enqueue() does not wrap its inserts in a transaction. Defaults to
# false.
has 'no_txn' => sub { 0 };

our $VERSION = '1.001';

# The dequeue system has a couple limitations:
# 1. There is no way to directly notify a sleeping worker of an incoming
#    job
# 2. There is a race condition between identifying a runnable job
#    and claiming it for the current worker.
#
# The first is solved by Mojo::mysql::PubSub, which currently makes
# a new connection to MySQL, records the connection ID, and then sleeps. 
# When a message is "published", the publisher writes the message to
# a table and then `KILL`s all the sleeping "subscribers". One of the
# subscribers reads the message from the table and continues.
#
# The second is solved by checking how many rows are `UPDATE`d when
# claiming the job. If none, we try again to dequeue immediately. This
# happens $DEQUEUE_RACE_ATTEMPTS times before giving up.
#
# When it gives up, there may still be jobs available to be dequeued. In
# most cases, this is okay: The worker will start sleeping for an
# incoming job and when one comes in, it will wake up and start
# dequeuing again (even if the new job itself isn't ready to run).
my $DEQUEUE_RACE_ATTEMPTS = 10;

# Claim the next runnable job for a worker, waiting up to $wait seconds.
#
# A claim is attempted immediately. If nothing is runnable and no event loop
# is running yet, the Mojo::IOLoop is started until either a "minion.job"
# pubsub message arrives or the timer fires, and then one more claim is
# attempted.
#
#   $worker_id - ID of the worker the job is claimed for
#   $wait      - maximum number of seconds to wait for a notification
#   $options   - hash reference handed through to _try()
#
# Returns whatever _try() returns for a claimed job, or undef if the loop is
# already running and nothing could be claimed right away.
sub dequeue {
  my ($self, $worker_id, $wait, $options) = @_;

  # Fast path: a job is ready right now
  my $job = $self->_try($worker_id, $options);
  return $job if $job;

  # The loop is started and stopped below, which is only possible when nobody
  # else is running it
  return undef if Mojo::IOLoop->is_running;

  # Either of these two callbacks ends the wait
  my $pubsub   = $self->mysql->pubsub;
  my $listener = $pubsub->listen("minion.job" => sub { Mojo::IOLoop->stop });
  my $timer    = Mojo::IOLoop->timer($wait => sub { Mojo::IOLoop->stop });
  Mojo::IOLoop->start;

  # Clean up the subscription and the (possibly still pending) timer
  Mojo::IOLoop->remove($timer) if $pubsub->unlisten("minion.job" => $listener);

  return $self->_try($worker_id, $options);
}

# Build the hourly job history for the last 24 hour buckets.
#
# Queries the number of failed and finished jobs per hour and then fills in
# every hour that has no finished jobs, so the result always describes a
# contiguous time series ending with the current hour.
#
# Returns a hash reference { daily => $rows }, where each row is a hash
# reference with the keys "epoch", "failed_jobs" and "finished_jobs".
sub history {
  my $self = shift;

  # Rows are ordered by their timestamp rather than by (day, hour): a window
  # that crosses a month boundary (e.g. the 31st to the 1st) would otherwise
  # be sorted with the newest hours first.
  my $sql = <<SQL;
SELECT
  MIN(UNIX_TIMESTAMP(jobs.finished)) as `epoch`,
  DAY(jobs.finished) as `day`,
  HOUR(jobs.finished) as `hour`,
  SUM(CASE jobs.state WHEN 'failed' THEN 1 ELSE 0 END) AS failed_jobs,
  SUM(CASE jobs.state WHEN 'finished' THEN 1 ELSE 0 END) AS finished_jobs
FROM minion_jobs jobs
WHERE jobs.finished > SUBTIME(NOW(), '23:00:00')
GROUP BY `day`, `hour`
ORDER BY `epoch`
SQL

  my $data = $self->mysql->db->query($sql)->hashes;

  # Fill in missing hours to create a full time series.
  # NOTE(review): the expected hour comes from the local clock of this process
  # while HOUR() is evaluated by the database -- presumably both use the same
  # time zone; confirm.
  my $now = Time::Piece->new();
  my $current_hour = $now->hour;
  for my $i ( 0..23 ) {
    # Slot 23 is the current hour, slot 0 is 23 hours ago
    my $hours_ago = 23 - $i;
    my $i_hour    = ( $current_hour - $hours_ago ) % 24;

    # Reading past the end of the list yields undef (and does not create an
    # element), which means this hour and all later ones are missing
    my $row = $data->[$i];
    if ( !$row || $row->{hour} != $i_hour ) {
      my $epoch = $now->epoch - ( 3600 * $hours_ago );
      splice @$data, $i, 0, {
        epoch => $epoch - ( $epoch % 3600 ),
        failed_jobs => 0,
        finished_jobs => 0,
      };
    }
    else {
      # The grouping columns are only needed to detect gaps
      delete @{$row}{qw( hour day )};
    }
  }

  return {daily => $data};
}

# Store a new job and announce it on the "minion.job" pubsub channel.
#
#   $task    - name of the task to run
#   $args    - array reference of job arguments (defaults to [])
#   $options - hash reference; the keys read here are attempts, delay, expire,
#              lax, priority, queue, parents and notes
#
# Returns the ID of the inserted job.
sub enqueue {
  my ($self, $task, $args, $options) = @_;
  $args    //= [];
  $options //= {};

  # Everything that can be prepared up front is prepared before the
  # transaction is opened, to keep the transaction short
  my @job_values = (
    encode_json( $args ),
    $options->{attempts} // 1,
    $options->{delay} // 0,
    $options->{expire}, $options->{expire},    # bound twice, see the CASE below
    ( $options->{lax} ? 1 : 0 ),
    $options->{priority} // 0,
    $options->{queue} // 'default',
    $task,
  );

  my @parents   = @{ $options->{parents} // [] };
  my %notes     = %{ $options->{notes} // {} };
  my @note_keys = keys %notes;

  my $parents_sql;
  if ( @parents ) {
    $parents_sql = 'INSERT IGNORE INTO minion_jobs_depends (`parent_id`, `child_id`) VALUES '
      . join( ', ', ('( ?, ? )') x @parents );
  }

  my $notes_sql;
  if ( @note_keys ) {
    $notes_sql = 'INSERT INTO minion_notes (`job_id`, `note_key`, `note_value`) VALUES '
      . join( ', ', ('( ?, ?, ? )') x @note_keys );
    $notes{ $_ } = encode_json( $notes{ $_ } ) for @note_keys;
  }

  my $db = $self->mysql->db;

  # Rows in related tables are committed together with the job itself.
  # Otherwise the job could be dequeued before its parent restriction has
  # been written, because that row is inserted after the job.
  my $wants_tx = ( $parents_sql || $notes_sql ) && !$self->no_txn;
  my $tx       = $wants_tx ? $db->begin : undef;

  my $job_id = $db->query(
    "insert into minion_jobs (`args`, `attempts`, `delayed`, `expires`, `lax`, `priority`, `queue`, `task`)
     values (?, ?, (DATE_ADD(NOW(), INTERVAL ? SECOND)), case when ? is not null then date_add( now(), interval ? second ) end, ?, ?, ?, ?)",
     @job_values,
  )->last_insert_id;

  # One ( parent_id, child_id ) pair per parent
  $db->query( $parents_sql, map { ( $_, $job_id ) } @parents )
    if $parents_sql;

  # One ( job_id, note_key, note_value ) triple per note
  $db->query( $notes_sql, map { ( $job_id, $_, $notes{$_} ) } @note_keys )
    if $notes_sql;

  $tx->commit if defined $tx;
  $self->mysql->pubsub->notify("minion.job" => $job_id);

  return $job_id;
}

# Change the notes of a job.
#
#   $id    - job ID
#   $notes - hash reference; keys with a defined value are written (replacing
#            an existing note of the same key), keys whose value is undef are
#            deleted
#
# Returns a true value if notes were written or at least one note was
# deleted, and a false value otherwise.
sub note {
  my ($self, $id, $notes) = @_;

  # Split the keys by what has to happen to them
  my ( @write, @remove );
  for my $key ( keys %$notes ) {
    if ( defined $notes->{ $key } ) { push @write, $key }
    else                            { push @remove, $key }
  }

  my $db = $self->mysql->db;

  my $replaced;
  if ( @write ) {
    my $sql = 'REPLACE INTO minion_notes (`job_id`, `note_key`, `note_value`) VALUES '
      . join( ', ', ('( ?, ?, ? )') x @write );
    my @values = map { ( $id, $_, encode_json( $notes->{$_} ) ) } @write;

    # A failing query counts as "nothing replaced"; $@ is localized so the
    # caller's error variable is left alone
    local $@;
    $replaced = !!eval { $db->query( $sql, @values )->rows };
  }

  my $deleted;
  if ( @remove ) {
    my %condition = ( job_id => $id, note_key => { -in => \@remove } );
    $deleted = !!$db->delete( minion_notes => \%condition )->rows;
  }

  return $replaced || $deleted;
}

# Record a job as failed. All arguments are passed on to _update() after the
# failure flag (1).
sub fail_job {
  my $self = shift;
  return $self->_update(1, @_);
}

# Record a job as finished. All arguments are passed on to _update() after
# the failure flag (0).
sub finish_job {
  my $self = shift;
  return $self->_update(0, @_);
}

# List jobs in batches, newest (highest ID) first.
#
#   $offset  - number of matching jobs to skip
#   $limit   - maximum number of jobs to return
#   $options - hash reference of filters: states, queues, tasks, ids and notes
#              are array references, before is a job ID
#
# Inactive jobs whose expiration time has passed are never listed.
#
# Returns a hash reference { jobs => $collection, total => $count }, where
# "total" is the number of jobs matching the filters without limit/offset.
sub list_jobs {
  my ($self, $offset, $limit, $options) = @_;

  my ( @where, @params );

  # Filters that translate to "column in ( ... )"
  my @list_filters = (
    [ states => 'j.state' ],
    [ queues => 'j.queue' ],
    [ tasks  => 'j.task' ],
    [ ids    => 'j.id' ],
  );
  for my $filter ( @list_filters ) {
    my ( $option, $column ) = @$filter;
    my $values = $options->{ $option } or next;
    push @where, "$column in (" . join( ',', ('?') x @$values ) . ')';
    push @params, @$values;
  }

  if ( my $before_id = $options->{before} ) {
    push @where, 'j.id < ?';
    push @params, $before_id;
  }

  # A job matches if it has at least one of the requested note keys
  if ( my $note_keys = $options->{notes} ) {
    my $has_note = '? in ( select minion_notes.note_key from minion_notes where minion_notes.job_id=j.id )';
    push @where, '( ' . join( ' or ', ($has_note) x @$note_keys ) . ' )';
    push @params, @$note_keys;
  }

  # Hide expired inactive jobs. This condition is always present, so the
  # WHERE clause can never be empty.
  push @where, q{(j.state != 'inactive' or j.expires is null or j.expires > now())};
  my $where = 'WHERE ' . join( ' AND ', @where );

  my $db = $self->mysql->db;

  my $list_sql = qq{
    SELECT
      minion_jobs.id, minion_jobs.args, minion_jobs.attempts,
      minion_jobs.state, minion_jobs.task, minion_jobs.worker,
      minion_jobs.lax, minion_jobs.priority, minion_jobs.queue,
      minion_jobs.result, minion_jobs.retries,
      filtered.children, filtered.parents,
      UNIX_TIMESTAMP(minion_jobs.created) AS created,
      UNIX_TIMESTAMP(minion_jobs.`delayed`) AS `delayed`,
      UNIX_TIMESTAMP(minion_jobs.finished) AS finished,
      UNIX_TIMESTAMP(minion_jobs.retried) AS retried,
      UNIX_TIMESTAMP(minion_jobs.started) AS started,
      UNIX_TIMESTAMP(NOW()) AS time,
      UNIX_TIMESTAMP(minion_jobs.expires) AS expires
    FROM minion_jobs
    JOIN (
      SELECT j.id,
        GROUP_CONCAT( child_jobs.child_id ORDER BY child_jobs.child_id SEPARATOR ':' ) AS children,
        GROUP_CONCAT( parent_jobs.parent_id ORDER BY parent_jobs.parent_id SEPARATOR ':' ) AS parents
      FROM minion_jobs j
      LEFT JOIN minion_jobs_depends child_jobs ON j.id=child_jobs.parent_id
      LEFT JOIN minion_jobs_depends parent_jobs ON j.id=parent_jobs.child_id
      $where
      GROUP BY j.id
    ) filtered USING ( id )
    ORDER BY minion_jobs.id DESC
    LIMIT ? OFFSET ?
  };

  my $jobs = $db->query( $list_sql, @params, $limit, $offset )->hashes;

  # Decode the JSON columns of every row first ...
  my $decode = _decode_json_fields(qw{ args result });
  $decode->( $_ ) for @$jobs;

  # ... then expand the ":"-separated ID lists and attach the notes
  for my $job ( @$jobs ) {
    $job->{children} = [ split /:/, $job->{children} // '' ];
    $job->{parents}  = [ split /:/, $job->{parents} // '' ];

    my $note_rows = $db->select(
      minion_notes => [qw( note_key note_value )], { job_id => $job->{id} }
    )->arrays;
    $job->{notes} = { map { ( $_->[0], decode_json( $_->[1] ) ) } @$note_rows };
  }

  my $total = $db->query(
    qq{SELECT COUNT(*) AS count FROM minion_jobs j $where}, @params
  )->hash->{count};

  return { jobs => $jobs, total => $total };
}

# Build a callback that JSON-decodes the given fields of a row in place.
#
#   @_ - names of the hash keys to decode
#
# The returned code reference takes a hash reference, replaces the value of
# every listed field that holds a true value with its decoded form and
# returns the same hash reference. False values (undef, '', 0) are left
# untouched.
sub _decode_json_fields {
  my @names = @_;

  return sub {
    my ($row) = @_;
    $row->{ $_ } = decode_json( $row->{ $_ } )
      for grep { $row->{ $_ } } @names;
    return $row;
  };
}

# List registered workers in batches, newest (highest ID) first.
#
#   $offset  - number of matching workers to skip
#   $limit   - maximum number of workers to return
#   $options - hash reference of filters: ids (array reference) and before
#              (worker ID)
#
# Returns a hash reference { workers => $collection, total => $count }. Each
# worker row carries its decoded "status" and a "jobs" array reference with
# the IDs of the active jobs assigned to it.
sub list_workers {
  my ($self, $offset, $limit, $options) = @_;

  my ( @where, @params );
  if ( my $ids = $options->{ids} ) {
    push @where, 'id in (' . join( ',', ('?') x @$ids ) . ')';
    push @params, @$ids;
  }
  if ( my $before_id = $options->{before} ) {
    push @where, 'id < ?';
    push @params, $before_id;
  }

  my $where = '';
  $where = 'WHERE ' . join( ' AND ', @where ) if @where;

  my $db  = $self->mysql->db;
  my $sql = "SELECT
    id, UNIX_TIMESTAMP(notified) AS notified, host, pid,
    UNIX_TIMESTAMP(started) AS started, status
  FROM minion_workers $where ORDER BY id DESC LIMIT ? OFFSET ?";
  my $workers = $db->query($sql, @params, $limit, $offset)->hashes;

  # Decode the status document and look up the active jobs of every worker
  my $jobs_sql = q{SELECT id FROM minion_jobs WHERE state='active' AND worker=?};
  for my $worker ( @$workers ) {
    $worker->{status} = decode_json( $worker->{status} );
    $worker->{jobs}
      = $db->query($jobs_sql, $worker->{id})->arrays->flatten->to_array;
  }

  my $total = $db->query(
    qq{SELECT COUNT(*) AS count FROM minion_workers $where}, @params
  )->hash->{count};

  return { workers => $workers, total => $total };
}

# List locks that have not expired yet, newest (highest ID) first.
#
#   $offset  - number of matching locks to skip (0 when false)
#   $limit   - maximum number of locks to return (0 when false)
#   $options - hash reference; "names" (or "name" when "names" is undefined)
#              restricts the list and may be a single name or an array
#              reference of names
#
# Returns a hash reference { locks => $collection, total => $count }.
sub list_locks {
  my ($self, $offset, $limit, $options) = @_;

  my ( @where, @params );

  my $wanted = $options->{names} // $options->{name};
  if ( $wanted ) {
    my @names = ref $wanted eq 'ARRAY' ? @$wanted : ( $wanted );
    push @where, 'name in (' . join( ',', ('?') x @names ) . ')';
    push @params, @names;
  }

  # Expired locks are never listed, so the WHERE clause is never empty
  push @where, 'expires > now()';
  my $where = 'WHERE ' . join( ' AND ', @where );

  my $sql = "SELECT
          id, name, UNIX_TIMESTAMP(expires) AS expires
      FROM minion_locks
      $where
      ORDER BY id
      DESC LIMIT ? OFFSET ?";

  my $db    = $self->mysql->db;
  my $locks = $db->query($sql, @params, $limit || 0, $offset || 0)->hashes;

  my $count_row = $db->query(
    "SELECT COUNT(name) AS total FROM minion_locks $where", @params
  )->hash;

  return { locks => $locks, total => $count_row->{total} };
}

# Construct a backend object.
#
# Accepts either a single Mojo::mysql object, a single hash reference whose
# contents are passed to Mojo::mysql->new as a list, or any other argument
# list, which is passed to Mojo::mysql->new unchanged.
#
# The "minion" migrations are loaded from this module's data section. With a
# ready-made Mojo::mysql object they are run right away; otherwise they are
# run when the first "connection" event is emitted.
sub new {
  my ( $class, @args ) = @_;
  state $skip_migration = 0;

  my $given_handle
    = @args == 1 && blessed($args[0]) && $args[0]->isa('Mojo::mysql');

  my $mysql;
  if ( $given_handle ) {
    $mysql = $args[0];
  }
  else {
    @args = %{ $args[0] } if ref $args[0] eq 'HASH';
    $mysql = Mojo::mysql->new(@args);
  }

  my $self = $class->SUPER::new(mysql => $mysql);
  return $self if $skip_migration;

  if ( $given_handle ) {
    # The user may add another migration to their object after the Minion
    # plugin was added, so everything has to happen immediately: first run
    # what is already pending, because it is about to be replaced ...
    $mysql->migrations->migrate;

    # ... then load and run this module's own migrations
    $mysql->migrations->name('minion')->from_data;
    $mysql->migrations->migrate;
    return $self;
  }

  # Load this module's migrations now, but run them only once a database
  # connection is attempted for the first time
  $mysql->migrations->name('minion')->from_data;
  $mysql->once(connection => sub {
      my ( $handle ) = @_;
      $handle->migrations->migrate;
  });

  return $self;
}

# Register a worker, or refresh the row of an already registered one.
#
#   $id      - worker ID; may be undef for a worker that is not registered yet
#   $options - hash reference; "status" (defaults to {}) is stored as JSON
#
# The row stores the local host name and the ID of the current process. If
# the ID already exists, host, pid and status are overwritten and "notified"
# is set to the current time.
#
# Returns $id if it is defined, otherwise the insert ID reported for the
# query.
sub register_worker {
  my ($self, $id, $options) = @_;

  my $db     = $self->mysql->db;
  my $result = $db->query(
    q{INSERT INTO minion_workers (id, host, pid, status)
    VALUES (?, ?, ?, ?)
    ON DUPLICATE KEY UPDATE notified=NOW(), host=VALUES(host), pid=VALUES(pid), status=VALUES(status)},
    $id, hostname, $$, encode_json( $options->{status} // {} ),
  );

  return $id if defined $id;
  return $result->last_insert_id;
}

# Remove a job that is in the inactive, failed or finished state, together
# with the dependency rows in which it is the parent.
#
#   $id - job ID
#
# Returns true if at least one row was deleted, false otherwise (for example
# when the job does not exist or is in another state).
sub remove_job {
  my ($self, $id) = @_;

  my $result = $self->mysql->db->query(
    "delete minion_jobs, minion_jobs_depends from minion_jobs
     left join minion_jobs_depends
        on minion_jobs.id = minion_jobs_depends.parent_id
     where minion_jobs.id = ? and minion_jobs.state in ('inactive', 'failed', 'finished')",
     $id
  );

  # Use the public accessor (as _try does) instead of reaching into the
  # results object
  return !!$result->affected_rows;
}

# Clean up the worker registry and the job queue. Four steps are performed in
# this order:
#
#   1. delete workers whose last notification is older than the Minion
#      object's "missing_after" seconds
#   2. delete expired inactive jobs, and finished jobs older than
#      "remove_after" seconds
#   3. fail active jobs (outside the "minion_foreground" queue) whose worker
#      row no longer exists
#   4. fail inactive jobs that have been runnable for more than "stuck_after"
#      seconds
sub repair {
  my $self = shift;

  my $minion = $self->minion;
  my $db     = $self->mysql->db;

  # Workers without heartbeats
  my $stale_workers_sql
    = "delete from minion_workers
     where notified < DATE_SUB(NOW(), INTERVAL ? SECOND)";
  $db->query( $stale_workers_sql, $minion->missing_after );

  # Old jobs with no unresolved dependencies and expired jobs
  my $old_jobs_sql = q{
      DELETE job
      FROM minion_jobs job
      LEFT JOIN minion_jobs_depends depends ON depends.parent_id = job.id
      LEFT JOIN minion_jobs child ON child.id = depends.child_id AND child.state != 'finished'
      WHERE
        (
          job.expires <= NOW() AND job.state = 'inactive'
        )
        OR (
          job.state = 'finished'
          AND job.`finished` <= DATE_SUB(NOW(), INTERVAL ? SECOND)
          AND child.id IS NULL
        )
    };
  $db->query( $old_jobs_sql, $minion->remove_after );

  # Jobs with missing worker (can be retried)
  my $abandoned = $db->query(
    "select job.id, job.retries from minion_jobs job
     left join minion_workers worker on job.worker = worker.id
     where job.state = 'active' and job.queue != 'minion_foreground'
       and worker.id is null"
  )->hashes;
  for my $job ( @$abandoned ) {
    $self->fail_job( $job->{id}, $job->{retries}, 'Worker went away' );
  }

  # Jobs in queue without workers or not enough workers (cannot be retried
  # and requires admin attention)
  $db->query(
    q{update minion_jobs set state = 'failed', result = '"Job appears stuck in queue"'
      where state = 'inactive' and DATE_ADD( `delayed`, interval ? second ) < now()},
    $minion->stuck_after,
  );
}

# Reset stored data.
#
#   $options - hash reference (defaults to {}):
#                all   - empty the job, lock, dependency, note and worker
#                        tables and restart their ID counters
#                locks - empty only the lock table and restart its ID counter
#
# "all" takes precedence over "locks". Without either option no statement is
# executed.
sub reset {
  my ($self, $options) = (shift, shift // {});

  my @reset_locks = (
    "truncate table minion_locks",
    "ALTER TABLE minion_locks AUTO_INCREMENT = 1",
  );

  my @statements;
  if ( $options->{all} ) {
    @statements = (
      "delete from minion_jobs",
      "ALTER TABLE minion_jobs AUTO_INCREMENT = 1",
      @reset_locks,
      "truncate table minion_jobs_depends",
      "truncate table minion_notes",
      "DELETE FROM minion_workers",
      "ALTER TABLE minion_workers AUTO_INCREMENT = 1",
    );
  }
  elsif ( $options->{locks} ) {
    @statements = @reset_locks;
  }

  my $db = $self->mysql->db;
  $db->query( $_ ) for @statements;
}

# Try to acquire a named lock by calling the stored function minion_lock().
#
#   $name     - lock name
#   $duration - passed to minion_lock() as its second argument
#   $options  - hash reference (defaults to {}); "limit" is passed as the
#               third argument and defaults to 1 when false
#
# Returns true if minion_lock() returned a true value, false otherwise.
sub lock {
  my ($self, $name, $duration, $options) = @_;
  $options //= {};

  my $limit = $options->{limit} || 1;
  my $row   = $self->mysql->db->query(
    'SELECT minion_lock(?, ?, ?)', $name, $duration, $limit
  )->array;

  return !!$row->[0];
}

# Release one lock of the given name: of all its unexpired locks, the one
# that expires first is deleted.
#
#   $name - lock name
#
# Returns true if a lock was deleted, false otherwise.
sub unlock {
  my ($self, $name) = @_;

  my $result = $self->mysql->db->query(
    'DELETE FROM minion_locks
      WHERE expires > NOW() AND name = ? ORDER BY EXPIRES
      LIMIT 1', $name
  );

  return !!$result->rows;
}

# Put a job back into the inactive state so it is run again.
#
#   $id      - job ID
#   $retries - number of retries the caller believes the job has; the update
#              only applies if this matches the stored value
#   $options - hash reference (defaults to {}):
#                attempts, priority, queue - new values; the stored value is
#                                            kept when undefined
#                delay   - seconds from now until the job may run (default 0)
#                expire  - seconds from now until the job expires; the stored
#                          expiration is kept when undefined
#                lax     - new lax flag; the stored flag is kept when the key
#                          is absent
#                parents - array reference of job IDs replacing the current
#                          parents of the job (removed from $options here)
#
# Returns true if the job row was updated, false otherwise.
sub retry_job {
  my ($self, $id, $retries) = (shift, shift, shift);
  my $db = $self->mysql->db;
  my $options = shift // {};

  # Replace the parents of the job: drop all current ones, then add the new
  # ones (an empty list just removes the dependencies)
  if ( my $parents = delete $options->{ parents } ) {
    $db->query(
      'DELETE FROM `minion_jobs_depends` WHERE child_id=?',
      $id,
    );
    if ( @$parents ) {
      $db->query(
        "INSERT INTO minion_jobs_depends (`parent_id`, `child_id`) VALUES "
        . join( ", ", map "( ?, ? )", @$parents ),
        map { $_, $id  } @$parents
      );
    }
  }

  # undef tells the COALESCE below to keep the stored lax flag
  my $lax = exists $options->{lax} ? ( $options->{lax} ? 1 : 0 ) : undef;

  my $result = $db->query(
    "UPDATE `minion_jobs`
     SET attempts = COALESCE(?, attempts),
       `delayed` = DATE_ADD(NOW(), INTERVAL ? SECOND),
       expires = case when ? is not null then date_add( now(), interval ? second ) else expires end,
       lax = coalesce(?, lax),
       priority = COALESCE(?, priority), queue = COALESCE(?, queue),
       retried = NOW(), retries = retries + 1, state = 'inactive'
     WHERE id = ? AND retries = ?",
     $options->{attempts}, $options->{delay} // 0, ($options->{expire})x2,
     $lax,
     @$options{qw(priority queue)}, $id, $retries
  );

  # Use the public accessor (as _try does) instead of reaching into the
  # results object
  return !!$result->affected_rows;
}

# Collect statistics about workers, jobs and locks.
#
# Returns a hash reference with the keys active_workers, inactive_workers,
# active_jobs, inactive_jobs, failed_jobs, finished_jobs, enqueued_jobs,
# delayed_jobs, active_locks and uptime. All counters start at 0.
sub stats {
  my $self = shift;

  my %stats;
  $stats{ $_ } = 0 for qw(
    active_workers inactive_workers active_jobs inactive_jobs failed_jobs finished_jobs
    enqueued_jobs delayed_jobs active_locks uptime
  );

  my $db = $self->mysql->db;

  # Worker statistics: one row per worker, true if it has any active job
  my $worker_sql = q{
    SELECT
      SUM(job.id IS NOT NULL) AS is_active
    FROM minion_workers worker
    LEFT JOIN minion_jobs job
        ON worker.id = job.worker
        AND job.state = 'active'
    GROUP BY worker.id
  };
  for my $worker ( @{ $db->query( $worker_sql )->hashes } ) {
    if ( $worker->{is_active} ) { $stats{active_workers}++ }
    else                        { $stats{inactive_workers}++ }
  }

  # Job statistics: one row per job state
  my $job_sql = q{
    SELECT
        minion_jobs.state,
        COUNT(minion_jobs.state) AS jobs,
        SUM(minion_jobs.`delayed` > NOW()) AS `delayed`
    FROM minion_jobs
    GROUP BY minion_jobs.state
  };
  for my $group ( @{ $db->query( $job_sql )->hashes } ) {
    my $state = $group->{state};
    $stats{ "${state}_jobs" } += $group->{jobs};
    $stats{ enqueued_jobs } += $group->{jobs};

    # Only jobs that have not been started can still be delayed
    $stats{ delayed_jobs } += $group->{delayed} if $state eq 'inactive';
  }

  my $lock_row   = $db->query("SELECT COUNT(*) FROM minion_locks WHERE expires > now()")->array;
  my $uptime_row = $db->query( "SHOW GLOBAL STATUS LIKE 'Uptime'" )->hash;
  $stats{active_locks} = $lock_row->[0];
  $stats{uptime}       = $uptime_row->{Value};

  return \%stats;
}

# Delete the row of a worker from the worker registry.
#
#   $id - worker ID
sub unregister_worker {
  my ($self, $id) = @_;
  return $self->mysql->db->query('delete from minion_workers where id = ?', $id);
}

# Try to claim one runnable job for a worker without waiting.
#
#   $worker_id - ID of the worker the job is claimed for
#   $options   - hash reference:
#                  queues - array reference of queue names (default
#                           ['default'])
#                  id     - only consider the job with this ID
#
# Only jobs for tasks registered with the Minion object are considered. The
# candidate with the highest priority (oldest first) is selected and then
# claimed with a guarded UPDATE. If another worker claims it in between, the
# selection is repeated, at most $DEQUEUE_RACE_ATTEMPTS times.
#
# Returns a hash reference with the keys id, args (decoded), retries and
# task, or nothing/undef if no job could be claimed.
sub _try {
  my ($self, $worker_id, $options) = @_;

  my $tasks = [keys %{$self->minion->tasks}];

  return unless @$tasks;

  my $queues = $options->{queues} // ['default'];

  my $qq = join ", ", map({ "?" } @$queues);
  my $qt = join ", ", map({ "?" } @$tasks );
  my @bind_vars = ( @$queues, @$tasks );

  my $sql_job_id = '';
  if ( defined $options->{id} ) {
    $sql_job_id = 'AND job.id = ?';
    push @bind_vars, $options->{id};
  }

  # The dependencies table stores a copy of the parent job's state and
  # expiration (updated automatically via triggers). We use this to roll
  # up a count of pending and failed parent jobs for the current child
  # job. If there are no pending jobs, or if the current child job is
  # "lax" and there are only failed jobs, the child job is ready.
  #
  # An expired job (inactive and past its expiration time) is considered
  # "failed" for this check. The comparison has to be "expires <= NOW()": with
  # "expires > NOW()" every parent that merely has a future expiration time
  # is counted as failed, and a lax child is released while that parent is
  # still waiting or running. Expiration only applies to inactive jobs, so
  # the state is checked as well.
  my $sql = qq{
    SELECT job.id, job.args, job.retries, job.task
    FROM minion_jobs job
    LEFT JOIN (
        SELECT depends.child_id, COUNT(depends.child_id) AS pending,
            COALESCE( SUM(depends.state = 'failed' OR (depends.state = 'inactive' AND depends.expires IS NOT NULL AND depends.expires <= NOW())), 0 ) AS failed
        FROM minion_jobs_depends depends
        WHERE depends.state IS NOT NULL AND (
            depends.state = 'active'
            OR ( depends.state = 'failed' )
            OR ( depends.state = 'inactive' AND (depends.expires IS NULL OR depends.expires > NOW()))
        )
        GROUP BY depends.child_id
    ) depends ON depends.child_id = job.id
    WHERE job.state = 'inactive'
      AND job.`delayed` <= NOW()
      AND job.queue IN ($qq) AND job.task IN ($qt)
      $sql_job_id
      AND (job.expires IS NULL OR job.expires > NOW())
      AND (
        depends.pending IS NULL
        OR ( depends.pending = depends.failed AND job.lax )
      )
    ORDER BY job.priority DESC, job.created
    LIMIT 1
  };

  warn "Dequeuing SQL: $sql" if DEBUG;
  my $db = $self->mysql->db;
  my $i = $DEQUEUE_RACE_ATTEMPTS;
  my $job;
  while ( $i-- > 0 ) {
    # Find a candidate job to run
    my $res = $db->query( $sql, @bind_vars )->hash;
    return unless $res; # No jobs ready to work on

    # Try to claim the job for this worker. There is a race condition
    # between selecting the job and claiming it, so we need to make sure
    # we were the one to claim it.
    my $claimed = $db->query(
      qq{ UPDATE minion_jobs SET started = NOW(), state = 'active', worker = ? WHERE id = ? AND state = 'inactive' },
      $worker_id, $res->{id},
    )->affected_rows;

    # Try again if we lose the race
    next if !$claimed;

    # Won the race, so do the job
    $job = $res;
    last;
  }

  # Every attempt lost the race
  return undef unless $job;
  $job->{args} = $job->{args} ? decode_json($job->{args}) : undef;
  return $job;
}

# Move an active job to its final state and store its result.
#
#   $fail    - true to mark the job as failed, false to mark it as finished
#   $id      - job ID
#   $retries - number of retries the caller believes the job has; the update
#              only applies if this matches the stored value
#   $result  - result value, stored as JSON
#
# Returns undef if no row was updated (the job is not active, or the ID or
# the retry count does not match). Returns 1 for a finished job. For a failed
# job the return value of auto_retry_job() is returned.
sub _update {
  my ($self, $fail, $id, $retries, $result) = @_;

  my $state   = $fail ? 'failed' : 'finished';
  my $updated = $self->mysql->db->query(
    "update minion_jobs
     set finished = now(), result = ?, state = ?
     where id = ? and retries = ? and state = 'active'",
     encode_json($result), $state, $id, $retries
  )->affected_rows;
  return undef unless $updated;

  return 1 if !$fail;    # finished

  # A failed job may be retried, depending on its configured attempts
  my $job = $self->list_jobs( 0, 1, { ids => [$id] } )->{jobs}[0];
  return $self->auto_retry_job($id, $retries, $job->{attempts});
}

# Put a remote control command into the inbox of one or more workers.
#
#   $command - command name
#   $args    - array reference of command arguments (defaults to [])
#   $ids     - array reference of worker IDs (defaults to []); when empty,
#              the command is sent to every registered worker
#
# The message is stored as the JSON array [ $command, @$args ].
#
# Returns the number of inbox rows that were inserted.
sub broadcast {
  my ($self, $command, $args, $ids) = (shift, shift, shift || [], shift || []);

  my $db = $self->mysql->db;

  my $message = encode_json( [ $command, @$args ] );

  # Work on a private list: the looked-up worker IDs must not be written
  # into an (empty) array reference that belongs to the caller
  my @worker_ids = @$ids;
  if ( !@worker_ids ) {
    @worker_ids = map { $_->{id} }
      @{ $db->query( 'SELECT id FROM minion_workers' )->hashes };
  }

  my $rows = 0;
  for my $id ( @worker_ids ) {
    $rows += $db->query(
      'INSERT INTO minion_workers_inbox ( worker_id, message ) VALUES ( ?, ? )',
      $id, $message,
    )->rows;
  }
  return $rows;
}

# Fetch and remove all remote control commands waiting for a worker.
#
#   $worker_id - ID of the worker whose inbox is read
#
# Returns an array reference of decoded messages; it is empty when the inbox
# holds nothing for this worker.
sub receive {
  my ($self, $worker_id) = @_;

  my $db    = $self->mysql->db;
  my $inbox = $db->query(
    'SELECT id, message FROM minion_workers_inbox WHERE worker_id=?', $worker_id,
  )->hashes;
  return [] unless $inbox && @$inbox;

  # Delete exactly the rows that were read, so a message that arrives in the
  # meantime stays in the inbox
  my @message_ids  = map { $_->{id} } @$inbox;
  my $placeholders = join ", ", ( '?' ) x @message_ids;
  $db->query(
    'DELETE FROM minion_workers_inbox WHERE id IN (' . $placeholders . ')',
    @message_ids,
  );

  my @messages = map { decode_json( $_->{message} ) } @$inbox;
  return \@messages;
}

1;

#pod =encoding utf8
#pod
#pod =head1 NAME
#pod
#pod Minion::Backend::mysql - MySQL backend
#pod
#pod =head1 SYNOPSIS
#pod
#pod   use Mojolicious::Lite;
#pod
#pod   plugin Minion => {mysql => 'mysql://user@127.0.0.1/minion_jobs'};
#pod
#pod   # Slow task
#pod   app->minion->add_task(poke_mojo => sub {
#pod     my $job = shift;
#pod     $job->app->ua->get('mojolicio.us');
#pod     $job->app->log->debug('We have poked mojolicio.us for a visitor');
#pod   });
#pod
#pod   # Perform job in a background worker process
#pod   get '/' => sub {
#pod     my $c = shift;
#pod     $c->minion->enqueue('poke_mojo');
#pod     $c->render(text => 'We will poke mojolicio.us for you soon.');
#pod   };
#pod
#pod   app->start;
#pod
#pod =head1 DESCRIPTION
#pod
#pod L<Minion::Backend::mysql> is a backend for L<Minion> based on L<Mojo::mysql>. All
#pod necessary tables will be created automatically with a set of migrations named
#pod C<minion>. This backend requires at least v5.6.5 of MySQL.
#pod
#pod =head1 ATTRIBUTES
#pod
#pod L<Minion::Backend::mysql> inherits all attributes from L<Minion::Backend> and
#pod implements the following new ones.
#pod
#pod =head2 mysql
#pod
#pod   my $mysql   = $backend->mysql;
#pod   $backend = $backend->mysql(Mojo::mysql->new);
#pod
#pod L<Mojo::mysql> object used to store all data.
#pod
#pod =head2 no_txn
#pod
#pod If true, will not make a transaction around the L</enqueue> insertions
#pod when a job has parent jobs. Without a transaction, the job could be
#pod dequeued before its parent relationships are written to the database.
#pod However, since MySQL does not support nested transactions (despite
#pod supporting something almost exactly like them...), you can disable
#pod transactions for testing by setting this attribute (if you perform your
#pod tests in a transaction so they can be rolled back when the test is
#pod complete).
#pod
#pod =head1 METHODS
#pod
#pod L<Minion::Backend::mysql> inherits all methods from L<Minion::Backend> and
#pod implements the following new ones.
#pod
#pod =head2 dequeue
#pod
#pod   my $job_info = $backend->dequeue($worker_id, 0.5);
#pod   my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});
#pod
#pod Wait for job, dequeue it and transition from C<inactive> to C<active> state or
#pod return C<undef> if queues were empty.
#pod
#pod These options are currently available:
#pod
#pod =over 2
#pod
#pod =item queues
#pod
#pod   queues => ['important']
#pod
#pod One or more queues to dequeue jobs from, defaults to C<default>.
#pod
#pod =back
#pod
#pod These fields are currently available:
#pod
#pod =over 2
#pod
#pod =item args
#pod
#pod   args => ['foo', 'bar']
#pod
#pod Job arguments.
#pod
#pod =item id
#pod
#pod   id => '10023'
#pod
#pod Job ID.
#pod
#pod =item retries
#pod
#pod   retries => 3
#pod
#pod Number of times job has been retried.
#pod
#pod =item task
#pod
#pod   task => 'foo'
#pod
#pod Task name.
#pod
#pod =back
#pod
#pod =head2 enqueue
#pod
#pod   my $job_id = $backend->enqueue('foo');
#pod   my $job_id = $backend->enqueue(foo => [@args]);
#pod   my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});
#pod
#pod Enqueue a new job with C<inactive> state.
#pod
#pod These options are currently available:
#pod
#pod =over 2
#pod
#pod =item delay
#pod
#pod   delay => 10
#pod
#pod Delay job for this many seconds (from now).
#pod
#pod =item priority
#pod
#pod   priority => 5
#pod
#pod Job priority, defaults to C<0>.
#pod
#pod =item queue
#pod
#pod   queue => 'important'
#pod
#pod Queue to put job in, defaults to C<default>.
#pod
#pod =back
#pod
#pod =head2 fail_job
#pod
#pod   my $bool = $backend->fail_job($job_id, $retries);
#pod   my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
#pod   my $bool = $backend->fail_job(
#pod     $job_id, $retries, {msg => 'Something went wrong!'});
#pod
#pod Transition from C<active> to C<failed> state.
#pod
#pod =head2 finish_job
#pod
#pod   my $bool = $backend->finish_job($job_id, $retries);
#pod   my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
#pod   my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});
#pod
#pod Transition from C<active> to C<finished> state.
#pod
#pod =head2 job_info
#pod
#pod   my $job_info = $backend->job_info($job_id);
#pod
#pod Get information about a job or return C<undef> if job does not exist.
#pod
#pod   # Check job state
#pod   my $state = $backend->job_info($job_id)->{state};
#pod
#pod   # Get job result
#pod   my $result = $backend->job_info($job_id)->{result};
#pod
#pod These fields are currently available:
#pod
#pod =over 2
#pod
#pod =item args
#pod
#pod   args => ['foo', 'bar']
#pod
#pod Job arguments.
#pod
#pod =item created
#pod
#pod   created => 784111777
#pod
#pod Time job was created.
#pod
#pod =item delayed
#pod
#pod   delayed => 784111777
#pod
#pod Time job was delayed to.
#pod
#pod =item finished
#pod
#pod   finished => 784111777
#pod
#pod Time job was finished.
#pod
#pod =item priority
#pod
#pod   priority => 3
#pod
#pod Job priority.
#pod
#pod =item queue
#pod
#pod   queue => 'important'
#pod
#pod Queue name.
#pod
#pod =item result
#pod
#pod   result => 'All went well!'
#pod
#pod Job result.
#pod
#pod =item retried
#pod
#pod   retried => 784111777
#pod
#pod Time job has been retried.
#pod
#pod =item retries
#pod
#pod   retries => 3
#pod
#pod Number of times job has been retried.
#pod
#pod =item started
#pod
#pod   started => 784111777
#pod
#pod Time job was started.
#pod
#pod =item state
#pod
#pod   state => 'inactive'
#pod
#pod Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>.
#pod
#pod =item task
#pod
#pod   task => 'foo'
#pod
#pod Task name.
#pod
#pod =item worker
#pod
#pod   worker => '154'
#pod
#pod Id of worker that is processing the job.
#pod
#pod =back
#pod
#pod =head2 list_jobs
#pod
#pod   my $batch = $backend->list_jobs($offset, $limit);
#pod   my $batch = $backend->list_jobs($offset, $limit, {states => ['inactive']});
#pod
#pod Returns the same information as L</"job_info"> but in batches.
#pod
#pod These options are currently available:
#pod
#pod =over 2
#pod
#pod =item states
#pod
#pod   states => ['inactive']
#pod
#pod List only jobs in these states.
#pod
#pod =item tasks
#pod
#pod   tasks => ['test']
#pod
#pod List only jobs for these tasks.
#pod
#pod =back
#pod
#pod =head2 list_workers
#pod
#pod   my $batch = $backend->list_workers($offset, $limit);
#pod
#pod Returns the same information as L</"worker_info"> but in batches.
#pod
#pod =head2 new
#pod
#pod   my $backend = Minion::Backend::mysql->new('mysql://mysql@/test');
#pod
#pod Construct a new L<Minion::Backend::mysql> object.
#pod
#pod =head2 register_worker
#pod
#pod   my $worker_id = $backend->register_worker;
#pod   my $worker_id = $backend->register_worker($worker_id);
#pod
#pod Register worker or send heartbeat to show that this worker is still alive.
#pod
#pod =head2 remove_job
#pod
#pod   my $bool = $backend->remove_job($job_id);
#pod
#pod Remove C<failed>, C<finished> or C<inactive> job from queue.
#pod
#pod =head2 repair
#pod
#pod   $backend->repair;
#pod
#pod Repair worker registry and job queue if necessary.
#pod
#pod =head2 reset
#pod
#pod   $backend->reset;
#pod
#pod Reset job queue.
#pod
#pod =head2 retry_job
#pod
#pod   my $bool = $backend->retry_job($job_id, $retries);
#pod   my $bool = $backend->retry_job($job_id, $retries, {delay => 10});
#pod
#pod Transition from C<failed> or C<finished> state back to C<inactive>.
#pod
#pod These options are currently available:
#pod
#pod =over 2
#pod
#pod =item delay
#pod
#pod   delay => 10
#pod
#pod Delay job for this many seconds (from now).
#pod
#pod =item parents
#pod
#pod   parents => [$id1, $id2, $id3]
#pod
#pod Jobs this job depends on.
#pod
#pod =item priority
#pod
#pod   priority => 5
#pod
#pod Job priority.
#pod
#pod =item queue
#pod
#pod   queue => 'important'
#pod
#pod Queue to put job in.
#pod
#pod =back
#pod
#pod =head2 stats
#pod
#pod   my $stats = $backend->stats;
#pod
#pod Get statistics for jobs and workers.
#pod
#pod =head2 unregister_worker
#pod
#pod   $backend->unregister_worker($worker_id);
#pod
#pod Unregister worker.
#pod
#pod =head2 worker_info
#pod
#pod   my $worker_info = $backend->worker_info($worker_id);
#pod
#pod Get information about a worker or return C<undef> if worker does not exist.
#pod
#pod   # Check worker host
#pod   my $host = $backend->worker_info($worker_id)->{host};
#pod
#pod These fields are currently available:
#pod
#pod =over 2
#pod
#pod =item host
#pod
#pod   host => 'localhost'
#pod
#pod Worker host.
#pod
#pod =item jobs
#pod
#pod   jobs => ['10023', '10024', '10025', '10029']
#pod
#pod Ids of jobs the worker is currently processing.
#pod
#pod =item notified
#pod
#pod   notified => 784111777
#pod
#pod Last time worker sent a heartbeat.
#pod
#pod =item pid
#pod
#pod   pid => 12345
#pod
#pod Process id of worker.
#pod
#pod =item started
#pod
#pod   started => 784111777
#pod
#pod Time worker was started.
#pod
#pod =back
#pod
#pod =head1 ERRORS
#pod
#pod =head2 DBD::mysql::st execute failed: Table '*.minion_workers' doesn't exist
#pod
#pod This may happen when the SQL create/upgrade scripts fail to run
#pod completely due to permission errors. Re-running with the environment
#pod variable C<MOJO_MIGRATIONS_DEBUG=1> should produce the error message
#pod returned by the database.
#pod
#pod A common reason for the database install to fail on MySQL >= 8 is that
#pod the user installing the database does not have C<SUPER> privileges
#pod needed to create functions when binlogs are enabled: (C<DBD::mysql::st
#pod execute failed: You do not have the SUPER privilege and binary logging
#pod is enabled>). See L<the MySQL documentation for Stored Program Binary
#pod Logging|https://dev.mysql.com/doc/refman/8.0/en/stored-programs-logging.html>
#pod for more information about this problem and how to correct it.
#pod
#pod =head1 SEE ALSO
#pod
#pod L<Minion>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
#pod
#pod =cut

=pod

=encoding UTF-8

=head1 NAME

Minion::Backend::mysql - MySQL backend

=head1 VERSION

version 1.001

=head1 SYNOPSIS

  use Mojolicious::Lite;

  plugin Minion => {mysql => 'mysql://user@127.0.0.1/minion_jobs'};

  # Slow task
  app->minion->add_task(poke_mojo => sub {
    my $job = shift;
    $job->app->ua->get('mojolicio.us');
    $job->app->log->debug('We have poked mojolicio.us for a visitor');
  });

  # Perform job in a background worker process
  get '/' => sub {
    my $c = shift;
    $c->minion->enqueue('poke_mojo');
    $c->render(text => 'We will poke mojolicio.us for you soon.');
  };

  app->start;

=head1 DESCRIPTION

L<Minion::Backend::mysql> is a backend for L<Minion> based on L<Mojo::mysql>. All
necessary tables will be created automatically with a set of migrations named
C<minion>. This backend requires at least v5.6.5 of MySQL.

=head1 NAME

Minion::Backend::mysql - MySQL backend

=head1 ATTRIBUTES

L<Minion::Backend::mysql> inherits all attributes from L<Minion::Backend> and
implements the following new ones.

=head2 mysql

  my $mysql   = $backend->mysql;
  $backend = $backend->mysql(Mojo::mysql->new);

L<Mojo::mysql> object used to store all data.

=head2 no_txn

If true, L</enqueue> will not wrap its insertions in a transaction when a
job has parent jobs. Without a transaction, the job could be dequeued
before its parent relationships are written to the database. However,
MySQL does not support nested transactions (despite supporting something
almost exactly like them...), so if you run your tests inside a
transaction that is rolled back when the test is complete, set this
attribute to disable the transaction made by this backend.

=head1 METHODS

L<Minion::Backend::mysql> inherits all methods from L<Minion::Backend> and
implements the following new ones.

=head2 dequeue

  my $job_info = $backend->dequeue($worker_id, 0.5);
  my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});

Wait for job, dequeue it and transition from C<inactive> to C<active> state or
return C<undef> if queues were empty.

These options are currently available:

=over 2

=item queues

  queues => ['important']

One or more queues to dequeue jobs from, defaults to C<default>.

=back

These fields are currently available:

=over 2

=item args

  args => ['foo', 'bar']

Job arguments.

=item id

  id => '10023'

Job ID.

=item retries

  retries => 3

Number of times job has been retried.

=item task

  task => 'foo'

Task name.

=back

=head2 enqueue

  my $job_id = $backend->enqueue('foo');
  my $job_id = $backend->enqueue(foo => [@args]);
  my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});

Enqueue a new job with C<inactive> state.

These options are currently available:

=over 2

=item delay

  delay => 10

Delay job for this many seconds (from now).

=item priority

  priority => 5

Job priority, defaults to C<0>.

=item queue

  queue => 'important'

Queue to put job in, defaults to C<default>.

=back

=head2 fail_job

  my $bool = $backend->fail_job($job_id, $retries);
  my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
  my $bool = $backend->fail_job(
    $job_id, $retries, {msg => 'Something went wrong!'});

Transition from C<active> to C<failed> state.

=head2 finish_job

  my $bool = $backend->finish_job($job_id, $retries);
  my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
  my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});

Transition from C<active> to C<finished> state.

=head2 job_info

  my $job_info = $backend->job_info($job_id);

Get information about a job or return C<undef> if job does not exist.

  # Check job state
  my $state = $backend->job_info($job_id)->{state};

  # Get job result
  my $result = $backend->job_info($job_id)->{result};

These fields are currently available:

=over 2

=item args

  args => ['foo', 'bar']

Job arguments.

=item created

  created => 784111777

Time job was created.

=item delayed

  delayed => 784111777

Time job was delayed to.

=item finished

  finished => 784111777

Time job was finished.

=item priority

  priority => 3

Job priority.

=item queue

  queue => 'important'

Queue name.

=item result

  result => 'All went well!'

Job result.

=item retried

  retried => 784111777

Time job has been retried.

=item retries

  retries => 3

Number of times job has been retried.

=item started

  started => 784111777

Time job was started.

=item state

  state => 'inactive'

Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>.

=item task

  task => 'foo'

Task name.

=item worker

  worker => '154'

Id of worker that is processing the job.

=back

=head2 list_jobs

  my $batch = $backend->list_jobs($offset, $limit);
  my $batch = $backend->list_jobs($offset, $limit, {states => 'inactive'});

Returns the same information as L</"job_info"> but in batches.

These options are currently available:

=over 2

=item state

  state => 'inactive'

List only jobs in this state.

=item task

  task => 'test'

List only jobs for this task.

=back

=head2 list_workers

  my $batch = $backend->list_workers($offset, $limit);

Returns the same information as L</"worker_info"> but in batches.

=head2 new

  my $backend = Minion::Backend::mysql->new('mysql://mysql@/test');

Construct a new L<Minion::Backend::mysql> object.

=head2 register_worker

  my $worker_id = $backend->register_worker;
  my $worker_id = $backend->register_worker($worker_id);

Register worker or send heartbeat to show that this worker is still alive.

=head2 remove_job

  my $bool = $backend->remove_job($job_id);

Remove C<failed>, C<finished> or C<inactive> job from queue.

=head2 repair

  $backend->repair;

Repair worker registry and job queue if necessary.

=head2 reset

  $backend->reset;

Reset job queue.

=head2 retry_job

  my $bool = $backend->retry_job($job_id, $retries);
  my $bool = $backend->retry_job($job_id, $retries, {delay => 10});

Transition from C<failed> or C<finished> state back to C<inactive>.

These options are currently available:

=over 2

=item delay

  delay => 10

Delay job for this many seconds (from now).

=item parents

  parents => [$id1, $id2, $id3]

Jobs this job depends on.

=item priority

  priority => 5

Job priority.

=item queue

  queue => 'important'

Queue to put job in.

=back

=head2 stats

  my $stats = $backend->stats;

Get statistics for jobs and workers.

=head2 unregister_worker

  $backend->unregister_worker($worker_id);

Unregister worker.

=head2 worker_info

  my $worker_info = $backend->worker_info($worker_id);

Get information about a worker or return C<undef> if worker does not exist.

  # Check worker host
  my $host = $backend->worker_info($worker_id)->{host};

These fields are currently available:

=over 2

=item host

  host => 'localhost'

Worker host.

=item jobs

  jobs => ['10023', '10024', '10025', '10029']

Ids of jobs the worker is currently processing.

=item notified

  notified => 784111777

Last time worker sent a heartbeat.

=item pid

  pid => 12345

Process id of worker.

=item started

  started => 784111777

Time worker was started.

=back

=head1 ERRORS

=head2 DBD::mysql::st execute failed: Table '*.minion_workers' doesn't exist

This may happen when the SQL create/upgrade scripts fail to run
completely due to permission errors. Re-running with the environment
variable C<MOJO_MIGRATIONS_DEBUG=1> should produce the error message
returned by the database.

A common reason for the database install to fail on MySQL >= 8 is that
the user installing the database does not have C<SUPER> privileges
needed to create functions when binlogs are enabled: (C<DBD::mysql::st
execute failed: You do not have the SUPER privilege and binary logging
is enabled>). See L<the MySQL documentation for Stored Program Binary
Logging|https://dev.mysql.com/doc/refman/8.0/en/stored-programs-logging.html>
for more information about this problem and how to correct it.

=head1 SEE ALSO

L<Minion>, L<Mojolicious::Guides>, L<https://mojolicious.org>.

=head1 AUTHORS

=over 4

=item *

Brian Medley <bpmedley@cpan.org>

=item *

Doug Bell <preaction@cpan.org>

=back

=head1 CONTRIBUTORS

=for stopwords Alexander Nalobin Dmitry Krylov Hu Yin Jason A. Crome Larry Leszczynski Olaf Alders Paul Cochrane Sergey Andreev Zoffix Znet

=over 4

=item *

Alexander Nalobin <nalobin@reg.ru>

=item *

Dmitry Krylov <pentabion@gmail.com>

=item *

Hu Yin <huyin8@gmail.com>

=item *

Jason A. Crome <jcrome@empoweredbenefits.com>

=item *

Larry Leszczynski <larryl@cpan.org>

=item *

Olaf Alders <olaf@wundersolutions.com>

=item *

Paul Cochrane <paul@liekut.de>

=item *

Sergey Andreev <40195653+saintserge@users.noreply.github.com>

=item *

Zoffix Znet <cpan@zoffix.com>

=back

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2021 by Doug Bell and Brian Medley.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut

__DATA__

@@ minion
-- 13 up
-- Job queue table, one row per job. Most columns correspond to the fields
-- documented for job_info in the POD of this file (args, created, delayed,
-- finished, priority, queue, result, retried, retries, started, state, task,
-- worker).
-- NOTE(review): attempts, expires and lax are not described in that POD -
-- confirm their meaning against the Perl code that reads them.
CREATE TABLE IF NOT EXISTS minion_jobs (
  -- Declared NOT NULL and UNIQUE rather than PRIMARY KEY. The foreign keys in
  -- minion_jobs_depends and minion_notes reference this column.
  `id`       BIGINT NOT NULL AUTO_INCREMENT UNIQUE,
  -- presumably JSON-encoded by the backend - verify against enqueue
  `args`     MEDIUMBLOB NOT NULL,
  -- Two TIMESTAMP columns that both default to CURRENT_TIMESTAMP need MySQL
  -- 5.6.5 or newer, the minimum version stated in the DESCRIPTION section.
  `created`  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `delayed`  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `finished` TIMESTAMP NULL,
  `priority` INT NOT NULL,
  `result`   MEDIUMBLOB,
  `attempts` INT NOT NULL DEFAULT 1,
  `retried`  TIMESTAMP NULL,
  `retries`  INT NOT NULL DEFAULT 0,
  `started`  TIMESTAMP NULL,
  -- Copied into minion_jobs_depends.expires by the two triggers.
  `expires`  DATETIME,
  -- Same four values as minion_jobs_depends.state, which the triggers fill
  -- from this column.
  `state`    ENUM('inactive','active','finished','failed') NOT NULL DEFAULT 'inactive',
  `lax`      BOOLEAN NOT NULL DEFAULT FALSE,
  `task`     VARCHAR(50) NOT NULL,
  `queue`    VARCHAR(128) NOT NULL DEFAULT 'default',
  -- Nullable and without a foreign key to minion_workers.
  `worker`   BIGINT
);

-- Parent/child dependency edges between jobs, one row per (parent, child)
-- pair. state and expires are copies of the parent job values: they are set
-- by minion_trigger_insert_depends when an edge is inserted and refreshed by
-- minion_trigger_update_jobs whenever the parent row in minion_jobs changes.
CREATE TABLE IF NOT EXISTS minion_jobs_depends (
  -- No foreign key on parent_id, so an edge may name a parent that is not in
  -- minion_jobs. The insert trigger records such a parent as finished.
  parent_id BIGINT NOT NULL,
  child_id BIGINT NOT NULL,
  state ENUM('inactive','active','finished','failed') NOT NULL DEFAULT 'inactive',
  expires DATETIME,
  -- Deleting the child job removes its dependency edges.
  FOREIGN KEY (child_id) REFERENCES minion_jobs(id) ON DELETE CASCADE,
  PRIMARY KEY (parent_id, child_id)
);
-- Triggers that keep the copied parent state in minion_jobs_depends current.
--
-- The tables in this migration are created with IF NOT EXISTS, but
-- CREATE TRIGGER IF NOT EXISTS is not available on every MySQL version this
-- backend supports. Each trigger is therefore dropped first so the migration
-- can be run again after it stopped part way through (see the ERRORS section
-- of the POD) instead of failing because the trigger already exists.

-- On insert of a dependency edge: copy state and expires from the parent job.
-- A parent that is not present in minion_jobs is recorded as finished.
DROP TRIGGER IF EXISTS minion_trigger_insert_depends;
CREATE TRIGGER minion_trigger_insert_depends BEFORE INSERT ON minion_jobs_depends FOR EACH ROW
  SET NEW.state = COALESCE(( SELECT state FROM minion_jobs WHERE id=NEW.parent_id ), 'finished'),
    NEW.expires = ( SELECT expires FROM minion_jobs WHERE id=NEW.parent_id );
-- On every update of a job: push its new state and expires to all edges in
-- which that job is the parent.
DROP TRIGGER IF EXISTS minion_trigger_update_jobs;
CREATE TRIGGER minion_trigger_update_jobs AFTER UPDATE ON minion_jobs FOR EACH ROW
  UPDATE minion_jobs_depends SET state=NEW.state, expires=NEW.expires WHERE parent_id=OLD.id;

-- Key/value notes attached to a job, at most one value per (job_id, note_key).
CREATE TABLE IF NOT EXISTS minion_notes (
  job_id BIGINT NOT NULL,
  -- 191 characters is the length the comment in minion_locks gives as the
  -- utf8mb4 limit for an indexed column. note_key is part of the primary key.
  note_key VARCHAR(191) NOT NULL,
  -- NOTE(review): value encoding is not visible here - confirm in the Perl code
  note_value TEXT,
  PRIMARY KEY (job_id, note_key),
  -- Notes are removed together with their job.
  FOREIGN KEY (job_id) REFERENCES minion_jobs(id) ON DELETE CASCADE
);

-- Worker registry, one row per registered worker. host, pid, started and
-- notified correspond to the fields documented for worker_info in the POD of
-- this file (notified is the time of the last heartbeat).
CREATE TABLE IF NOT EXISTS minion_workers (
  `id`      BIGINT AUTO_INCREMENT PRIMARY KEY,
  `host`    TEXT NOT NULL,
  `pid`     INT NOT NULL,
  `started` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `notified` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  -- NOTE(review): not described in the POD, presumably serialized worker
  -- status - confirm against register_worker
  `status` MEDIUMBLOB
);

-- Messages addressed to a worker, one row per message.
-- NOTE(review): the message format and who reads or deletes rows are not
-- visible in this part of the file - confirm in the Perl code.
CREATE TABLE IF NOT EXISTS minion_workers_inbox (
  `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
  `worker_id` BIGINT NOT NULL,
  `message` BLOB NOT NULL,
  -- Pending messages are removed together with the worker row.
  FOREIGN KEY (worker_id) REFERENCES minion_workers(id) ON DELETE CASCADE
);

-- Storage for the minion_lock function defined right after this table: one
-- row per lock that was granted, with its name and expiry time. name is not
-- unique, so several rows may share the same lock name.
CREATE TABLE IF NOT EXISTS minion_locks (
  id      SERIAL NOT NULL PRIMARY KEY,
  -- InnoDB index prefix limit is 767 bytes, and if you're using utf8mb4
  -- that makes 767/4 = 191 characters
  name    VARCHAR(191) NOT NULL,
  expires TIMESTAMP NOT NULL,
  -- Covers both lookups made by minion_lock: counting rows by name and
  -- comparing expires.
  INDEX (name, expires)
);
DELIMITER //
-- minion_lock(name, seconds, limit) tries to take one more lock called name.
--   $1  lock name, matched against minion_locks.name
--   $2  lifetime in seconds, added to NOW() to get the expiry time
--   $3  number of existing rows with that name at which the call is refused
-- Returns FALSE when $3 or more unexpired rows with that name exist and TRUE
-- otherwise.
-- SQL SECURITY INVOKER makes the body run with the privileges of the caller.
-- The ERRORS section of the POD describes the privilege problem that creating
-- this function can hit when binary logging is enabled.
CREATE FUNCTION minion_lock( $1 VARCHAR(191), $2 INTEGER, $3 INTEGER) RETURNS BOOL
  NOT DETERMINISTIC MODIFIES SQL DATA SQL SECURITY INVOKER
BEGIN
  -- NOTE(review): the 1* in front of $2 looks like a way to make INTERVAL
  -- accept the parameter as an expression - confirm before simplifying.
  DECLARE new_expires TIMESTAMP DEFAULT DATE_ADD( NOW(), INTERVAL 1*$2 SECOND );
  -- Purge expired locks of every name before counting.
  DELETE FROM minion_locks WHERE expires < NOW();
  -- NOTE(review): the count and the insert below are separate statements.
  -- Whether two concurrent callers can both pass this check depends on how the
  -- caller wraps the call - confirm.
  IF (SELECT COUNT(*) >= $3 FROM minion_locks WHERE name = $1)
  THEN
    RETURN FALSE;
  END IF;
  -- A zero or negative $2 gives an expiry that is not in the future. In that
  -- case nothing is stored and the call only reports that the limit has not
  -- been reached.
  IF new_expires > NOW()
  THEN
    INSERT INTO minion_locks (name, expires) VALUES ($1, new_expires);
  END IF;
  RETURN TRUE;
END
//
DELIMITER ;

-- Secondary indexes.
-- NOTE(review): presumably minion_jobs_state_idx serves the dequeue query and
-- minion_jobs_stats_idx the stats query - confirm against the SQL built in
-- the Perl code. MySQL releases before 8.0 parse the DESC keyword in an index
-- definition but ignore it.
CREATE INDEX minion_jobs_state_idx ON minion_jobs (state, priority DESC, created);
-- Matches the two columns of minion_jobs_depends that the triggers maintain.
CREATE INDEX minion_jobs_depends_state_expires ON minion_jobs_depends (state, expires);
CREATE INDEX minion_jobs_stats_idx ON minion_jobs (state, `delayed`);

-- 13 down
-- Remove everything created by the matching up migration. Every DROP uses
-- IF EXISTS, like the table drops always did, so the rollback still completes
-- when an object is already gone (for example a trigger that disappeared
-- together with its table, or a function that was never created).
DROP TRIGGER IF EXISTS minion_trigger_insert_depends;
DROP TRIGGER IF EXISTS minion_trigger_update_jobs;
-- Tables holding foreign keys are dropped before the tables they reference.
DROP TABLE IF EXISTS minion_locks;
DROP TABLE IF EXISTS minion_workers_inbox;
DROP TABLE IF EXISTS minion_workers;
DROP TABLE IF EXISTS minion_notes;
DROP TABLE IF EXISTS minion_jobs_depends;
DROP TABLE IF EXISTS minion_jobs;
DROP FUNCTION IF EXISTS minion_lock;