package KinoSearch1::Util::SortExternal;
use strict;
use warnings;
use KinoSearch1::Util::ToolSet;
use base qw( KinoSearch1::Util::CClass );

BEGIN {
    __PACKAGE__->init_instance_vars(
        # constructor args
        invindex      => undef,
        seg_name      => undef,
        mem_threshold => 2**24,
    );
}
our %instance_vars;

sub new {
    my $class = shift;
    verify_args( \%instance_vars, @_ );
    my %args = ( %instance_vars, @_ );
    my $invindex = $args{invindex};

    $class = ref($class) || $class;

    my $filename = "$args{seg_name}.srt";
    $invindex->delete_file($filename) if $invindex->file_exists($filename);
    my $outstream = $invindex->open_outstream($filename);

    return _new( $class, $outstream,
        @args{qw( invindex seg_name mem_threshold )} );
}

# Prepare to start fetching sorted results.
sub sort_all {
    my $self = shift;

    # deal with any items in the cache right now
    if ( $self->_get_num_runs == 0 ) {
        # if we've never exceeded mem_threshold, sort in-memory
        $self->_sort_cache;
    }
    else {
        # create a run from whatever's in the cache right now
        $self->_sort_run;
    }

    # done adding elements, so close file and reopen as an instream
    $self->_get_outstream->close;
    my $filename = $self->_get_seg_name . ".srt";
    my $instream = $self->_get_invindex()->open_instream($filename);
    $self->_set_instream($instream);

    # allow fetching now that we're set up
    $self->_enable_fetch;
}

sub close { shift->_get_instream()->close }

1;

__END__

__XS__

MODULE = KinoSearch1    PACKAGE = KinoSearch1::Util::SortExternal

void
_new(class, outstream_sv, invindex_sv, seg_name_sv, mem_threshold)
    char         *class;
    SV           *outstream_sv;
    SV           *invindex_sv;
    SV           *seg_name_sv;
    I32           mem_threshold;
PREINIT:
    SortExternal *sortex;
PPCODE:
    sortex = Kino1_SortEx_new(outstream_sv, invindex_sv, seg_name_sv,
        mem_threshold);
    ST(0)  = sv_newmortal();
    sv_setref_pv( ST(0), class, (void*)sortex );
    XSRETURN(1);

=for comment

Add one or more items to the sort pool.

=cut

void
feed(sortex, ...)
    SortExternal *sortex;
PREINIT:
    I32      i;
PPCODE:
    for (i = 1; i < items; i++) {   
        SV const * item_sv = ST(i);
        if (!SvPOK(item_sv))
            continue;
        sortex->feed(sortex, SvPVX(item_sv), SvCUR(item_sv));
    }

=for comment

Fetch the next sorted item from the sort pool.  sort_all must be called first.

=cut

SV*
fetch(sortex)
    SortExternal *sortex;
PREINIT:
    ByteBuf *bb;
CODE:
    bb = sortex->fetch(sortex);
    if (bb == NULL) {
        RETVAL = newSV(0);
    }
    else {
        RETVAL = newSVpvn(bb->ptr, bb->size);
        Kino1_BB_destroy(bb);
    }
OUTPUT: RETVAL

=for comment

Sort all items currently in memory.

=cut

void
_sort_cache(sortex)
    SortExternal *sortex;
PPCODE:
    Kino1_SortEx_sort_cache(sortex);

=for comment

Sort everything in memory and write the sorted elements to disk, creating a
SortExRun C object.

=cut

void
_sort_run(sortex);
    SortExternal *sortex;
PPCODE:
    Kino1_SortEx_sort_run(sortex);

=for comment

Turn on fetching.

=cut

void
_enable_fetch(sortex)
    SortExternal *sortex;
PPCODE:
    Kino1_SortEx_enable_fetch(sortex);
    
SV*
_set_or_get(sortex, ...)
    SortExternal *sortex;
