use strict;
use warnings;
package KinoSearch::InvIndexer;
use KinoSearch::Util::ToolSet;
use base qw( KinoSearch::Util::Class );
# Default values for every member of a KinoSearch::InvIndexer object.
# NOTE(review): presumably consumed by the KinoSearch::Util::Class base
# constructor to populate new objects -- confirm against that class.
our %instance_vars = (
# constructor args / members
# invindex: required; init_instance() insists on a KinoSearch::InvIndex
invindex => undef,
# lock_factory: optional; init_instance() creates a default
# KinoSearch::Store::LockFactory when none is supplied
lock_factory => undef,
# members
# schema, folder: extracted from the invindex by init_instance()
schema => undef,
folder => undef,
# seg_info: SegInfo describing the new segment created by init_instance()
seg_info => undef,
# reader: IndexReader, opened only when the invindex already has segments
reader => undef,
# seg_infos: SegInfos object read from the folder by init_instance()
seg_infos => undef,
# seg_writer: SegWriter which receives added docs and merged segments
seg_writer => undef,
# write_lock: the obtained write lock; undefined again once released
write_lock => undef,
# has_deletions: set to 1 by delete_by_term(); makes finish() write deletions
has_deletions => 0,
);
use KinoSearch::Index::IndexReader;
use KinoSearch::Index::SegInfo;
use KinoSearch::Index::SegInfos;
use KinoSearch::Index::SegWriter;
use KinoSearch::Index::IndexFileNames qw(
WRITE_LOCK_NAME
WRITE_LOCK_TIMEOUT
COMMIT_LOCK_NAME
COMMIT_LOCK_TIMEOUT
unused_files );
use KinoSearch::Util::StringHelper qw( to_base36 );
use KinoSearch::Store::LockFactory;
# Complete construction of an InvIndexer: validate the constructor args,
# obtain the write lock, load the existing segment infos, and prepare a
# SegWriter for a brand new segment.
#
# Throws (via confess) when 'invindex' is not a KinoSearch::InvIndex, when a
# supplied 'lock_factory' is not a KinoSearch::Store::LockFactory, or when
# the write lock cannot be obtained.
sub init_instance {
    my $self = shift;

    # the invindex is mandatory -- it supplies both the Folder and the Schema
    my $invindex = $self->{invindex};
    if ( !a_isa_b( $invindex, "KinoSearch::InvIndex" ) ) {
        confess("Missing required arg 'invindex'");
    }
    my $folder = $invindex->get_folder;
    $self->{folder} = $folder;
    my $schema = $invindex->get_schema;
    $self->{schema} = $schema;

    # use the caller's lock factory after vetting it, or build a default one
    my $lock_factory = $self->{lock_factory};
    if ( !defined $lock_factory ) {
        $lock_factory = KinoSearch::Store::LockFactory->new(
            agent_id => '',
            folder   => $folder,
        );
        $self->{lock_factory} = $lock_factory;
    }
    elsif ( !a_isa_b( $lock_factory, "KinoSearch::Store::LockFactory" ) ) {
        confess("Not a LockFactory");
    }

    # try for the folder's write lock, clearing out a stale one first
    my $write_lock = $lock_factory->make_lock(
        lock_name => WRITE_LOCK_NAME,
        timeout   => WRITE_LOCK_TIMEOUT,
    );
    $write_lock->clear_stale;
    if ( !$write_lock->obtain ) {
        # Deliberately leave $self->{write_lock} unset: DESTROY releases
        # whatever is stored there, and we never obtained this lock.
        my $mess = "InvIndex is locked";
        if ( $write_lock->can('get_filename') ) {
            $mess .= " by " . $write_lock->get_filename;
        }
        confess($mess);
    }
    $self->{write_lock} = $write_lock;

    # load the segment infos currently recorded in the folder
    my $seg_infos = KinoSearch::Index::SegInfos->new( schema => $schema );
    $self->{seg_infos} = $seg_infos;
    $seg_infos->read_infos( folder => $folder );

    # an IndexReader is only needed when there is existing content
    if ( $seg_infos->size ) {
        my $reader = KinoSearch::Index::IndexReader->open(
            invindex  => $invindex,
            seg_infos => $seg_infos,
        );
        $self->{reader} = $reader;
    }

    # set up the new segment: a name, a SegInfo, and finally a SegWriter
    my $seg_name = $self->_new_seg_name;
    my $seg_info = KinoSearch::Index::SegInfo->new(
        seg_name => $seg_name,
        fspecs   => $schema->get_fspecs,
    );
    $self->{seg_info} = $seg_info;
    $self->{seg_writer} = KinoSearch::Index::SegWriter->new(
        invindex => $invindex,
        seg_info => $seg_info,
    );
}
# Labeled parameters accepted by add_doc() after the document hashref.
my %add_doc_args = ( boost => undef, );

