package KinoSearch::Index::PostingsWriter;
use strict;
use warnings;
use KinoSearch::Util::ToolSet;
use base qw( KinoSearch::Util::Class );

BEGIN {
    # Instance variables for this class, each defaulting to undef.
    my @instance_var_defaults = (
        # constructor params / members
        invindex => undef,
        seg_name => undef,

        # members
        sort_pool => undef,
    );
    __PACKAGE__->init_instance_vars(@instance_var_defaults);
}

use KinoSearch::Index::TermInfo;
use KinoSearch::Index::TermInfosWriter;
use KinoSearch::Util::SortExternal;

# Build the SortExternal pool that will hold (and sort) this segment's
# postings, and stash it in $self->{sort_pool}.
sub init_instance {
    my ($self) = @_;

    # the pool is created for the same invindex and segment as the writer
    my %pool_args = (
        invindex => $self->{invindex},
        seg_name => $self->{seg_name},
    );
    $self->{sort_pool} = KinoSearch::Util::SortExternal->new(%pool_args);
}

# Add all the postings in an inverted document to the sort pool.
# Feed every element of the supplied array ref to the sort pool.
sub add_postings {
    my $self           = shift;
    my $postings_array = shift;

    my $sort_pool = $self->{sort_pool};
    return $sort_pool->feed( @{$postings_array} );
}

# Bulk add all the postings in a segment to the sort pool.
# Pull the term enumerator and term docs from $seg_reader, switch on
# position reading, and hand everything (plus $doc_map) to the XS routine
# _add_segment along with the sort pool.
sub add_segment {
    my $self       = shift;
    my $seg_reader = shift;
    my $doc_map    = shift;

    my $term_enum = $seg_reader->terms;
    my $term_docs = $seg_reader->term_docs;

    # positions are needed because each posting carries its positions data
    $term_docs->set_read_positions(1);

    my $sort_pool = $self->{sort_pool};
    return _add_segment( $sort_pool, $term_enum, $term_docs, $doc_map );
}

=for comment

Process all the postings in the sort pool.  Generate the freqs and positions
files.  Hand off data to TermInfosWriter for generating the term
dictionaries.

=cut

# Sort the pool, open "<seg_name>.frq" and "<seg_name>.prx", run the XS
# routine _write_postings over the pool, then close both streams and finish
# the TermInfosWriter.
sub write_postings {
    my ($self) = @_;
    my $invindex  = $self->{invindex};
    my $seg_name  = $self->{seg_name};
    my $sort_pool = $self->{sort_pool};

    # sort everything in the pool before any of it is fetched
    $sort_pool->sort_all;

    my %writer_args = (
        invindex => $invindex,
        seg_name => $seg_name,
    );
    my $tinfos_writer = KinoSearch::Index::TermInfosWriter->new(%writer_args);

    # one output stream for freq data, one for positions data
    my $frq_out = $invindex->open_outstream( $seg_name . '.frq' );
    my $prx_out = $invindex->open_outstream( $seg_name . '.prx' );

    _write_postings( $sort_pool, $tinfos_writer, $frq_out, $prx_out );

    $_->close for ( $frq_out, $prx_out );
    $tinfos_writer->finish;
}

# Close the sort pool.
sub finish {
    my ($self) = @_;
    my $sort_pool = $self->{sort_pool};
    return $sort_pool->close;
}

1;

__END__
__XS__

MODULE = KinoSearch    PACKAGE = KinoSearch::Index::PostingsWriter      

void
_write_postings (sort_pool, tinfos_writer, frq_out, prx_out)
    SortExternal    *sort_pool;
    TermInfosWriter *tinfos_writer;
    OutStream       *frq_out;
    OutStream       *prx_out;
PPCODE:
    /* Thin binding: forward the four arguments unchanged to the C routine.
     * Nothing is pushed onto the Perl stack, so the Perl-side call yields
     * an empty list. */
    Kino_PostWriter_write_postings(sort_pool, tinfos_writer, frq_out,
        prx_out);

void
_add_segment(sort_pool, term_enum, term_docs, doc_map_ref)
    SortExternal  *sort_pool;
    SegTermEnum  *term_enum;
    TermDocs *term_docs;
    SV  *doc_map_ref;