ALIAS:
    _set_outstream = 1
    _get_outstream = 2
    _set_instream  = 3
    _get_instream  = 4
    _set_num_runs  = 5
    _get_num_runs  = 6
    _set_invindex  = 7
    _get_invindex  = 8
    _set_seg_name  = 9
    _get_seg_name  = 10
CODE:
{
    KINO_START_SET_OR_GET_SWITCH

    case 1:  SvREFCNT_dec(sortex->outstream_sv);
             sortex->outstream_sv = newSVsv( ST(1) );
             Kino1_extract_struct(sortex->outstream_sv, sortex->outstream, 
                OutStream*, "KinoSearch1::Store::OutStream");
             /* fall through */
    case 2:  RETVAL = newSVsv(sortex->outstream_sv);
             break;
             
    case 3:  SvREFCNT_dec(sortex->instream_sv);
             sortex->instream_sv = newSVsv( ST(1) );
             Kino1_extract_struct(sortex->instream_sv, sortex->instream, 
                InStream*, "KinoSearch1::Store::InStream");
             /* fall through */
    case 4:  RETVAL = newSVsv(sortex->instream_sv);
             break;

    case 5:  Kino1_confess("can't set num_runs");
             /* fall through */
    case 6:  RETVAL = newSViv(sortex->num_runs);
             break;

    case 7:  Kino1_confess("can't set_invindex");
             /* fall through */
    case 8:  RETVAL = newSVsv(sortex->invindex_sv);
             break;
             
    case 9:  Kino1_confess("can't set_seg_name");
             /* fall through */
    case 10: RETVAL = newSVsv(sortex->seg_name_sv);
             break;

    KINO_END_SET_OR_GET_SWITCH
}
OUTPUT: RETVAL

void
DESTROY(sortex)
    SortExternal *sortex;
PPCODE:
    Kino1_SortEx_destroy(sortex);

__H__

#ifndef H_KINOSEARCH_UTIL_SORT_EXTERNAL
#define H_KINOSEARCH_UTIL_SORT_EXTERNAL 1

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include "KinoSearch1StoreInStream.h"
#include "KinoSearch1StoreOutStream.h"
#include "KinoSearch1UtilByteBuf.h"
#include "KinoSearch1UtilCClass.h"
#include "KinoSearch1UtilMemManager.h"

typedef struct sortexrun {
    double     start;
    double     file_pos;
    double     end;
    ByteBuf  **cache;
    I32        cache_cap;
    I32        cache_elems;
    I32        cache_pos;
    I32        slice_size;
} SortExRun;

typedef struct sortexternal {
    ByteBuf   **cache;            /* item cache, both incoming and outgoing */
    I32         cache_cap;        /* allocated limit for cache */
    I32         cache_elems;      /* number of elems in cache */ 
    I32         cache_pos;        /* index of current element in cache */
    ByteBuf   **scratch;          /* memory for use by mergesort */
    I32         scratch_cap;      /* allocated limit for scratch */
    I32         mem_threshold;    /* bytes of mem allowed for cache */
    I32         cache_bytes;      /* bytes of mem occupied by cache */
    I32         run_cache_limit;  /* bytes of mem allowed each run cache */
    SortExRun **runs;
    I32         num_runs;
    SV         *outstream_sv;
    OutStream  *outstream;
    SV         *instream_sv;
    InStream   *instream;
    SV         *invindex_sv;
    SV         *seg_name_sv;
    void      (*feed) (struct sortexternal*, char*, I32);
    ByteBuf*  (*fetch)(struct sortexternal*);
} SortExternal;

SortExternal* Kino1_SortEx_new(SV*, SV*, SV*, I32);
void          Kino1_SortEx_feed(SortExternal*, char*, I32);
ByteBuf*      Kino1_SortEx_fetch(SortExternal*);
ByteBuf*      Kino1_SortEx_fetch_death(SortExternal*);
void          Kino1_SortEx_enable_fetch(SortExternal*);
void          Kino1_SortEx_sort_cache(SortExternal*);
void          Kino1_SortEx_sort_run(SortExternal*);
void          Kino1_SortEx_destroy(SortExternal*);