# Add one document to the segment currently being written.
#
# Params:  $doc  - hashref of field_name => field_value pairs
#          boost - optional labeled param; scoring multiplier, default 1.0
# Returns: whatever the SegWriter's add_doc() returns.
# Throws:  (via confess) if $doc is not a hashref, or if verify_args()
#          rejects the labeled params.
sub add_doc {
    my $self = shift;
    my $doc  = shift;

    # reftype() returns undef for anything which isn't a reference; fall
    # back to '' so that bad input produces only the exception below rather
    # than an additional "uninitialized value" warning.
    confess("First argument must be a hashref")
        unless ( reftype($doc) || '' ) eq 'HASH';
    confess kerror() unless verify_args( \%add_doc_args, @_ );
    my %args = @_;

    # add doc to output segment
    my $boost = defined $args{boost} ? $args{boost} : 1.0;
    $self->{seg_writer}->add_doc( $doc, $boost );
}
# Absorb the content of other invindexes into the segment being written.
#
# Each supplied invindex must have a Schema of exactly the same class as
# ours, otherwise an exception is thrown.  Every field known to the other
# schemas is registered with our schema and with the new segment's SegInfo.
sub add_invindexes {
    my ( $self, @invindexes ) = @_;
    my ( $schema, $seg_info, $seg_writer )
        = @{$self}{qw( schema seg_info seg_writer )};
    my $our_class = ref($schema);

    # phase 1: vet the schemas and import their field definitions
    foreach my $other (@invindexes) {
        my $their_schema = $other->get_schema;
        my $their_class  = ref($their_schema);
        confess("Schema class '$their_class' doesn't match $our_class")
            if $their_class ne $our_class;

        foreach my $field ( $their_schema->all_fields ) {
            $schema->add_field( $field,
                ref( $their_schema->fetch_fspec($field) ) );
            $seg_info->add_field($field);
        }
    }

    # phase 2: open a reader on every invindex before merging anything
    my @readers;
    foreach my $other (@invindexes) {
        push @readers,
            KinoSearch::Index::IndexReader->open( invindex => $other );
    }

    # phase 3: feed every segment of every reader to the SegWriter
    foreach my $reader (@readers) {
        foreach my $seg_reader ( $reader->segreaders_to_merge('all') ) {
            $seg_writer->add_segment($seg_reader);
        }
    }
}
# Retired API: always throws, directing callers to delete_by_term().
sub delete_docs_by_term {
    my $message
        = "delete_docs_by_term() has been replaced by delete_by_term(), "
        . "which has slightly different behavior -- see InvIndexer's docs";
    confess($message);
}
# Mark every document containing the given term as deleted.
#
# Params:  $field_name - name of an indexed field
#          $term_text  - term text; run through the field's analyzer first
#                        when the field is analyzed
# Returns nothing (and does nothing) when there is no reader, i.e. when the
# invindex had no content at construction time.
# Throws (via confess) if the field is unknown to the schema or not indexed.
sub delete_by_term {
    my ( $self, $field_name, $term_text ) = @_;

    # a brand new InvIndex has nothing to delete from
    my $reader = $self->{reader};
    return unless $reader;

    # only indexed fields can be searched for the term
    my $schema     = $self->{schema};
    my $field_spec = $schema->fetch_fspec($field_name);
    if ( !defined $field_spec or !$field_spec->indexed ) {
        confess("$field_name is not an indexed field");
    }

    # for analyzed fields, the first result of analyze_raw() replaces the
    # raw text
    my $text = $term_text;
    if ( $field_spec->analyzed ) {
        my $analyzer = $schema->fetch_analyzer($field_name);
        ($text) = $analyzer->analyze_raw($term_text);
    }
    my $term = KinoSearch::Index::Term->new( $field_name, $text );
    $reader->delete_docs_by_term($term);

    # remember that finish() has deletions to write out
    $self->{has_deletions} = 1;
}
# Defaults for the labeled parameters accepted by finish().
our %finish_defaults = ( optimize => 0 );