PPCODE:
    /* Thin binding: forward the four arguments unchanged to the C routine.
     * doc_map_ref is passed as a raw SV*; the C routine dereferences it
     * itself.  Nothing is pushed onto the Perl stack. */
    Kino_PostWriter_add_segment(sort_pool, term_enum, term_docs, 
        doc_map_ref);

__H__

#ifndef H_KINOSEARCH_INDEX_POSTINGS_WRITER
#define H_KINOSEARCH_INDEX_POSTINGS_WRITER 1

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "KinoSearchIndexSegTermEnum.h"
#include "KinoSearchIndexTerm.h"
#include "KinoSearchIndexTermDocs.h"
#include "KinoSearchIndexTermInfosWriter.h"
#include "KinoSearchStoreOutStream.h"
#include "KinoSearchUtilByteBuf.h"
#include "KinoSearchUtilSortExternal.h"

/* Fetch every posting from the SortExternal pool, write freq data to the
 * first OutStream and positions data to the second, and pass one TermInfo
 * per term to the TermInfosWriter.
 */
void Kino_PostWriter_write_postings(SortExternal*, TermInfosWriter*, 
                                    OutStream*, OutStream*);
/* Serialize every posting reachable through the SegTermEnum/TermDocs pair
 * and feed it to the SortExternal pool.  The SV must be a reference to a
 * scalar whose string buffer is read as an array of I32 doc number
 * remappings.
 */
void Kino_PostWriter_add_segment(SortExternal*, SegTermEnum*, TermDocs*, SV*);

#endif /* include guard */

__C__

#include "KinoSearchIndexPostingsWriter.h"

#include <string.h>

static void Kino_PostWriter_deserialize(ByteBuf*, ByteBuf*, ByteBuf*, 
                                        U32*, U32*);
static void Kino_PostWriter_write_positions(OutStream*, ByteBuf*);

/* Drain the sort pool and write out the postings for one segment.
 *
 * Each posting fetched from sort_pool is deserialized into a termstring, a
 * doc number, a positions blob and a freq.  Positions go to prx_out as
 * delta-encoded VInts; the doc number delta and freq go to frq_out.  Whenever
 * the termstring differs from the previous one, any accumulated skip data is
 * flushed to frq_out and the finished term (last_termstring plus its
 * TermInfo) is handed to tinfos_writer.
 *
 * The loop runs one extra time after the pool is exhausted (iter == -1) with
 * a dummy termstring, so that the term-change branch can process the final
 * real term before all temporaries are released.
 */