#endif /* include guard */

__C__

#include "KinoSearch1UtilSortExternal.h"

static SortExRun* Kino1_SortEx_new_run(double, double);
static void       Kino1_SortEx_grow_bufbuf(ByteBuf***, I32, I32);
static I32        Kino1_SortEx_refill_run(SortExternal*, SortExRun*);
static void       Kino1_SortEx_refill_cache(SortExternal*);
static void       Kino1_SortEx_merge_runs(SortExternal*);
static ByteBuf*   Kino1_SortEx_find_endpost(SortExternal*);
static I32        Kino1_SortEx_define_slice(SortExRun*, ByteBuf*);
static void       Kino1_SortEx_mergesort(ByteBuf**, ByteBuf**, I32);
static void       Kino1_SortEx_msort(ByteBuf**, ByteBuf**, U32, U32);
static void       Kino1_SortEx_merge(ByteBuf**, U32, ByteBuf**, U32, 
                                    ByteBuf**);
static void       Kino1_SortEx_clear_cache(SortExternal*);
static void       Kino1_SortEx_clear_run_cache(SortExRun*);
static void       Kino1_SortEx_destroy_run(SortExRun*);

#define KINO_PER_ITEM_OVERHEAD (sizeof(ByteBuf) + sizeof(ByteBuf*))

SortExternal*
Kino1_SortEx_new(SV *outstream_sv, SV *invindex_sv, SV *seg_name_sv, 
                I32 mem_threshold) {
    SortExternal *sortex;

    /* allocate */
    Kino1_New(0, sortex, 1, SortExternal);
    Kino1_New(0, sortex->cache, 100, ByteBuf*);
    Kino1_New(0, sortex->runs, 1, SortExRun*);

    /* init */
    sortex->scratch         = NULL;
    sortex->scratch_cap     = 0;
    sortex->cache_cap       = 100;
    sortex->cache_elems     = 0;
    sortex->cache_pos       = 0;
    sortex->cache_bytes     = 0;
    sortex->num_runs        = 0;
    sortex->instream_sv     = &PL_sv_undef;
    sortex->feed            = Kino1_SortEx_feed;
    sortex->fetch           = Kino1_SortEx_fetch_death;

    /* assign */
    sortex->outstream_sv  = newSVsv(outstream_sv);
    Kino1_extract_struct(outstream_sv, sortex->outstream,
        OutStream*, "KinoSearch1::Store::OutStream");
    sortex->invindex_sv   = newSVsv(invindex_sv);
    sortex->seg_name_sv   = newSVsv(seg_name_sv);
    sortex->mem_threshold = mem_threshold;
    
    /* derive */
    sortex->run_cache_limit = mem_threshold / 2;

    return sortex;
}


/* Create a new SortExRun object */
static SortExRun*
Kino1_SortEx_new_run(double start, double end) {
    SortExRun *run;
    
    /* allocate */
    Kino1_New(0, run, 1, SortExRun);
    Kino1_New(0, run->cache, 100, ByteBuf*);

    /* init */
    run->cache_cap   = 100;
    run->cache_elems = 0;
    run->cache_pos   = 0;

    /* assign */
    run->start    = start;
    run->file_pos = start;
    run->end      = end;

    return run;
}