# Commit the indexing session and invalidate this InvIndexer.
#
# In order: merges the segments selected by the reader, writes pending
# deletions, finishes the new segment if it has content, writes the segment
# infos if anything changed, closes the reader, purges obsolete files, and
# releases the write lock.
#
# Params:  optimize - labeled param, default 0; passed through to the
#                     reader's segreaders_to_merge()
# Throws (via confess) on invalid labeled params or when called a second
# time (detected by the write lock already being gone).
sub finish {
    my $self = shift;
    confess kerror() unless verify_args( \%finish_defaults, @_ );
    my %args = ( %finish_defaults, @_ );

    my $folder     = $self->{folder};
    my $seg_info   = $self->{seg_info};
    my $seg_infos  = $self->{seg_infos};
    my $seg_writer = $self->{seg_writer};
    my $reader     = $self->{reader};

    # the write lock is dropped at the end of finish(), so its absence means
    # we have already been here
    confess("Can't call finish() more than once")
        unless defined $self->{write_lock};

    # merge existing segments into the new one; without a reader there is
    # nothing to merge
    my @to_merge;
    if ($reader) {
        @to_merge = $reader->segreaders_to_merge( $args{optimize} );
    }
    for my $seg_reader (@to_merge) {
        $seg_writer->add_segment($seg_reader);
    }
    for my $seg_reader (@to_merge) {
        $seg_infos->delete_segment( $seg_reader->get_seg_name );
    }

    # flush deletions recorded by delete_by_term()
    if ( $self->{has_deletions} ) {
        $self->{reader}->write_deletions;
    }

    # a new segment is only written when docs were added or segments merged
    if ( $seg_info->get_doc_count or @to_merge ) {
        $seg_writer->finish;
        $seg_infos->add_info( $seg_writer->get_seg_info );
    }

    # write a new segments_XXX.yaml file if anything has changed
    if (   $seg_info->get_doc_count
        or $self->{has_deletions}
        or @to_merge )
    {
        $seg_infos->write_infos($folder);
    }

    # the reader must be closed before its files can be deleted
    if ( defined $reader ) {
        $reader->close;
    }

    # purge obsolete files
    $self->_purge_unused();

    # release the write lock, invalidating the invindexer
    $self->_release_locks;
}
# Delete unused files.
#
# Runs under the commit lock.  If the commit lock can't be obtained, a
# warning is issued and nothing is deleted.  Failure to delete an individual
# file is ignored on purpose.  The commit lock is released even when
# discovering the unused files throws; the exception is then rethrown.
sub _purge_unused {
    my $self   = shift;
    my $folder = $self->{folder};

    my $commit_lock = $self->{lock_factory}->make_lock(
        lock_name => COMMIT_LOCK_NAME,
        timeout   => COMMIT_LOCK_TIMEOUT,
    );
    $commit_lock->clear_stale;

    if ( !$commit_lock->obtain ) {
        warn "Can't obtain commit lock, skipping deletion of obsolete files";
        return;
    }

    # Do the work inside an eval so that an exception can't leave the commit
    # lock held.
    my $ok = eval {
        my @deletions = $self->_discover_unused_files;

        # attempt to delete files -- if failure, no big deal, try again later
        for my $deletion (@deletions) {
            eval { $folder->delete_file($deletion) };
        }
        1;
    };
    my $error = $@;
    $commit_lock->release;
    die $error unless $ok;
    return;
}
# Return a list of all index files not locked by readers.
#
# Starts from our own SegInfos, adds the SegInfos of every segments_XXX.yaml
# file in the folder which currently has a read lock on it, and hands the
# folder listing plus all of those SegInfos objects to unused_files().
sub _discover_unused_files {
    my $self = shift;
    my ( $schema, $folder, $lock_factory )
        = @{$self}{qw( schema folder lock_factory )};
    my @files       = $folder->list;
    my @seg_infoses = ( $self->{seg_infos} );

    for my $filename (@files) {
        # The dot is escaped and the match is anchored with \z, so that only
        # genuine "segments_XXX.yaml" names qualify (an unescaped dot would
        # also accept names such as "segments_1_yaml").
        next unless $filename =~ /^(segments_\w+)\.yaml\z/;
        my $lock_name = $1;

        # timeout 0: we only want to inspect the lock's state
        my $read_lock = $lock_factory->make_shared_lock(
            lock_name => $lock_name,
            timeout   => 0,
        );
        next unless $read_lock->is_locked;

        # a reader is using this segments file, so take it into account
        my $seg_infos = KinoSearch::Index::SegInfos->new( schema => $schema );
        $seg_infos->read_infos( folder => $folder, filename => $filename );
        push @seg_infoses, $seg_infos;
    }

    return unused_files( \@files, @seg_infoses );
}
# Release the write lock - if it's there.
#
# Afterwards $self->{write_lock} is undefined, so calling this again is a
# no-op.
sub _release_locks {
    my $self       = shift;
    my $write_lock = $self->{write_lock};
    if ( defined $write_lock ) {
        $write_lock->release;
        undef $self->{write_lock};
    }
}
# Generate segment names.
#
# Increments the segment counter stored in the SegInfos object and returns
# an underscore followed by the base-36 form of the new counter value.
sub _new_seg_name {
    my $self      = shift;
    my $seg_infos = $self->{seg_infos};

    my $counter = $seg_infos->get_seg_counter;
    $counter++;
    $seg_infos->set_seg_counter($counter);

    return '_' . to_base36($counter);
}
# Destructor: make sure the write lock doesn't outlive the object.
sub DESTROY {
    my $self = shift;
    $self->_release_locks;
}
1;
__END__
=head1 NAME
KinoSearch::InvIndexer - Build inverted indexes.
=head1 SYNOPSIS
use KinoSearch::InvIndexer;
use MySchema;
my $invindexer = KinoSearch::InvIndexer->new(
invindex => MySchema->clobber('/path/to/invindex'),
);
while ( my ( $title, $content ) = each %source_docs ) {
$invindexer->add_doc({
title => $title,
content => $content,
});
}
$invindexer->finish;
=head1 DESCRIPTION
The InvIndexer class is KinoSearch's primary tool for managing the content of
inverted indexes, which may later be searched using L<KinoSearch::Searcher>.
=head2 Concurrency
Only one InvIndexer may write to an invindex at a time. If a write lock
cannot be secured, new() will throw an exception.
If an index is located on a shared volume, each writer application must
identify itself by passing a L<LockFactory|KinoSearch::Store::LockFactory> to
InvIndexer's constructor or index corruption will occur.
=head1 METHODS
=head2 new
my $invindex = MySchema->clobber('/path/to/invindex');
my $invindexer = KinoSearch::InvIndexer->new(
invindex => $invindex, # required
lock_factory => $factory # default: created internally
);
Constructor. Takes labeled parameters.
=over
=item *
B<invindex> - An object which isa L<KinoSearch::InvIndex>.
=item *
B<lock_factory> - An object which isa L<KinoSearch::Store::LockFactory>.
=back
=head2 add_doc
$invindexer->add_doc( { field_name => $field_value } );
# or ...
$invindexer->add_doc( { field_name => $field_value }, boost => 2.5 );
Add a document to the invindex. The first argument must be a reference to a hash
comprised of field_name => field_value pairs. Ownership of the hash is assumed
by the InvIndexer object.
After the hashref, labeled parameters are accepted.
=over
=item *
B<boost> - A scoring multiplier. Setting boost to something other than 1
causes a document to score better or worse against a given query relative to
other documents.
=back
=head2 add_invindexes
$invindexer->add_invindexes( $another_invindex, $yet_another_invindex );
Absorb existing invindexes into this one. The other invindexes must use the
same Schema as the invindex which was supplied to new().
=head2 delete_by_term
$invindexer->delete_by_term( $field_name, $term_text );
Mark documents which contain the supplied term as deleted, so that they will
be excluded from search results. The change is not apparent to search apps
until a new Searcher is opened I<after> finish() completes.
If the field is associated with an analyzer, C<$term_text> will be
processed automatically (so don't pre-process it yourself).
C<$field_name> must identify an I<indexed> field, or an error will occur.
=head2 finish
$invindexer->finish(
optimize => 1, # default: 0
);
Finish processing any changes made to the invindex and commit. Until the
commit happens near the end of finish(), none of the changes made during
an indexing session are permanent.
Calling finish() invalidates the InvIndexer, so if you want to make more
changes you'll need a new one.
Takes one labeled parameter:
=over
=item *
B<optimize> - If optimize is set to 1, the invindex will be collapsed to its
most compact form, a process which may take a while -- but which will yield
the fastest queries at search time.
=back
=head1 COPYRIGHT
Copyright 2005-2007 Marvin Humphrey
=head1 LICENSE, DISCLAIMER, BUGS, etc.
See L<KinoSearch> version 0.20.
=cut