package KinoSearch::Index::PostingsWriter;
use strict;
use warnings;
use KinoSearch::Util::ToolSet;
use base qw( KinoSearch::Util::Class );

use File::Temp qw();
use File::Spec;
use KinoSearch::Index::TermInfo;
use KinoSearch::Index::TermInfosWriter;
use KinoSearch::Util::SortExternal;

our %instance_vars = (
    # constructor params / members
    invindex => undef,
    seg_name => undef,

    # members
    sort_pool => undef,
);

sub init_instance {
    my $self = shift;

    # create a SortExternal object which autosorts the posting list cache
    $self->{sort_pool} = KinoSearch::Util::SortExternal->new(
        invindex => $self->{invindex},
        seg_name => $self->{seg_name},
    );
}

# Add all the postings in an inverted document to the sort pool.
sub add_postings {
    my ( $self, $postings_array ) = @_;
    $self->{sort_pool}->feed(@$postings_array);
}

# Bulk add all the postings in a segment to the sort pool.
sub add_segment {
    my ( $self, $seg_reader, $doc_map ) = @_;
    my $term_enum = $seg_reader->terms;
    my $term_docs = $seg_reader->term_docs;
    $term_docs->set_read_positions(1);
    _add_segment( $self->{sort_pool}, $term_enum, $term_docs, $doc_map );
}

=for comment

Process all the postings in the sort pool.  Generate the freqs and positions
files.  Hand off data to TermInfosWriter for the generating the term
dictionaries.

=cut

sub write_postings {
    my $self = shift;
    my ( $invindex, $seg_name ) = @{$self}{ 'invindex', 'seg_name' };

    $self->{sort_pool}->sort_all;

    my $tinfos_writer = KinoSearch::Index::TermInfosWriter->new(
        invindex => $invindex,
        seg_name => $seg_name,
    );
    my $frq_out = $invindex->open_outstream("$seg_name.frq");
    my $prx_out = $invindex->open_outstream("$seg_name.prx");

    _write_postings( $self->{sort_pool}, $tinfos_writer, $frq_out, $prx_out );

    $frq_out->close;
    $prx_out->close;
    $tinfos_writer->finish;
}

sub finish { }

1;

__END__
__XS__

MODULE = KinoSearch    PACKAGE = KinoSearch::Index::PostingsWriter      

void
_write_postings (sort_pool, tinfos_writer, frq_out, prx_out)
    SortExternal    *sort_pool;
    TermInfosWriter *tinfos_writer;
    OutStream       *frq_out;
    OutStream       *prx_out;
PPCODE:
    Kino_PostWriter_write_postings(sort_pool, tinfos_writer, frq_out,
        prx_out);

void
_add_segment(sort_pool, term_enum, term_docs, doc_map_ref)
    SortExternal  *sort_pool;
    SegTermEnum  *term_enum;
    TermDocs *term_docs;
    SV  *doc_map_ref;
PPCODE:
    Kino_PostWriter_add_segment(sort_pool, term_enum, term_docs, 
        doc_map_ref);

__H__

#ifndef H_KINOSEARCH_INDEX_POSTINGS_WRITER
#define H_KINOSEARCH_INDEX_POSTINGS_WRITER 1

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "KinoSearchIndexSegTermEnum.h"
#include "KinoSearchIndexTerm.h"
#include "KinoSearchIndexTermDocs.h"
#include "KinoSearchIndexTermInfosWriter.h"
#include "KinoSearchStoreOutStream.h"
#include "KinoSearchUtilSortExternal.h"
#include "KinoSearchUtilStringHelper.h"

void Kino_PostWriter_write_postings(SortExternal*, TermInfosWriter*, 
                                    OutStream*, OutStream*);
void Kino_PostWriter_add_segment(SortExternal*, SegTermEnum*, TermDocs*, SV*);

#endif /* include guard */

__C__

#include "KinoSearchIndexPostingsWriter.h"

