/* most win32 perls are beyond fixing, requiring dTHX */
/* even for ISO-C functions such as malloc. avoid! avoid! avoid! */
/* and fail to define numerous symbols, but still overrwide them */
/* with non-working versions (e.g. setjmp). */
#ifdef _WIN32
/*# define PERL_CORE 1 fixes some, breaks others */
#else
# define PERL_NO_GET_CONTEXT
#endif

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#define X_STACKSIZE 1024 * sizeof (void *)

#include "CoroAPI.h"
#include "perlmulticore.h"
#include "schmorp.h"
#include "xthread.h"

#ifdef _WIN32
  #ifndef sigset_t
    #define sigset_t int
  #endif
#endif

#ifndef SvREFCNT_dec_NN
  #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#ifndef SvREFCNT_dec_simple_void_NN
  #define SvREFCNT_dec_simple_void_NN(sv) SvREFCNT_dec (sv)
#endif

#ifndef SvREFCNT_inc_NN
  #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif

#ifndef RECURSION_CHECK
  #define RECURSION_CHECK 0
#endif

static X_TLS_DECLARE(current_key);
#if RECURSION_CHECK
static X_TLS_DECLARE(check_key);
#endif

static void
fatal (const char *msg)
{
  write (2, msg, strlen (msg));
  abort ();
}

static s_epipe ep;
static void *perl_thx;
static sigset_t cursigset, fullsigset;

static int global_enable = 0;
static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */

/* assigned to a thread for each release/acquire */
struct tctx
{
  void *coro;
  int wait_f;
  xcond_t acquire_c;
  int jeret;
};

static struct tctx *tctx_free;

static struct tctx *
tctx_get (void)
{
  struct tctx *ctx;

  if (!tctx_free)
    {
      ctx = malloc (sizeof (*tctx_free));
      X_COND_CREATE (ctx->acquire_c);
    }
  else
    {
      ctx = tctx_free;
      tctx_free = tctx_free->coro;
    }

  return ctx;
}

static void
tctx_put (struct tctx *ctx)
{
  ctx->coro = tctx_free;
  tctx_free = ctx;
}

/* a stack of tctxs */
struct tctxs
{
  struct tctx **ctxs;
  int cur, max;
};

static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  return ctxs->ctxs[--ctxs->cur];
}

static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
      ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}

static xmutex_t release_m = X_MUTEX_INIT;
static xcond_t release_c = X_COND_INIT;
static struct tctxs releasers;
static int idle;
static int min_idle = 1;
static int curthreads, max_threads = 1; /* protected by release_m */

static xmutex_t acquire_m = X_MUTEX_INIT;
static struct tctxs acquirers;

X_THREAD_PROC(thread_proc)
{
  PERL_SET_CONTEXT (perl_thx);

  {
    dTHXa (perl_thx);
    dJMPENV;
    struct tctx *ctx;
    int catchret;

    X_LOCK (release_m);

    for (;;)
      {
        while (!releasers.cur)
          if (idle <= min_idle || 1)
            X_COND_WAIT (release_c, release_m);
          else
            {
              struct timespec ts = { time (0) + idle - min_idle, 0 };

              if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
                if (idle > min_idle && !releasers.cur)
                  break;
            }

        ctx = tctxs_get (&releasers);
        --idle;
        X_UNLOCK (release_m);

        if (!ctx) /* timed out? */
          break;

        pthread_sigmask (SIG_SETMASK, &cursigset, 0);
        JMPENV_PUSH (ctx->jeret);

        if (!ctx->jeret)
          while (ctx->coro)
            CORO_SCHEDULE;

        JMPENV_POP;
        pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);

        X_LOCK (acquire_m);
        ctx->wait_f = 1;
        X_COND_SIGNAL (ctx->acquire_c);
        X_UNLOCK (acquire_m);

        X_LOCK (release_m);
        ++idle;
      }
  }
}

static void
start_thread (void)
{
  xthread_t tid;

  if (!curthreads)
    {
      X_UNLOCK (release_m);
      {
        dTHX;
        dSP;

        PUSHSTACKi (PERLSI_REQUIRE);

        eval_pv ("Coro::Multicore::init", 1);

        POPSTACK;
      }
      X_LOCK (release_m);
    }

  if (curthreads >= max_threads && 0)
    return;

  ++curthreads;
  ++idle;
  xthread_create (&tid, thread_proc, 0);
}