void
Kino_PostWriter_write_postings(SortExternal *sort_pool,
                               TermInfosWriter *tinfos_writer, 
                               OutStream *frq_out, OutStream *prx_out) {
    ByteBuf   *posting           = NULL;
    ByteBuf   *positions, *termstring, *last_termstring;
    TermInfo  *tinfo;
    U32        doc_num           = 0;
    U32        freq              = 0;
    U32        last_doc_num      = 0;
    U32        last_skip_doc     = 0;
    double     frq_ptr, prx_ptr;
    double     last_skip_frq_ptr = 0.0;
    double     last_skip_prx_ptr = 0.0;
    I32        iter              = 0;
    I32        i;
    AV        *skip_data_av;
    SV        *skip_sv;

    /* posting starts as an empty string only so that the unconditional
     * destroy at the top of the loop has something to free */
    posting         = Kino_BB_new_string("", 0);
    last_termstring = Kino_BB_new_string("\0\0", 2);
    termstring      = Kino_BB_new_view(NULL, 0);
    positions       = Kino_BB_new_view(NULL, 0);
    tinfo           = Kino_TInfo_new();
    skip_data_av    = newAV();
    skip_sv         = &PL_sv_undef;

    /* each loop is one field, one term, one doc_num, many positions */
    while (1) {
        /* retrieve the next posting from the sort pool */
        Kino_BB_destroy(posting);
        posting = sort_pool->fetch(sort_pool);

        /* SortExternal returns NULL when exhausted */
        if (posting == NULL) {
            /* jumps past the deserialize call below, so doc_num, freq and
             * positions keep their values from the previous iteration */
            goto FINAL_ITER;
        }

        /* each iter, add a doc to the doc_freq for a given term */
        iter++;
        tinfo->doc_freq++;    /* lags by 1 iter */

        /* break up the serialized posting into its parts */
        Kino_PostWriter_deserialize(posting, termstring, positions, 
            &doc_num, &freq);

        /* on the first iter, prime the "heldover" variables */
        if (iter == 1) {
            Kino_BB_assign_string(last_termstring, termstring->ptr,
                termstring->size);
            tinfo->doc_freq      = 0;
            tinfo->frq_fileptr   = frq_out->tell(frq_out);
            tinfo->prx_fileptr   = prx_out->tell(prx_out);
            tinfo->skip_offset   = frq_out->tell(frq_out);
            tinfo->index_fileptr = 0;
        }
        else if ( iter == -1 ) { /* never true; can only get here via goto */
            /* prepare to clear out buffers and exit loop */
            FINAL_ITER: {
                iter = -1;
                /* swap the view for an owned two-byte dummy termstring;
                 * presumably chosen so it compares unequal to any real
                 * termstring and triggers the term-change branch below --
                 * TODO confirm */
                Kino_BB_destroy(termstring);
                termstring = Kino_BB_new_string("\0\0", 2);
                /* the increment at the top of the loop was skipped by the
                 * goto, so apply it here */
                tinfo->doc_freq++;
            }
        }

        /* create skipdata (unused by KinoSearch at present) */
        /* one (doc delta, frq ptr delta, prx ptr delta) triple is queued
         * each time the condition below holds */
        if ( (tinfo->doc_freq + 1) % tinfos_writer->skip_interval == 0 ) {
            frq_ptr = frq_out->tell(frq_out);
            prx_ptr = prx_out->tell(prx_out);

            av_push(skip_data_av, newSViv(last_doc_num - last_skip_doc    ));
            av_push(skip_data_av, newSViv(frq_ptr      - last_skip_frq_ptr));
            av_push(skip_data_av, newSViv(prx_ptr      - last_skip_prx_ptr));

            last_skip_doc     = last_doc_num;
            last_skip_frq_ptr = frq_ptr;
            last_skip_prx_ptr = prx_ptr;
        }

        /* if either the term or fieldnum changes, process the last term */
        if ( Kino_BB_compare(termstring, last_termstring) ) {
            /* take note of where we are for the term dictionary */
            frq_ptr = frq_out->tell(frq_out);
            prx_ptr = prx_out->tell(prx_out);

            /* write skipdata if there is any */
            if (av_len(skip_data_av) != -1) {
                /* kludge to compensate for doc_freq's 1-iter lag */
                if (
                    (tinfo->doc_freq + 1) % tinfos_writer->skip_interval == 0 
                ) {
                    /* remove 1 cycle of skip data */
                    for (i = 3; i > 0; i--) {
                        skip_sv = av_pop(skip_data_av);
                        SvREFCNT_dec(skip_sv);
                    }
                }
                if (av_len(skip_data_av) != -1) {
                    /* tell tinfos_writer about the non-zero skip amount */
                    tinfo->skip_offset = frq_ptr - tinfo->frq_fileptr;

                    /* write out the skip data */
                    /* av_len is the highest index, so this loop shifts and
                     * writes every queued value, oldest first */
                    i = av_len(skip_data_av);
                    while (i-- > -1) {
                        skip_sv = av_shift(skip_data_av);
                        frq_out->write_vint(frq_out, SvIV(skip_sv) );
                        SvREFCNT_dec(skip_sv);
                    }

                    /* update the filepointer for the file we just wrote to */
                    frq_ptr = frq_out->tell(frq_out);
                }
            }

            /* init skip data in preparation for the next term */
            last_skip_doc     = 0;
            last_skip_frq_ptr = frq_ptr;
            last_skip_prx_ptr = prx_ptr;

            /* hand off to TermInfosWriter */
            Kino_TInfosWriter_add(tinfos_writer, last_termstring, tinfo);

            /* start each term afresh */
            tinfo->doc_freq      = 0;
            tinfo->frq_fileptr   = frq_ptr;
            tinfo->prx_fileptr   = prx_ptr;
            tinfo->skip_offset   = 0;
            tinfo->index_fileptr = 0;

            /* remember the termstring so we can write string diffs */
            Kino_BB_assign_string(last_termstring, termstring->ptr,
                termstring->size);

            /* doc number deltas restart from 0 for every term */
            last_doc_num    = 0;
        }

        /* break out of loop on last iter before writing invalid data */
        if (iter == -1) {
            Kino_TInfo_destroy(tinfo);
            Kino_BB_destroy(termstring);
            Kino_BB_destroy(last_termstring);
            Kino_BB_destroy(positions);
            /* NOTE(review): posting is NULL at this point (that is how the
             * final iteration was entered); this relies on Kino_BB_destroy
             * tolerating a NULL argument -- confirm */
            Kino_BB_destroy(posting);
            SvREFCNT_dec( (SV*)skip_data_av );
            return;
        }

        /*  write positions data */
        Kino_PostWriter_write_positions(prx_out, positions);

        /* write freq data */
        /* doc_code is delta doc_num, shifted left by 1. */
        if (freq == 1) {
            U32 doc_code = (doc_num - last_doc_num) << 1;
            /* set low bit of doc_code to 1 to indicate freq of 1 */
            doc_code += 1;
            frq_out->write_vint(frq_out, doc_code);
        }
        else {
            U32 doc_code = (doc_num - last_doc_num) << 1;
            /* leave low bit of doc_code at 0, record explicit freq */
            frq_out->write_vint(frq_out, doc_code);
            frq_out->write_vint(frq_out, freq);
        }

        /* remember last doc num because we need it for delta encoding */
        last_doc_num = doc_num;
    }
}