void
Kino1_SortEx_feed(SortExternal* sortex, char* ptr, I32 len) {
    /* add room for more cache elements if needed */
    if (sortex->cache_elems == sortex->cache_cap) {
        /* add 100, plus 10% of the current capacity */
        sortex->cache_cap = sortex->cache_cap + 100 + (sortex->cache_cap / 8);
        Kino1_Renew(sortex->cache, sortex->cache_cap, ByteBuf*);
    }

    sortex->cache[ sortex->cache_elems ] = Kino1_BB_new_string(ptr, len);
    sortex->cache_elems++;
        
    /* track memory consumed */
    sortex->cache_bytes += KINO_PER_ITEM_OVERHEAD;
    sortex->cache_bytes += len + 1;

    /* check if it's time to flush the cache */
    if (sortex->cache_bytes >= sortex->mem_threshold)
        Kino1_SortEx_sort_run(sortex);
}

ByteBuf*
Kino1_SortEx_fetch(SortExternal *sortex) {
    if (sortex->cache_pos >= sortex->cache_elems)
        Kino1_SortEx_refill_cache(sortex);

    if (sortex->cache_elems > 0) {
        return sortex->cache[ sortex->cache_pos++ ];
    }
    else {
        return NULL;
    }
}

ByteBuf*
Kino1_SortEx_fetch_death(SortExternal *sortex) {
    ByteBuf *bb = NULL;
    Kino1_confess("can't call fetch before sort_all");
    return bb;
}

void
Kino1_SortEx_enable_fetch(SortExternal *sortex) {
    sortex->fetch = Kino1_SortEx_fetch;
}

/* Allocate more memory to an array of pointers to pointers to ByteBufs, if
 * the current allocation isn't sufficient.
 */
static void
Kino1_SortEx_grow_bufbuf(ByteBuf ***bb_buf, I32 current, I32 desired) {
    if (current < desired)
        Kino1_Renew(*bb_buf, desired, ByteBuf*);
}

/* Sort the main cache.
 */
void
Kino1_SortEx_sort_cache(SortExternal *sortex) {
    Kino1_SortEx_grow_bufbuf(&sortex->scratch, sortex->scratch_cap,
        sortex->cache_elems);
    Kino1_SortEx_mergesort(sortex->cache, sortex->scratch, 
        sortex->cache_elems);
}

void
Kino1_SortEx_sort_run(SortExternal *sortex) {
    OutStream  *outstream;
    ByteBuf   **cache, **cache_end;
    ByteBuf    *bb;
    double      start, end;

    /* bail if there's nothing in the cache */
    if (sortex->cache_bytes == 0)
        return;

    /* allocate space for a new run */
    sortex->num_runs++;
    Kino1_Renew(sortex->runs, sortex->num_runs, SortExRun*);

    /* make local copies */
    outstream = sortex->outstream;
    cache     = sortex->cache;

    /* mark start of run */
    start = outstream->tell(outstream);
    
    /* write sorted items to file */
    Kino1_SortEx_sort_cache(sortex);
    cache_end = cache + sortex->cache_elems;
    for (cache = sortex->cache; cache < cache_end; cache++) {
        bb = *cache;
        outstream->write_vint(outstream, bb->size);
        outstream->write_bytes(outstream, bb->ptr, bb->size);
    }

    /* clear the cache */
    Kino1_SortEx_clear_cache(sortex);

    /* mark end of run and build a new SortExRun object */
    end = outstream->tell(outstream);
    sortex->runs[ sortex->num_runs - 1 ] = Kino1_SortEx_new_run(start, end);

    /* recalculate the size allowed for each run's cache */
    sortex->run_cache_limit = (sortex->mem_threshold / 2) / sortex->num_runs;
    sortex->run_cache_limit = sortex->run_cache_limit < 65536
        ? 65536 
        : sortex->run_cache_limit;
}

/* Recover sorted items from disk, up to the allowable memory limit. 
 */