static void
pmapi_release (void)
{
  if (! ((thread_enable ? thread_enable : global_enable) & 1))
    {
      X_TLS_SET (current_key, 0);
      return;
    }

  #if RECURSION_CHECK
  if (X_TLS_GET (check_key))
    fatal ("FATAL: perlinterp_release () called without valid perl context");

  X_TLS_SET (check_key, &check_key);
  #endif

  struct tctx *ctx = tctx_get ();
  ctx->coro = SvREFCNT_inc_simple_NN (CORO_CURRENT);
  ctx->wait_f = 0;

  X_TLS_SET (current_key, ctx);
  pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);

  X_LOCK (release_m);

  if (idle <= min_idle)
    start_thread ();

  tctxs_put (&releasers, ctx);
  X_COND_SIGNAL (release_c);

  while (!idle && releasers.cur)
    {
      X_UNLOCK (release_m);
      X_LOCK (release_m);
    }

  X_UNLOCK (release_m);
}

static void
pmapi_acquire (void)
{
  int jeret;
  struct tctx *ctx = X_TLS_GET (current_key);

  if (!ctx)
    return;

  #if RECURSION_CHECK
  if (X_TLS_GET (check_key) != &check_key)
    fatal ("FATAL: perlinterp_acquire () called with valid perl context");

  X_TLS_SET (check_key, 0);
  #endif

  X_LOCK (acquire_m);

  tctxs_put (&acquirers, ctx);

  s_epipe_signal (&ep);
  while (!ctx->wait_f)
    X_COND_WAIT (ctx->acquire_c, acquire_m);
  X_UNLOCK (acquire_m);

  jeret = ctx->jeret;
  tctx_put (ctx);
  pthread_sigmask (SIG_SETMASK, &cursigset, 0);

  if (jeret)
    {
      dTHX;
      JMPENV_JUMP (jeret);
    }
}

static void
set_thread_enable (pTHX_ void *arg)
{
  thread_enable = PTR2IV (arg);
}

static void
atfork_child (void)
{
  s_epipe_renew (&ep);
}

MODULE = Coro::Multicore		PACKAGE = Coro::Multicore

PROTOTYPES: DISABLE

BOOT:
{
#ifndef _WIN32
	sigfillset (&fullsigset);
#endif

        X_TLS_INIT (current_key);
#if RECURSION_CHECK
        X_TLS_INIT (check_key);
#endif

        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        pthread_atfork (0, 0, atfork_child);

        perl_thx = PERL_GET_CONTEXT;

	I_CORO_API ("Coro::Multicore");

        if (0) { /*D*/
        X_LOCK (release_m);
        while (idle < min_idle)
          start_thread ();
        X_UNLOCK (release_m);
        }

        /* not perfectly efficient to do it this way, but it is simple */
	perl_multicore_init (); /* calls release */
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}

bool
enable (bool enable = NO_INIT)
	CODE:
        RETVAL = global_enable;
        if (items)
          global_enable = enable;
        OUTPUT:
        RETVAL

void
scoped_enable ()
	CODE:
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */

void
scoped_disable ()
	CODE:
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
        ENTER; /* see Guard.xs */

#if 0

U32
min_idle_threads (U32 min = NO_INIT)
	CODE:
        X_LOCK (acquire_m);
        RETVAL = min_idle;
        if (items)
	  min_idle = min;
        X_UNLOCK (acquire_m);
        OUTPUT:
        RETVAL

#endif

int
fd ()
	CODE:
        RETVAL = s_epipe_fd (&ep);
	OUTPUT:
        RETVAL

void
poll (...)
	CODE:
        s_epipe_drain (&ep);
	X_LOCK (acquire_m);
        while (acquirers.cur)
          {
            struct tctx *ctx = tctxs_get (&acquirers);
            CORO_READY ((SV *)ctx->coro);
            SvREFCNT_dec_simple_void_NN ((SV *)ctx->coro);
            ctx->coro = 0;
          }
	X_UNLOCK (acquire_m);

void
sleep (NV seconds)
	CODE:
        perlinterp_release ();
	{
          int nsec = seconds;
          if (nsec) sleep (nsec);
          nsec = (seconds - nsec) * 1e9;
          if (nsec) usleep (nsec);
        }
        perlinterp_acquire ();