/* Byte widths of the fixed-size pieces of a serialized posting: the
 * big-endian doc number, the trailing big-endian term text length, and the
 * null byte that follows the termstring.  Used both when building postings
 * and when pulling them apart again.
 */

#define DOC_NUM_LEN 4
#define TEXT_LEN_LEN 2
#define NULL_BYTE_LEN 1 

/* Bulk-add every posting of an existing segment to the sort pool.
 *
 * For each term produced by term_enum, and each document produced by
 * term_docs for that term, a serialized posting is assembled and fed to
 * sort_pool.  The layout written here is:
 *
 *   termstring (field num + term text) | null byte |
 *   remapped doc number (4 bytes, big-endian) | positions |
 *   term text length (2 bytes, big-endian)
 *
 * doc_map_ref must be a reference to a scalar whose string buffer is an
 * array of I32; entry [n] is the new doc number for old doc number n.
 *
 * Croaks (via Kino_confess) if term_docs yields a doc number that does not
 * index into the doc map.
 */
void
Kino_PostWriter_add_segment(SortExternal *sort_pool, SegTermEnum* term_enum, 
                            TermDocs *term_docs, SV *doc_map_ref) {
    I32        *doc_map;
    I32         doc_num, max_doc;
    char        doc_num_buf[4];
    char        text_len_buf[4];
    SV         *positions_sv, *doc_map_sv;
    ByteBuf    *posting;
    TermBuffer *term_buf;
    char       *positions_ptr;
    STRLEN      len, common_len, positions_len;

    /* extract the doc number remapping array */
    doc_map_sv = SvRV(doc_map_ref);
    doc_map    = (I32*)SvPV(doc_map_sv, len);
    max_doc    = len / sizeof(I32); /* number of entries in doc_map */

    term_buf   = term_enum->term_buf;
    posting    = Kino_BB_new_string("", 0);

    while (Kino_SegTermEnum_next(term_enum)) {
        /* start with the termstring and the null byte */
        Kino_encode_bigend_U16(term_buf->text_len, text_len_buf);
        common_len = term_buf->text_len + KINO_FIELD_NUM_LEN;
        Kino_BB_assign_string(posting, term_buf->termstring->ptr, common_len);
        Kino_BB_cat_string(posting, "\0", NULL_BYTE_LEN);
        common_len += NULL_BYTE_LEN;

        term_docs->seek_tinfo(term_docs, term_enum->tinfo);
        while (term_docs->next(term_docs)) {
            /* rewind to just after the shared prefix; everything past it
             * belonged to the previous doc's posting */
            posting->size = common_len; /* can't ever be gt posting->cap */

            /* concat the remapped doc number */
            doc_num = term_docs->get_doc(term_docs);
            if (doc_num == -1)
                continue;
            /* valid indexes are 0 .. max_doc - 1, so max_doc itself is
             * already one past the end of doc_map */
            if (doc_num >= max_doc) 
                Kino_confess("doc_num >= max_doc: %d %d", doc_num, max_doc);
            doc_num = doc_map[doc_num];
            Kino_encode_bigend_U32(doc_num, doc_num_buf);
            Kino_BB_cat_string(posting, doc_num_buf, DOC_NUM_LEN); 

            /* concat the positions */
            positions_sv = term_docs->get_positions(term_docs);
            positions_ptr = SvPV(positions_sv, positions_len);
            Kino_BB_cat_string(posting, positions_ptr, positions_len);

            /* concat the term_length */
            Kino_BB_cat_string(posting, text_len_buf, TEXT_LEN_LEN);

            /* add the posting to the sortpool */
            sort_pool->feed(sort_pool, posting->ptr, posting->size);
        }
    }
    Kino_BB_destroy(posting);
}