static I32 
Kino1_SortEx_refill_run(SortExternal* sortex, SortExRun *run) {
    InStream *instream;
    double    end;
    I32       run_cache_bytes = 0;
    int       num_elems       = 0; /* number of items recovered */
    I32       len;
    ByteBuf  *bb;
    I32       run_cache_limit;

    /* see if we actually need to refill */
    if (run->cache_elems - run->cache_pos)
        return run->cache_elems - run->cache_pos;
    else 
        Kino1_SortEx_clear_run_cache(run);

    /* make local copies */
    instream        = sortex->instream;
    run_cache_limit = sortex->run_cache_limit;
    end             = run->end;

    instream->seek(instream, run->file_pos);

    while (1) {
        /* bail if we've read everything in this run */
        if (instream->tell(instream) >= end) {
            /* make sure we haven't read too much */
            if (instream->tell(instream) > end) {
                UV pos = instream->tell(instream);
                Kino1_confess(
                    "read past end of run: %"UVuf", %"UVuf, pos, (UV)end );
            }
            break;
        }

        /* bail if we've hit the ceiling for this run's cache */
        if (run_cache_bytes > run_cache_limit)
            break;

        /* retrieve and decode len; allocate a ByteBuf and recover the string */
        len = instream->read_vint(instream);
        bb  = Kino1_BB_new(len);
        instream->read_bytes(instream, bb->ptr, len);
        bb->ptr[len] = '\0';

        /* add to the run's cache */
        if (num_elems == run->cache_cap) {
            run->cache_cap = run->cache_cap + 100 + (run->cache_cap / 8);
            Kino1_Renew(run->cache, run->cache_cap, ByteBuf*);
        }
        run->cache[ num_elems ] = bb;

        /* track how much we've read so far */
        num_elems++;
        run_cache_bytes += len + 1 + KINO_PER_ITEM_OVERHEAD;
    }

    /* reset the cache array position and length; remember file pos */
    run->cache_elems = num_elems;
    run->cache_pos   = 0;
    run->file_pos    = instream->tell(instream);

    return num_elems;
}

/* Refill the main cache, drawing from the caches of all runs.
 */
static void
Kino1_SortEx_refill_cache(SortExternal *sortex) {
    ByteBuf   *endpost;
    SortExRun *run;
    I32        i = 0;
    I32        total = 0;

    /* free all the existing ByteBufs, as they've been fetched by now */
    Kino1_SortEx_clear_cache(sortex);
    
    /* make sure all runs have at least one item in the cache */
    while (i < sortex->num_runs) {
        run = sortex->runs[i];
        if (   (run->cache_elems > run->cache_pos)
            || (Kino1_SortEx_refill_run(sortex, run)) 
        ) {
            i++;
        }
        else {
            /* discard empty runs */
            Kino1_SortEx_destroy_run(run);
            sortex->num_runs--;
            sortex->runs[i] = sortex->runs[ sortex->num_runs ];
            sortex->runs[ sortex->num_runs ] = NULL;
        }
    }

    if (!sortex->num_runs)
        return;

    /* move as many items as possible into the sorting cache */
    endpost = Kino1_SortEx_find_endpost(sortex);
    for (i = 0; i < sortex->num_runs; i++) {
        total += Kino1_SortEx_define_slice(sortex->runs[i], endpost);
    }

    /* make sure we have enough room in both the main cache and the scratch */
    Kino1_SortEx_grow_bufbuf(&sortex->cache,   sortex->cache_cap,   total);
    Kino1_SortEx_grow_bufbuf(&sortex->scratch, sortex->scratch_cap, total);

    Kino1_SortEx_merge_runs(sortex);
    sortex->cache_elems = total;
}

/* Merge all the items which are "in-range" from all the Runs into the main
 * cache.
 */
