package KinoSearch::Index::PostingsWriter;
use strict;
use warnings;
use KinoSearch::Util::ToolSet;
use base qw( KinoSearch::Util::Class );

use constant FORMAT         => -2;
use constant INDEX_INTERVAL => 128;
use constant SKIP_INTERVAL  => 16;

use Sort::External;
use File::Temp qw();
use File::Spec;
use KinoSearch::Index::TermInfo;
use KinoSearch::Index::TermInfosWriter;

our %instance_vars = (
    # constructor params / members
    invindex => undef,
    seg_name => undef,

    # members
    temp_dir       => undef,
    postings_cache => undef,
);

# Instance initializer: set up a scratch directory and the sort pool which
# accumulates serialized postings until write_postings() is called.
sub init_instance {
    my $self = shift;

    # Scratch files live alongside the invindex when it reports a path;
    # otherwise they go in the system temp directory.
    my $scratch_parent = $self->{invindex}->{path};
    $scratch_parent = File::Spec->tmpdir
        unless defined $scratch_parent;

    # The directory is created with File::Temp's CLEANUP option switched on.
    my $scratch_dir = File::Temp::tempdir(
        "kinotemp_XXXXXX",
        DIR     => $scratch_parent,
        CLEANUP => 1,
    );
    $self->{temp_dir} = $scratch_dir;

    # The postings cache is a Sort::External object working out of the
    # scratch directory; it sorts whatever add_postings() feeds it.
    $self->{postings_cache} = Sort::External->new(
        -working_dir   => $scratch_dir,
        -mem_threshold => 2**24,
    );
}

# Feed every serialized posting in the supplied array ref to the sort pool.
# Nothing is written to the index until write_postings() is called.
sub add_postings {
    my ( $self, $postings_array ) = @_;
    my $sort_pool = $self->{postings_cache};
    return $sort_pool->feed( @{$postings_array} );
}

=for comment

Process all the postings in the sort pool.  Generate the freqs and positions
files.  Hand off data to TermInfosWriter for generating the term
dictionaries.

=cut

