From c5c2e59b2bbb810db2e55758f220384f9e2c2724 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 17 May 2022 11:35:39 +0800 Subject: [PATCH] introduce optional collect_block in segmentcollector add collect_block in segment_collector to handle groups of documents as performance optimization add collect_block for MultiCollector --- src/aggregation/bucket/term_agg.rs | 2 +- src/collector/count_collector.rs | 5 +++ src/collector/custom_score_top_collector.rs | 14 +++++- src/collector/mod.rs | 47 ++++++++++++++++++++- src/collector/multi_collector.rs | 18 ++++++++ src/collector/top_score_collector.rs | 8 ++++ 6 files changed, 89 insertions(+), 5 deletions(-) diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 52e120cc96..8a9970e0fd 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -256,7 +256,7 @@ impl TermBuckets { }); entry.doc_count += 1; if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.collect(doc, &sub_aggregation)?; + sub_aggregations.collect(doc, sub_aggregation)?; } } bucket_count.validate_bucket_count()?; diff --git a/src/collector/count_collector.rs b/src/collector/count_collector.rs index 02f30f85c1..3ff1368f83 100644 --- a/src/collector/count_collector.rs +++ b/src/collector/count_collector.rs @@ -70,6 +70,11 @@ impl SegmentCollector for SegmentCountCollector { Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.count += docs.len(); + Ok(()) + } + fn harvest(self) -> usize { self.count } diff --git a/src/collector/custom_score_top_collector.rs b/src/collector/custom_score_top_collector.rs index d597727ed1..0f981c20a7 100644 --- a/src/collector/custom_score_top_collector.rs +++ b/src/collector/custom_score_top_collector.rs @@ -8,7 +8,8 @@ pub(crate) struct CustomScoreTopCollector { } impl CustomScoreTopCollector -where TScore: Clone + PartialOrd +where + TScore: Clone + PartialOrd, { pub(crate) fn new( custom_scorer: TCustomScorer, @@ -96,6 +97,14 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for (doc, _score) in docs { + let score = self.segment_scorer.score(*doc); + self.segment_collector.collect(*doc, score); + } + Ok(()) + } + fn harvest(self) -> Vec<(TScore, DocAddress)> { self.segment_collector.harvest() } @@ -114,7 +123,8 @@ where } impl CustomSegmentScorer for F -where F: 'static + FnMut(DocId) -> TScore +where + F: 'static + FnMut(DocId) -> TScore, { fn score(&mut self, doc: DocId) -> TScore { (self)(doc) diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 6d600bb488..7e6d43f7b7 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -172,19 +172,33 @@ pub trait Collector: Sync + Send { ) -> crate::Result<::Fruit> { let mut segment_collector = self.for_segment(segment_ord as u32, reader)?; + let mut cache_pos = 0; + let mut cache = [(0, 0.0); 64]; + if let Some(alive_bitset) = reader.alive_bitset() { weight.for_each(reader, &mut |doc, score| { if alive_bitset.is_alive(doc) { - segment_collector.collect(doc, score)?; + cache[cache_pos] = (doc, score); + cache_pos += 1; + if cache_pos == 64 { + segment_collector.collect_block(&cache)?; + cache_pos = 0; + } } Ok(()) })?; } else { weight.for_each(reader, &mut |doc, score| { - segment_collector.collect(doc, score)?; + cache[cache_pos] = (doc, score); + cache_pos += 1; + if cache_pos == 64 { + segment_collector.collect_block(&cache)?; + cache_pos = 0; + } Ok(()) })?; } + segment_collector.collect_block(&cache[..cache_pos])?; Ok(segment_collector.harvest()) } } @@ -258,6 +272,14 @@ pub trait SegmentCollector: 'static { /// The query pushes the scored document to the collector via this method. fn collect(&mut self, doc: DocId, score: Score) -> crate::Result<()>; + /// The query pushes the scored document to the collector via this method. + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for (doc, score) in docs { + self.collect(*doc, *score)?; + } + Ok(()) + } + /// Extract the fruit of the collection from the `SegmentCollector`. fn harvest(self) -> Self::Fruit; } @@ -317,6 +339,12 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + self.1.collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> ::Fruit { (self.0.harvest(), self.1.harvest()) } @@ -383,6 +411,13 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + self.1.collect_block(docs)?; + self.2.collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> ::Fruit { (self.0.harvest(), self.1.harvest(), self.2.harvest()) } @@ -459,6 +494,14 @@ where Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + self.1.collect_block(docs)?; + self.2.collect_block(docs)?; + self.3.collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> ::Fruit { ( self.0.harvest(), diff --git a/src/collector/multi_collector.rs b/src/collector/multi_collector.rs index 7b119ad868..a7a8ed1692 100644 --- a/src/collector/multi_collector.rs +++ b/src/collector/multi_collector.rs @@ -57,6 +57,11 @@ impl SegmentCollector for Box { Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.as_mut().collect_block(docs)?; + Ok(()) + } + fn harvest(self) -> Box { BoxableSegmentCollector::harvest_from_box(self) } @@ -64,6 +69,7 @@ impl SegmentCollector for Box { pub trait BoxableSegmentCollector { fn collect(&mut self, doc: u32, score: Score) -> crate::Result<()>; + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()>; fn harvest_from_box(self: Box) -> Box; } @@ -76,6 +82,11 @@ impl BoxableSegmentCollector self.0.collect(doc, score) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + self.0.collect_block(docs)?; + Ok(()) + } + fn harvest_from_box(self: Box) -> Box { Box::new(self.0.harvest()) } @@ -236,6 +247,13 @@ impl SegmentCollector for MultiCollectorChild { Ok(()) } + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for child in &mut self.children { + child.collect_block(docs)?; + } + Ok(()) + } + fn harvest(self) -> MultiFruit { MultiFruit { sub_fruits: self diff --git a/src/collector/top_score_collector.rs b/src/collector/top_score_collector.rs index e0e3aeb9dc..6074d088a3 100644 --- a/src/collector/top_score_collector.rs +++ b/src/collector/top_score_collector.rs @@ -704,6 +704,14 @@ impl SegmentCollector for TopScoreSegmentCollector { Ok(()) } + #[inline] + fn collect_block(&mut self, docs: &[(DocId, Score)]) -> crate::Result<()> { + for (doc, score) in docs { + self.0.collect(*doc, *score); + } + Ok(()) + } + fn harvest(self) -> Vec<(Score, DocAddress)> { self.0.harvest() }