static void 
Kino1_SortEx_merge_runs(SortExternal *sortex) {
    SortExRun   *run;
    ByteBuf   ***slice_starts;
    ByteBuf    **cache = sortex->cache;
    I32         *slice_sizes;
    I32          i = 0, j = 0, slice_size = 0, num_slices = 0;

    Kino1_New(0, slice_starts, sortex->num_runs, ByteBuf**);
    Kino1_New(0, slice_sizes,  sortex->num_runs, I32);

    /* copy all the elements in range into the cache */
    j = 0;
    for (i = 0; i < sortex->num_runs; i++) {
        run = sortex->runs[i];
        slice_size      = run->slice_size;
        if (slice_size == 0)
            continue;

        slice_sizes[j]  = slice_size;
        slice_starts[j] = cache;
        Copy( (run->cache + run->cache_pos), cache, slice_size, ByteBuf* );
        
        run->cache_pos += slice_size;
        cache += slice_size;
        num_slices = ++j;
    }

    /* exploit previous sorting, rather than sort cache naively */
    while (num_slices > 1) {
        /* leave the first slice intact if the number of slices is odd */
        i = 0;
        j = 0;
        while (i < num_slices) {
            if (num_slices - i >= 2) {
                /* merge two consecutive slices */
                slice_size = slice_sizes[i] + slice_sizes[i+1];
                Kino1_SortEx_merge(slice_starts[i], slice_sizes[i],
                    slice_starts[i+1], slice_sizes[i+1], sortex->scratch);
                slice_sizes[j] = slice_size;
                slice_starts[j] = slice_starts[i];
                Copy(sortex->scratch, slice_starts[j], slice_size, ByteBuf*);

                i += 2;
                j += 1;
            }
            else if (num_slices - i >= 1) {
                /* move single slice pointer */
                slice_sizes[j]  = slice_sizes[i];
                slice_starts[j] = slice_starts[i];
                i += 1;
                j += 1;
            }
        }
        num_slices = j;
    }

    Kino1_Safefree(slice_starts);
    Kino1_Safefree(slice_sizes);
}

/* Return a pointer to the item in one of the runs' caches which is 
 * the highest in sort order, but which we can guarantee is lower in sort
 * order than any item which has yet to enter a run cache. 
 */
static ByteBuf*
Kino1_SortEx_find_endpost(SortExternal *sortex) {
    int         i;
    ByteBuf    *endpost = NULL, *candidate = NULL;
    SortExRun  *run;

    for (i = 0; i < sortex->num_runs; i++) {
        /* get a run and verify no errors */
        run = sortex->runs[i];
        if (run->cache_pos == run->cache_elems || run->cache_elems < 1)
            Kino1_confess("find_endpost encountered an empty run cache");

        /* get the last item in this run's cache */
        candidate = run->cache[ run->cache_elems - 1 ];

        /* if it's the first run, the item is automatically the new endpost */
        if (i == 0) {
            endpost = candidate;
            continue;
        }
        /* if it's less than the current endpost, it's the new endpost */
        else if (Kino1_BB_compare(candidate, endpost) < 0) {
            endpost = candidate;
        }
    }

    return endpost;
}

/* Record the number of items in the run's cache which are lexically
 * less than or equal to the endpost.
 */
static I32
Kino1_SortEx_define_slice(SortExRun *run, ByteBuf *endpost) {
    I32 lo, mid, hi, delta;
    ByteBuf **cache = run->cache;

    /* operate on a slice of the cache */
    lo  = run->cache_pos - 1;
    hi  = run->cache_elems;

    /* binary search */
    while (hi - lo > 1) {
        mid = (lo + hi) / 2;
        delta = Kino1_BB_compare(cache[mid], endpost);
        if (delta > 0) 
            hi = mid;
        else
            lo = mid;
    }

    run->slice_size = lo == -1 
        ? 0 
        : (lo - run->cache_pos) + 1;
    return run->slice_size;
}

/* Standard merge sort.
 */
static void
Kino1_SortEx_mergesort(ByteBuf **bufbuf, ByteBuf **scratch, I32 buf_size) {
    if (buf_size == 0)
        return;
    Kino1_SortEx_msort(bufbuf, scratch, 0, buf_size - 1);
}

/* Standard merge sort msort function.
 */