# Drain the sorted postings cache and write this segment's posting data.
#
# Takes no arguments beyond the invocant.  Calls finish() on the postings
# cache, then fetches serialized postings from it one at a time.  For each
# posting, the positions are written to "$seg_name.prx" and a delta-encoded
# doc number (plus an explicit freq when freq != 1) is written to
# "$seg_name.frq".  Whenever the termstring changes, the TermInfo built up for
# the previous term is handed to a TermInfosWriter.  Both outstreams are
# closed and the TermInfosWriter is finished before returning.
#
# Term changes are only noticed when the *next* posting arrives, so
# bookkeeping such as $doc_freq lags one pass behind, and one extra "flush"
# pass runs after the cache is exhausted in order to emit the last term.  The
# flush pass is driven by an explicit flag rather than by a sentinel
# termstring, so no real termstring can be mistaken for end-of-input.
sub write_postings {
    my $self = shift;
    my ( $invindex, $seg_name ) = @{$self}{ 'invindex', 'seg_name' };
    my ( $posting, $termstring, $freq, $doc_num );
    my $last_termstring;              # primed by the first posting
    my $last_doc_num      = 0;
    my $doc_freq          = -1;       # due to 1-iter lag
    my $tinfo             = undef;
    my $last_skip_doc     = 0;
    my $last_skip_frq_ptr = 0;
    my $last_skip_prx_ptr = 0;
    my @skipdata;

    # sort the serialized postings
    my $postings_cache = $self->{postings_cache};
    $postings_cache->finish;

    # prepare various outputs
    my $tinfos_writer = KinoSearch::Index::TermInfosWriter->new(
        invindex => $invindex,
        seg_name => $seg_name,
    );
    my $frq_out = $invindex->open_outstream("$seg_name.frq");
    my $prx_out = $invindex->open_outstream("$seg_name.prx");

    # each regular pass is one field, one term, one doc_num, many positions;
    # the loop is left via "last" at the end of the flush pass
    my $iter = 0;
    while (1) {
        $posting = $postings_cache->fetch;

        # true only on the extra pass which follows the final posting
        my $exhausted = !defined $posting;

        # each pass represents a doc to add to the doc_freq for a given term
        # (the flush pass bumps it too, making up for the lag)
        $doc_freq++;    # lags by 1 iter

        if ( !$exhausted ) {
            $iter++;

            # break up the serialized posting into its parts.
            # $posting gets whittled down until it is only the positions
            # string.
            _deserialize( $posting, $termstring, $doc_num, $freq );

            # on the first iter, prime the "heldover" variables
            if ( $iter == 1 ) {
                $last_termstring = $termstring;
                $tinfo           = KinoSearch::Index::TermInfo->new(
                    0,                 # doc_freq
                    $frq_out->tell,    # frq_fileptr
                    $prx_out->tell,    # prx_fileptr
                    $frq_out->tell,    # skip_offset
                    0,                 # index_fileptr
                );
            }
        }

        # for common terms, create skipdata (unused by KinoSearch at present)
        if ( ( $doc_freq + 1 ) % SKIP_INTERVAL == 0 ) {
            my $frq_ptr = $frq_out->tell;
            my $prx_ptr = $prx_out->tell;
            push @skipdata,
                (
                $last_doc_num - $last_skip_doc,
                $frq_ptr - $last_skip_frq_ptr,
                $prx_ptr - $last_skip_prx_ptr,
                );
            $last_skip_doc     = $last_doc_num;
            $last_skip_frq_ptr = $frq_ptr;
            $last_skip_prx_ptr = $prx_ptr;
        }

        # The pending term is complete when the termstring (field number plus
        # term text) changes, or when the input has run out and at least one
        # posting was seen.
        my $term_finished =
            $exhausted
            ? $iter > 0
            : $termstring ne $last_termstring;

        if ($term_finished) {

            # take note of where we are for recording in the term dictionary
            my $frq_ptr = $frq_out->tell;
            my $prx_ptr = $prx_out->tell;

            # write skipdata if there is any
            if (@skipdata) {
                # kludge to compensate for doc_freq's 1-iter lag: the same
                # test succeeded a moment ago during this very pass, so the
                # triple it pushed is discarded here
                if ( ( $doc_freq + 1 ) % SKIP_INTERVAL == 0 ) {
                    splice @skipdata, -3;
                }

                if (@skipdata) {
                    # tell TinfosWriter about the non-zero skip amount
                    $tinfo->set_skip_offset(
                        $frq_ptr - $tinfo->get_frq_fileptr );

                    # write an extra block of VInts to the frq file
                    $frq_out->lu_write( 'V' x scalar @skipdata, @skipdata );

                    # update the filepointer for the file we just wrote to.
                    $frq_ptr = $frq_out->tell;
                }
                @skipdata = ();
            }

            # init skip data in preparation for the next term
            $last_skip_doc     = 0;
            $last_skip_frq_ptr = $frq_ptr;
            $last_skip_prx_ptr = $prx_ptr;

            # hand off to TermInfosWriter
            $tinfo->set_doc_freq($doc_freq);
            $tinfos_writer->add( $last_termstring, $tinfo );
            $tinfo = KinoSearch::Index::TermInfo->new(
                0,           # doc_freq
                $frq_ptr,    # frq_fileptr
                $prx_ptr,    # prx_fileptr
                0,           # skip_offset
                0,           # index_fileptr
            );

            # start each term afresh.
            $last_termstring = $termstring;
            $doc_freq        = 0;
            $last_doc_num    = 0;
        }

        # the flush pass has no posting of its own, so stop before writing
        # invalid data
        last if $exhausted;

        # write positions data
        _write_positions( $prx_out, $posting );

        # write freq data...
        # doc_code is delta doc_num, shifted left by 1.
        if ( $freq == 1 ) {
            # set low bit of doc_code to 1 to indicate freq of 1
            $frq_out->lu_write( 'V',
                ( ( ( $doc_num - $last_doc_num ) * 2 ) + 1 ),
            );
        }
        else {
            # leave low bit of doc_code at 0, record explicit freq
            $frq_out->lu_write( 'VV', ( ( $doc_num - $last_doc_num ) * 2 ),
                $freq, );
        }

        # remember last doc num because we need it for delta encoding
        $last_doc_num = $doc_num;
    }

    $frq_out->close;
    $prx_out->close;
    $tinfos_writer->finish;
}

# Deliberate no-op: accepts any arguments, does nothing, and returns nothing.
sub finish {
    return;
}

1;

__END__
__XS__

MODULE = KinoSearch    PACKAGE = KinoSearch::Index::PostingsWriter      


=begin comment

Add the postings to the segment.  Postings are serialized and dumped into a
Sort::External sort pool.  The actual writing takes place later.

The serialization algo is designed so that postings emerge from the sort
pool in the order ideal for writing an index after a simple lexical sort.
The concatenated components are:

    field number
    term text 
    document number
    positions (C array of U32)
    term length

=end comment
=cut

=begin comment

Pull apart a serialized posting into its component parts.

Scalars are modified in place, which isn't Perl-ish, but this is
performance-critical code.

=end comment
=cut

