Skip to content

Commit

Permalink
introduce optional collect_block in segmentcollector
Browse files Browse the repository at this point in the history
add collect_block in segment_collector to handle groups of documents as performance optimization
add collect_block for MultiCollector
  • Loading branch information
PSeitz committed May 19, 2022
1 parent 44ea731 commit c5c2e59
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl TermBuckets {
});
entry.doc_count += 1;
if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() {
sub_aggregations.collect(doc, &sub_aggregation)?;
sub_aggregations.collect(doc, sub_aggregation)?;
}
}
bucket_count.validate_bucket_count()?;
Expand Down
5 changes: 5 additions & 0 deletions src/collector/count_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ impl SegmentCollector for SegmentCountCollector {
Ok(())
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
self.count += docs.len();
Ok(())
}

fn harvest(self) -> usize {
self.count
}
Expand Down
14 changes: 12 additions & 2 deletions src/collector/custom_score_top_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ pub(crate) struct CustomScoreTopCollector<TCustomScorer, TScore = Score> {
}

impl<TCustomScorer, TScore> CustomScoreTopCollector<TCustomScorer, TScore>
where TScore: Clone + PartialOrd
where
TScore: Clone + PartialOrd,
{
pub(crate) fn new(
custom_scorer: TCustomScorer,
Expand Down Expand Up @@ -96,6 +97,14 @@ where
Ok(())
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
for (doc, _score) in docs {
let score = self.segment_scorer.score(*doc);
self.segment_collector.collect(*doc, score);
}
Ok(())
}

fn harvest(self) -> Vec<(TScore, DocAddress)> {
self.segment_collector.harvest()
}
Expand All @@ -114,7 +123,8 @@ where
}

impl<F, TScore> CustomSegmentScorer<TScore> for F
where F: 'static + FnMut(DocId) -> TScore
where
F: 'static + FnMut(DocId) -> TScore,
{
fn score(&mut self, doc: DocId) -> TScore {
(self)(doc)
Expand Down
47 changes: 45 additions & 2 deletions src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,33 @@ pub trait Collector: Sync + Send {
) -> crate::Result<<Self::Child as SegmentCollector>::Fruit> {
let mut segment_collector = self.for_segment(segment_ord as u32, reader)?;

let mut cache_pos = 0;
let mut cache = [(0, 0.0); 64];

if let Some(alive_bitset) = reader.alive_bitset() {
weight.for_each(reader, &mut |doc, score| {
if alive_bitset.is_alive(doc) {
segment_collector.collect(doc, score)?;
cache[cache_pos] = (doc, score);
cache_pos += 1;
if cache_pos == 64 {
segment_collector.collect_block(&cache)?;
cache_pos = 0;
}
}
Ok(())
})?;
} else {
weight.for_each(reader, &mut |doc, score| {
segment_collector.collect(doc, score)?;
cache[cache_pos] = (doc, score);
cache_pos += 1;
if cache_pos == 64 {
segment_collector.collect_block(&cache)?;
cache_pos = 0;
}
Ok(())
})?;
}
segment_collector.collect_block(&cache[..cache_pos])?;
Ok(segment_collector.harvest())
}
}
Expand Down Expand Up @@ -258,6 +272,14 @@ pub trait SegmentCollector: 'static {
/// The query pushes the scored document to the collector via this method.
fn collect(&mut self, doc: DocId, score: Score) -> crate::Result<()>;

/// The query pushes the scored document to the collector via this method.
fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
for (doc, score) in docs {
self.collect(*doc, *score)?;
}
Ok(())
}

/// Extract the fruit of the collection from the `SegmentCollector`.
fn harvest(self) -> Self::Fruit;
}
Expand Down Expand Up @@ -317,6 +339,12 @@ where
Ok(())
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
self.0.collect_block(docs)?;
self.1.collect_block(docs)?;
Ok(())
}

fn harvest(self) -> <Self as SegmentCollector>::Fruit {
(self.0.harvest(), self.1.harvest())
}
Expand Down Expand Up @@ -383,6 +411,13 @@ where
Ok(())
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
self.0.collect_block(docs)?;
self.1.collect_block(docs)?;
self.2.collect_block(docs)?;
Ok(())
}

fn harvest(self) -> <Self as SegmentCollector>::Fruit {
(self.0.harvest(), self.1.harvest(), self.2.harvest())
}
Expand Down Expand Up @@ -459,6 +494,14 @@ where
Ok(())
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
self.0.collect_block(docs)?;
self.1.collect_block(docs)?;
self.2.collect_block(docs)?;
self.3.collect_block(docs)?;
Ok(())
}

fn harvest(self) -> <Self as SegmentCollector>::Fruit {
(
self.0.harvest(),
Expand Down
18 changes: 18 additions & 0 deletions src/collector/multi_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,19 @@ impl SegmentCollector for Box<dyn BoxableSegmentCollector> {
Ok(())
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
self.as_mut().collect_block(docs)?;
Ok(())
}

fn harvest(self) -> Box<dyn Fruit> {
BoxableSegmentCollector::harvest_from_box(self)
}
}

pub trait BoxableSegmentCollector {
fn collect(&mut self, doc: u32, score: Score) -> crate::Result<()>;
fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()>;
fn harvest_from_box(self: Box<Self>) -> Box<dyn Fruit>;
}

Expand All @@ -76,6 +82,11 @@ impl<TSegmentCollector: SegmentCollector> BoxableSegmentCollector
self.0.collect(doc, score)
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
self.0.collect_block(docs)?;
Ok(())
}

fn harvest_from_box(self: Box<Self>) -> Box<dyn Fruit> {
Box::new(self.0.harvest())
}
Expand Down Expand Up @@ -236,6 +247,13 @@ impl SegmentCollector for MultiCollectorChild {
Ok(())
}

fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
for child in &mut self.children {
child.collect_block(docs)?;
}
Ok(())
}

fn harvest(self) -> MultiFruit {
MultiFruit {
sub_fruits: self
Expand Down
8 changes: 8 additions & 0 deletions src/collector/top_score_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,14 @@ impl SegmentCollector for TopScoreSegmentCollector {
Ok(())
}

#[inline]
fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> {
for (doc, score) in docs {
self.0.collect(*doc, *score);
}
Ok(())
}

fn harvest(self) -> Vec<(Score, DocAddress)> {
self.0.harvest()
}
Expand Down

0 comments on commit c5c2e59

Please sign in to comment.