static void
Kino1_SortEx_msort(ByteBuf **bufbuf, ByteBuf **scratch, U32 left, U32 right) {
    I32 mid;

    if (right > left) {
        mid = ( (right+left)/2 ) + 1;
        Kino1_SortEx_msort(bufbuf, scratch, left, mid - 1);
        Kino1_SortEx_msort(bufbuf, scratch, mid,  right);
        Kino1_SortEx_merge( (bufbuf + left),  (mid - left), 
                           (bufbuf + mid), (right - mid + 1), scratch);
        Copy( scratch, (bufbuf + left), (right - left + 1), ByteBuf* );
    }
}

/* Standard mergesort merge function.  This variant is capable of merging two
 * discontiguous source arrays.  Copying elements back into the source is left
 * for the caller.
 */
static void
Kino1_SortEx_merge(ByteBuf **left_ptr,  U32 left_size,
                  ByteBuf **right_ptr, U32 right_size,
                  ByteBuf **dest) {
    ByteBuf **left_boundary, **right_boundary;

    left_boundary  = left_ptr  + left_size;
    right_boundary = right_ptr + right_size;

    while (left_ptr < left_boundary && right_ptr < right_boundary) {
        if (Kino1_BB_compare(*left_ptr, *right_ptr) < 1) {
            *dest++ = *left_ptr++;
        }
        else {
            *dest++ = *right_ptr++;
        }
    }
    while (left_ptr < left_boundary) {
        *dest++ = *left_ptr++;
    }
    while (right_ptr < right_boundary) {
        *dest++ = *right_ptr++;
    }
}

static void
Kino1_SortEx_clear_cache(SortExternal *sortex) {
    ByteBuf **cache, **cache_end;
    cache_end = sortex->cache + sortex->cache_elems;
    /* only blow away items that haven't been released */
    for (cache = sortex->cache + sortex->cache_pos; 
         cache < cache_end; cache++
    ) {
        Kino1_BB_destroy(*cache);
    }
    sortex->cache_bytes = 0;
    sortex->cache_elems = 0;
    sortex->cache_pos   = 0;
}

static void
Kino1_SortEx_clear_run_cache(SortExRun *run) {
    ByteBuf **cache, **cache_end;
    cache_end = run->cache + run->cache_elems;
    /* only destroy items which haven't been passed to the main cache */
    for (cache = run->cache + run->cache_pos; cache < cache_end; cache++) {
        Kino1_BB_destroy(*cache);
    }
    run->cache_elems = 0;
    run->cache_pos   = 0;
}

void
Kino1_SortEx_destroy(SortExternal *sortex) {
    I32 i;
    
    /* delegate to Perl garbage collector */
    SvREFCNT_dec(sortex->outstream_sv);
    SvREFCNT_dec(sortex->instream_sv);
    SvREFCNT_dec(sortex->invindex_sv);
    SvREFCNT_dec(sortex->seg_name_sv);

    /* free the cache and the scratch */
    Kino1_SortEx_clear_cache(sortex);
    Kino1_Safefree(sortex->cache);
    Kino1_Safefree(sortex->scratch);

    /* free all of the runs and the array that held them */
    for (i = 0; i < sortex->num_runs; i++) {
        Kino1_SortEx_destroy_run(sortex->runs[i]);
    }
    Kino1_Safefree(sortex->runs);

    /* free me */
    Kino1_Safefree(sortex);
}

static void
Kino1_SortEx_destroy_run(SortExRun *run) {
    Kino1_SortEx_clear_run_cache(run);
    Kino1_Safefree(run->cache);
    Kino1_Safefree(run);
}

__POD__

=begin devdocs

=head1 NAME

KinoSearch1::Util::SortExternal - external sorting

=head1 DESCRIPTION

External sorting implementation, using lexical comparison.

=head1 COPYRIGHT

Copyright 2005-2010 Marvin Humphrey

=head1 LICENSE, DISCLAIMER, BUGS, etc.

See L<KinoSearch1> version 1.00.

=end devdocs
=cut