void 
_deserialize ( posting_sv, termstring_sv, doc_num_sv, freq_sv )
    SV      *posting_sv 
    SV      *termstring_sv
    SV      *doc_num_sv 
    SV      *freq_sv 
PREINIT:
    STRLEN   posting_len;        /* length of the serialized posting */
    char    *posting_str;        /* ptr to PV of the serialized posting */
    char    *termstring_len_ptr;
    STRLEN   termstring_len;     /* length of the term, with field num */
    IV       doc_num;
    IV       freq;               /* freq of term in field */
PPCODE:
{
    /* Split a serialized posting into termstring, doc_num and freq.  All four
     * arguments are modified in place and nothing is returned: termstring_sv,
     * doc_num_sv and freq_sv receive the decoded values, while posting_sv is
     * trimmed down until only the packed positions remain.
     *
     * Layout read here: termstring (field num + term text), 4-byte big-endian
     * doc_num, 4 bytes per position, 2-byte big-endian term text length.
     *
     * Croaks if the posting is too short to hold the parts its own length
     * field promises, rather than reading outside the buffer.
     */

    /* extract pointer from serialized posting */
    posting_str = SvPV(posting_sv, posting_len);

    /* the trailing 2-byte length field must exist before it can be read */
    if (posting_len < 2)
        croak("Malformed posting: %" UVuf " bytes is too short",
            (UV)posting_len);

    /* extract termstring_len, decoding packed 'n', assign termstring */
    termstring_len_ptr = posting_str + posting_len - 2;
    termstring_len     
        = Kino_decode_bigend_U16(termstring_len_ptr) + KINO_FIELD_NUM_LEN;

    /* termstring, doc_num (4 bytes) and the length field (2 bytes) must all
     * fit; zero positions is the smallest legal remainder */
    if (termstring_len + 4 + 2 > posting_len)
        croak("Malformed posting: termstring length %" UVuf
            " does not fit in %" UVuf " bytes",
            (UV)termstring_len, (UV)posting_len);

    sv_setpvn(termstring_sv, posting_str, termstring_len);

    /* extract and assign doc_num, decoding packed 'N' */
    posting_str  += termstring_len;
    doc_num      = Kino_decode_bigend_U32(posting_str);
    posting_str  += 4;
    sv_setiv(doc_num_sv, doc_num);

    /* whack termstring_len off the end of the posting */
    posting_len -= 2; 
    SvCUR_set(posting_sv, posting_len);
    
    /* whack field_num/term text and doc_num off the front, leaving only the
     * positions */
    sv_chop(posting_sv, posting_str);

    /* calculate freq by counting the number of positions, assign */
    freq = (posting_len - termstring_len - 4) / 4;
    sv_setiv(freq_sv, freq);
}

=begin comment

Write out the positions data using the delta encoding specified by the Lucene 
file format.

=end comment
=cut

void
_write_positions ( prx_outstream, positions_sv )
    OutStream *prx_outstream;
    SV        *positions_sv
PREINIT:
    STRLEN   positions_len;
    char    *positions;
    char    *end;
    U32      current_pos;
    U32      last_pos;
    U32      pos_delta;
PPCODE:
{
    /* Write each position in positions_sv to prx_outstream as a VInt holding
     * its delta from the previous position (the first delta is taken from 0).
     * Nothing is returned.
     */
    positions = SvPV(positions_sv, positions_len);

    /* positions_sv was originally built up using the equivalent of
     * pack('I*', @positions), and pack template 'I' is a U32, so the buffer
     * holds native 32 bit unsigned integers.
     *
     * The buffer is not guaranteed to be aligned for U32 access:
     * _deserialize chops a variable-length prefix off the front of the
     * string with sv_chop.  Each value is therefore copied into a local
     * rather than read through a U32 pointer.
     */
    end      = positions + (positions_len / 4) * sizeof(U32);
    last_pos = 0;
    while (positions < end) {
        /* fetch the next position without assuming alignment */
        Copy(positions, &current_pos, 1, U32);

        /* get delta and write out as VInt */
        pos_delta = current_pos - last_pos;
        prx_outstream->write_vint(prx_outstream, pos_delta);

        /* advance */
        last_pos   = current_pos;
        positions += sizeof(U32);
    }
}

__POD__

=begin devdocs

=head1 NAME

KinoSearch::Index::PostingsWriter - write postings data to an invindex

=head1 DESCRIPTION

PostingsWriter creates posting lists.  It writes the frequency and
positional data files, plus feeds data to TermInfosWriter.

=head1 COPYRIGHT

Copyright 2005-2006 Marvin Humphrey

=head1 LICENSE, DISCLAIMER, BUGS, etc.

See L<KinoSearch|KinoSearch> version 0.08.

=end devdocs
=cut