static void Kino_PostWriter_deserialize(SV*, SV*, U32*, U32*);
static void Kino_PostWriter_write_positions(OutStream*, SV*);

void
Kino_PostWriter_write_postings(SortExternal *sort_pool,
                               TermInfosWriter *tinfos_writer, 
                               OutStream *frq_out, OutStream *prx_out) {
    SV        *termstring_sv, *last_termstring_sv, *posting_sv, *scratch_sv;
    TermInfo  *tinfo;
    U32        doc_num;
    U32        freq;
    U32        last_doc_num      = 0;
    U32        last_skip_doc     = 0;
    double     frq_ptr, prx_ptr;
    double     last_skip_frq_ptr = 0.0;
    double     last_skip_prx_ptr = 0.0;
    I32        iter              = 0;
    I32        i, num_returned;
    AV        *skip_data_av;
    SV        *skip_sv;

    dSP;
    
    last_termstring_sv = newSVpvn("\0\0", 2);
    termstring_sv      = newSV(0);
    posting_sv         = newSV(0);
    tinfo              = Kino_TInfo_new();
    skip_data_av       = newAV();
    skip_sv            = &PL_sv_undef;

    /* each loop is one field, one term, one doc_num, many positions */
    while (1) {
        /* retrieve the next posting from the sort pool */
        scratch_sv = sort_pool->fetch(sort_pool);
        SvSetSV(posting_sv, scratch_sv);
        SvREFCNT_dec(scratch_sv);

        /* SortExternal returns undef when exhausted */
        if ( !SvOK(posting_sv) ) {
            goto FINAL_ITER;
        }

        /* each iter, add a doc to the doc_freq for a given term */
        iter++;
        tinfo->doc_freq++;    /* lags by 1 iter */

        /* Break up the serialized posting into its parts.
         * posting_sv gets whittled down until it is only the positions string.
         */
        Kino_PostWriter_deserialize(posting_sv, termstring_sv, 
            &doc_num, &freq);

        /* on the first iter, prime the "heldover" variables */
        if (iter == 1) {
            SvSetSV(last_termstring_sv, termstring_sv);
            tinfo->doc_freq      = 0;
            tinfo->frq_fileptr   = frq_out->tell(frq_out);
            tinfo->prx_fileptr   = prx_out->tell(prx_out);
            tinfo->skip_offset   = frq_out->tell(frq_out);
            tinfo->index_fileptr = 0;
        }
        else if ( iter == -1 ) { /* never true; can only get here via goto */
            /* prepare to clear out buffers and exit loop */
            FINAL_ITER: {
                iter = -1;
                sv_setpvn(termstring_sv, "\0\0", 2);
                tinfo->doc_freq++;
            }
        }

        /* create skipdata (unused by KinoSearch at present) */
        if ( (tinfo->doc_freq + 1) % tinfos_writer->skip_interval == 0 ) {
            frq_ptr = frq_out->tell(frq_out);
            prx_ptr = prx_out->tell(prx_out);

            av_push(skip_data_av, newSViv(last_doc_num - last_skip_doc    ));
            av_push(skip_data_av, newSViv(frq_ptr      - last_skip_frq_ptr));
            av_push(skip_data_av, newSViv(prx_ptr      - last_skip_prx_ptr));

            last_skip_doc     = last_doc_num;
            last_skip_frq_ptr = frq_ptr;
            last_skip_prx_ptr = prx_ptr;
        }

        /* if either the term or fieldnum changes, process the last term */
        if ( Kino_StrHelp_compare_svs(termstring_sv, last_termstring_sv) ) {
            /* take note of where we are for the term dictionary */
            frq_ptr = frq_out->tell(frq_out);
            prx_ptr = prx_out->tell(prx_out);

            /* write skipdata if there is any */
            if (av_len(skip_data_av) != -1) {
                /* kludge to compensate for doc_freq's 1-iter lag */
                if (
                    (tinfo->doc_freq + 1) % tinfos_writer->skip_interval == 0 
                ) {
                    /* remove 1 cycle of skip data */
                    for (i = 3; i > 0; i--) {
                        skip_sv = av_pop(skip_data_av);
                        SvREFCNT_dec(skip_sv);
                    }
                }
                if (av_len(skip_data_av) != -1) {
                    /* tell tinfos_writer about the non-zero skip amount */
                    tinfo->skip_offset = frq_ptr - tinfo->frq_fileptr;

                    /* write out the skip data */
                    i = av_len(skip_data_av);
                    while (i-- > -1) {
                        skip_sv = av_shift(skip_data_av);
                        frq_out->write_vint(frq_out, SvIV(skip_sv) );
                        SvREFCNT_dec(skip_sv);
                    }

                    /* update the filepointer for the file we just wrote to */
                    frq_ptr = frq_out->tell(frq_out);
                }
            }

            /* init skip data in preparation for the next term */
            last_skip_doc     = 0;
            last_skip_frq_ptr = frq_ptr;
            last_skip_prx_ptr = prx_ptr;

            /* hand off to TermInfosWriter */
            Kino_TInfosWriter_add(tinfos_writer, last_termstring_sv, tinfo);

            /* start each term afresh */
            tinfo->doc_freq      = 0;
            tinfo->frq_fileptr   = frq_ptr;
            tinfo->prx_fileptr   = prx_ptr;
            tinfo->skip_offset   = 0;
            tinfo->index_fileptr = 0;

            /* remember the termstring so we can write string diffs */
            SvSetSV(last_termstring_sv, termstring_sv);

            last_doc_num    = 0;
        }

        /* break out of loop on last iter before writing invalid data */
        if (iter == -1) {
            Kino_TInfo_destroy(tinfo);
            SvREFCNT_dec(posting_sv);
            SvREFCNT_dec(termstring_sv);
            SvREFCNT_dec(last_termstring_sv);
            SvREFCNT_dec( (SV*)skip_data_av );
            return;
        }

        /*  write positions data */
        Kino_PostWriter_write_positions(prx_out, posting_sv);

        /* write freq data */
        /* doc_code is delta doc_num, shifted left by 1. */
        if (freq == 1) {
            U32 doc_code = (doc_num - last_doc_num) << 1;
            /* set low bit of doc_code to 1 to indicate freq of 1 */
            doc_code += 1;
            frq_out->write_vint(frq_out, doc_code);
        }
        else {
            U32 doc_code = (doc_num - last_doc_num) << 1;
            /* leave low bit of doc_code at 0, record explicit freq */
            frq_out->write_vint(frq_out, doc_code);
            frq_out->write_vint(frq_out, freq);
        }

        /* remember last doc num because we need it for delta encoding */
        last_doc_num = doc_num;
    }
}