/* Split a serialized posting into its parts.
 *
 * The posting is read as: termstring, one null byte, a 4-byte big-endian
 * doc number, the positions data, and finally a 2-byte big-endian term text
 * length.  termstring and positions are assigned as views onto the
 * posting's buffer; the doc number and freq are stored through doc_num_ptr
 * and freq_ptr.
 */
static void 
Kino_PostWriter_deserialize(ByteBuf *posting, ByteBuf *termstring, 
                            ByteBuf *positions,
                            U32 *doc_num_ptr, U32 *freq_ptr) {
    char    *cursor;
    STRLEN   positions_len;

    /* the term text length sits in the last two bytes (packed 'n'); adding
     * the field number width gives the full termstring length */
    cursor = posting->ptr + posting->size - TEXT_LEN_LEN;
    termstring->size = Kino_decode_bigend_U16(cursor) + KINO_FIELD_NUM_LEN;
    Kino_BB_assign_view(termstring, posting->ptr, termstring->size);

    /* step over the termstring and its null byte to the doc number
     * (packed 'N') */
    cursor = posting->ptr + termstring->size + NULL_BYTE_LEN;
    *doc_num_ptr = Kino_decode_bigend_U32(cursor);

    /* whatever lies between the doc number and the trailing text length is
     * positions data */
    cursor += DOC_NUM_LEN;
    positions_len = posting->size 
                    - termstring->size 
                    - (NULL_BYTE_LEN + DOC_NUM_LEN + TEXT_LEN_LEN);
    Kino_BB_assign_view(positions, cursor, positions_len);

    /* each position occupies 4 bytes, so the count of positions is the freq */
    *freq_ptr = positions_len / 4;
}

/* Write out the positions data using delta encoding.
 *
 * positions is read as a packed run of native-order U32 values
 * (positions->size / 4 of them).  Each value is written to prx_out as a
 * VInt holding its difference from the previous value, the first being
 * measured from 0.
 *
 * The values are copied out with memcpy rather than read through a U32
 * pointer: the buffer is a view into the middle of a serialized posting, so
 * its start address need not be aligned for U32 access.
 */
static void
Kino_PostWriter_write_positions(OutStream *prx_out, ByteBuf *positions) {
    const char *src         = positions->ptr;
    const char *const limit = src + (positions->size / 4) * sizeof(U32);
    U32         current_pos;
    U32         last_pos    = 0;

    while (src < limit) {
        /* alignment-safe load of the next position */
        memcpy(&current_pos, src, sizeof(U32));

        /* get delta and write out as VInt */
        prx_out->write_vint(prx_out, current_pos - last_pos);

        /* advance */
        last_pos = current_pos;
        src     += sizeof(U32);
    }
}

__POD__

=begin devdocs

=head1 NAME

KinoSearch::Index::PostingsWriter - write postings data to an invindex

=head1 DESCRIPTION

PostingsWriter creates posting lists.  It writes the frequency and
positional data files, plus feeds data to TermInfosWriter.

=head1 COPYRIGHT

Copyright 2005-2006 Marvin Humphrey

=head1 LICENSE, DISCLAIMER, BUGS, etc.

See L<KinoSearch|KinoSearch> version 0.12.

=end devdocs
=cut