Skip to content

Commit

Permalink
add support for warming up range of terms (#2042)
Browse files Browse the repository at this point in the history
* add support for warming up range of terms

* simplify handling of limit
  • Loading branch information
trinity-1686a authored May 22, 2023
1 parent 6564e0c commit a3f0013
Showing 1 changed file with 74 additions and 5 deletions.
79 changes: 74 additions & 5 deletions src/core/inverted_index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,43 @@ impl InvertedIndexReader {
self.termdict.get_async(term.serialized_value_bytes()).await
}

/// Returns the `TermInfo` of the terms falling within `terms`, in the order in which the
/// term dictionary stream yields them.
///
/// The start and end bounds of `terms` are translated into the matching `ge`/`gt` and
/// `le`/`lt` constraints on the term dictionary range; an unbounded side adds no constraint.
///
/// When `limit` is `Some`, at most `limit` entries are returned. A limit that does not fit
/// in a `usize` is treated as "no limit" instead of being truncated.
///
/// # Errors
///
/// Returns an `io::Error` if opening the term dictionary stream fails.
async fn get_term_range_async(
    &self,
    terms: impl std::ops::RangeBounds<Term>,
    limit: Option<u64>,
) -> io::Result<impl Iterator<Item = TermInfo> + '_> {
    use std::ops::Bound;

    let range_builder = self.termdict.range();
    let range_builder = match terms.start_bound() {
        Bound::Included(bound) => range_builder.ge(bound.serialized_value_bytes()),
        Bound::Excluded(bound) => range_builder.gt(bound.serialized_value_bytes()),
        Bound::Unbounded => range_builder,
    };
    let range_builder = match terms.end_bound() {
        Bound::Included(bound) => range_builder.le(bound.serialized_value_bytes()),
        Bound::Excluded(bound) => range_builder.lt(bound.serialized_value_bytes()),
        Bound::Unbounded => range_builder,
    };
    let range_builder = match limit {
        Some(limit) => range_builder.limit(limit),
        None => range_builder,
    };

    let mut stream = range_builder.into_stream_async().await?;

    // limit on stream is only an optimization to load less data, the stream may still return
    // more than limit elements, so the cap is enforced again on the iterator.
    // `usize` may be narrower than `u64` (32-bit targets): saturate rather than using an
    // `as` cast, which would wrap a large limit around to a small one.
    let max_items = limit.map_or(usize::MAX, |limit| {
        usize::try_from(limit).unwrap_or(usize::MAX)
    });

    // The stream hands out borrowed entries, so each `TermInfo` is cloned to get an owned item.
    let iter = std::iter::from_fn(move || stream.next().map(|(_key, term_info)| term_info.clone()))
        .take(max_items);

    Ok(iter)
}

/// Warmup a block postings given a `Term`.
/// This method is for an advanced usage only.
///
/// Most users should prefer using [`Self::read_postings()`] instead.
pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<()> {
let term_info_opt: Option<TermInfo> = self.get_term_info_async(term).await?;
if let Some(term_info) = term_info_opt {
Expand All @@ -228,10 +261,46 @@ impl InvertedIndexReader {
Ok(())
}

/// Warmup the block postings for every `Term` within a range.
/// This method is for an advanced usage only.
///
/// The matching terms are looked up first (at most `limit` of them when `limit` is `Some`).
/// A single read is then issued on the postings file, spanning from the start of the first
/// matching term's postings to the end of the last matching term's postings. When
/// `with_positions` is `true`, the equivalent span of the positions file is read
/// concurrently.
///
/// Returns `Ok(())` without reading anything when no term matches.
pub async fn warm_postings_range(
    &self,
    terms: impl std::ops::RangeBounds<Term>,
    limit: Option<u64>,
    with_positions: bool,
) -> io::Result<()> {
    let mut matching_term_infos = self.get_term_range_async(terms, limit).await?;

    let first = match matching_term_infos.next() {
        Some(term_info) => term_info,
        // no key matches, nothing more to load
        None => return Ok(()),
    };
    // With a single match, that entry is both the first and the last one.
    let last = matching_term_infos
        .last()
        .unwrap_or_else(|| first.clone());

    let postings_read = self
        .postings_file_slice
        .read_bytes_slice_async(first.postings_range.start..last.postings_range.end);

    if !with_positions {
        postings_read.await?;
        return Ok(());
    }

    let positions_read = self
        .positions_file_slice
        .read_bytes_slice_async(first.positions_range.start..last.positions_range.end);
    // Drive both reads together; the first error aborts the warmup.
    futures_util::future::try_join(postings_read, positions_read).await?;
    Ok(())
}

/// Warmup the block postings for all terms.
/// This method is for an advanced usage only.
///
/// If you know which terms to pre-load, prefer using [`Self::warm_postings`] instead.
/// If you know which terms to pre-load, prefer using [`Self::warm_postings`] or
/// [`Self::warm_postings_range`] instead.
pub async fn warm_postings_full(&self, with_positions: bool) -> io::Result<()> {
self.postings_file_slice.read_bytes_async().await?;
if with_positions {
Expand Down

0 comments on commit a3f0013

Please sign in to comment.