/* Pull apart a serialized posting into its component parts */

#define DOC_NUM_LEN 4
#define TEXT_LEN_LEN 2
#define NULL_BYTE_LEN 1 

void
Kino_PostWriter_add_segment(SortExternal *sort_pool, SegTermEnum* term_enum, 
                            TermDocs *term_docs, SV *doc_map_ref) {
    I32        *doc_map;
    I32         doc_num, max_doc;
    char        doc_num_buf[4];
    char        text_len_buf[4];
    SV         *posting_sv, *positions_sv, *doc_map_sv;
    TermBuffer *term_buf;
    STRLEN      len, common_len, positions_len;

    dSP;

    doc_map_sv = SvRV(doc_map_ref);
    doc_map    = (I32*)SvPV(doc_map_sv, len);
    max_doc = len / sizeof(I32);
    posting_sv = newSV(0);
    term_buf   = term_enum->term_buf;

    while (Kino_SegTermEnum_next(term_enum)) {
        /* start with the termstring and the null byte */
        Kino_encode_bigend_U16(term_buf->text_len, text_len_buf);
        common_len = term_buf->text_len + KINO_FIELD_NUM_LEN;
        sv_setpvn(posting_sv, term_buf->termstring, common_len);
        sv_catpvn(posting_sv, "\0", NULL_BYTE_LEN);
        common_len += NULL_BYTE_LEN;

        term_docs->seek_tinfo(term_docs, term_enum->tinfo);
        while (term_docs->next(term_docs)) {
            SvCUR_set(posting_sv, common_len);

            /* concat the remapped doc number */
            doc_num = term_docs->get_doc(term_docs);
            if (doc_num == -1)
                continue;
            if (doc_num > max_doc) 
                Kino_confess("doc_num > max_doc: %d %d", doc_num, max_doc);
            doc_num = doc_map[doc_num];
            Kino_encode_bigend_U32(doc_num, doc_num_buf);
            sv_catpvn(posting_sv, doc_num_buf, DOC_NUM_LEN); 

            /* concat the positions */
            positions_sv = term_docs->get_positions(term_docs);
            sv_catsv(posting_sv, positions_sv);

            /* concat the term_length */
            sv_catpvn(posting_sv, text_len_buf, TEXT_LEN_LEN);

            /* add the posting to the sortpool */
            sort_pool->feed(sort_pool, posting_sv);
        }
    }
}

static void 
Kino_PostWriter_deserialize(SV *posting_sv, SV *termstring_sv, 
                            U32 *doc_num_ptr, U32 *freq_ptr) {
    STRLEN   posting_len, termstring_len;
    char    *posting_str, *termstring_len_ptr;

    /* extract pointer from serialized posting */
    posting_str = SvPV(posting_sv, posting_len);

    /* extract termstring_len, decoding packed 'n', assign termstring */
    termstring_len_ptr = posting_str + posting_len - TEXT_LEN_LEN;
    termstring_len     
        = Kino_decode_bigend_U16(termstring_len_ptr) + KINO_FIELD_NUM_LEN;
    sv_setpvn(termstring_sv, posting_str, termstring_len);

    /* extract and assign doc_num, decoding packed 'N' */
    posting_str  += termstring_len + NULL_BYTE_LEN;
    *doc_num_ptr  = Kino_decode_bigend_U32(posting_str);
    posting_str  += DOC_NUM_LEN;

    /* whack encoded termstring_len off the end of the posting */
    posting_len -= TEXT_LEN_LEN; 
    SvCUR_set(posting_sv, posting_len);
    
    /* whack field_num/term text off the front, leaving only the positions */
    sv_chop(posting_sv, posting_str);

    /* calculate freq by counting the number of positions, assign */
    *freq_ptr = (posting_len - termstring_len - DOC_NUM_LEN) / 4;
}

/* Write out the positions data using delta encoding.
 */
static void
Kino_PostWriter_write_positions(OutStream* prx_outstream, SV* positions_sv) {
    STRLEN   positions_len;
    char    *positions;
    U32     *current_pos_ptr;
    U32     *end;
    U32      last_pos;
    U32      pos_delta;

    positions = SvPV(positions_sv, positions_len);

    /* extract 32 bit unsigned integers from positions_sv.  */
    current_pos_ptr = (U32*)positions;
    end             = current_pos_ptr + (positions_len / 4);
    last_pos        = 0;
    while (current_pos_ptr < end) {
        /* get delta and write out as VInt */
        pos_delta = *current_pos_ptr - last_pos;
        prx_outstream->write_vint(prx_outstream, pos_delta);

        /* advance pointers */
        last_pos = *current_pos_ptr;
        current_pos_ptr++;
    }
}

__POD__

=begin devdocs

=head1 NAME

KinoSearch::Index::PostingsWriter - write postings data to an invindex

=head1 DESCRIPTION

PostingsWriter creates posting lists.  It writes the frequency and and
positional data files, plus feeds data to TermInfosWriter.

=head1 COPYRIGHT

Copyright 2005-2006 Marvin Humphrey

=head1 LICENSE, DISCLAIMER, BUGS, etc.

See L<KinoSearch|KinoSearch> version 0.09.

=end devdocs
=cut