From ee8fd25811060651f93e2962818677f0159b8b9a Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 24 Nov 2022 22:15:28 +0800
Subject: [PATCH] enable date for range aggregation

---
 src/aggregation/agg_result.rs                 |   6 +
 src/aggregation/bucket/histogram/histogram.rs |  23 +---
 src/aggregation/bucket/range.rs               | 108 ++++++++++++++++--
 src/aggregation/date.rs                       |  18 +++
 src/aggregation/intermediate_agg_result.rs    |  39 ++++++-
 src/aggregation/mod.rs                        |  14 +--
 6 files changed, 166 insertions(+), 42 deletions(-)
 create mode 100644 src/aggregation/date.rs

diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs
index 4f71e9d7a5..be5bfd0835 100644
--- a/src/aggregation/agg_result.rs
+++ b/src/aggregation/agg_result.rs
@@ -245,4 +245,10 @@ pub struct RangeBucketEntry {
     /// The to range of the bucket. Equals `f64::MAX` when `None`.
     #[serde(skip_serializing_if = "Option::is_none")]
     pub to: Option<f64>,
+    /// The optional string representation for the `from` range.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub from_as_string: Option<String>,
+    /// The optional string representation for the `to` range.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub to_as_string: Option<String>,
 }
diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs
index c37cc65162..fb59693b9c 100644
--- a/src/aggregation/bucket/histogram/histogram.rs
+++ b/src/aggregation/bucket/histogram/histogram.rs
@@ -4,19 +4,17 @@ use std::fmt::Display;
 use fastfield_codecs::Column;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
-use time::format_description::well_known::Rfc3339;
-use time::OffsetDateTime;
 
 use crate::aggregation::agg_req::AggregationsInternal;
 use crate::aggregation::agg_req_with_accessor::{
     AggregationsWithAccessor, BucketAggregationWithAccessor,
 };
 use crate::aggregation::agg_result::BucketEntry;
-use crate::aggregation::f64_from_fastfield_u64;
 use crate::aggregation::intermediate_agg_result::{
     IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
 };
 use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
+use crate::aggregation::{f64_from_fastfield_u64, format_date};
 use crate::schema::{Schema, Type};
 use crate::{DocId, TantivyError};
 
@@ -534,22 +532,9 @@ pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
         .ok_or_else(|| TantivyError::FieldNotFound(histogram_req.field.to_string()))?;
     if schema.get_field_entry(field).field_type().is_date() {
         for bucket in buckets.iter_mut() {
-            match bucket.key {
-                crate::aggregation::Key::F64(val) => {
-                    let datetime = OffsetDateTime::from_unix_timestamp_nanos(1_000 * (val as i128))
-                        .map_err(|err| {
-                            TantivyError::InvalidArgument(format!(
-                                "Could not convert {:?} to OffsetDateTime, err {:?}",
-                                val, err
-                            ))
-                        })?;
-                    let key_as_string = datetime.format(&Rfc3339).map_err(|_err| {
-                        TantivyError::InvalidArgument("Could not serialize date".to_string())
-                    })?;
-
-                    bucket.key_as_string = Some(key_as_string);
-                }
-                _ => {}
+            if let crate::aggregation::Key::F64(val) = bucket.key {
+                let key_as_string = format_date(val as i64)?;
+                bucket.key_as_string = Some(key_as_string);
             }
         }
     }
diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs
index 333727536e..16296812c5 100644
--- a/src/aggregation/bucket/range.rs
+++ b/src/aggregation/bucket/range.rs
@@ -1,6 +1,7 @@
 use std::fmt::Debug;
 use std::ops::Range;
 
+use fastfield_codecs::MonotonicallyMappableToU64;
 use rustc_hash::FxHashMap;
 use serde::{Deserialize, Serialize};
 
@@ -11,7 +12,9 @@ use crate::aggregation::intermediate_agg_result::{
     IntermediateBucketResult, IntermediateRangeBucketEntry, IntermediateRangeBucketResult,
 };
 use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
-use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, Key, SerializedKey};
+use crate::aggregation::{
+    f64_from_fastfield_u64, f64_to_fastfield_u64, format_date, Key, SerializedKey,
+};
 use crate::schema::Type;
 use crate::{DocId, TantivyError};
 
@@ -181,7 +184,7 @@ impl SegmentRangeCollector {
             .into_iter()
             .map(move |range_bucket| {
                 Ok((
-                    range_to_string(&range_bucket.range, &field_type),
+                    range_to_string(&range_bucket.range, &field_type)?,
                     range_bucket
                         .bucket
                         .into_intermediate_bucket_entry(&agg_with_accessor.sub_aggregation)?,
@@ -209,8 +212,8 @@ impl SegmentRangeCollector {
         let key = range
             .key
             .clone()
-            .map(Key::Str)
-            .unwrap_or_else(|| range_to_key(&range.range, &field_type));
+            .map(|key| Ok(Key::Str(key)))
+            .unwrap_or_else(|| range_to_key(&range.range, &field_type))?;
         let to = if range.range.end == u64::MAX {
             None
         } else {
@@ -228,6 +231,7 @@ impl SegmentRangeCollector {
                 sub_aggregation,
             )?)
         };
+
         Ok(SegmentRangeAndBucketEntry {
             range: range.range.clone(),
             bucket: SegmentRangeBucketEntry {
@@ -402,34 +406,45 @@ fn extend_validate_ranges(
     Ok(converted_buckets)
 }
 
-pub(crate) fn range_to_string(range: &Range<u64>, field_type: &Type) -> String {
+pub(crate) fn range_to_string(range: &Range<u64>, field_type: &Type) -> crate::Result<String> {
     // is_start is there for malformed requests, e.g. if the user passes the range u64::MIN..0.0,
     // it should be rendered as "*-0" and not "*-*"
     let to_str = |val: u64, is_start: bool| {
         if (is_start && val == u64::MIN) || (!is_start && val == u64::MAX) {
-            "*".to_string()
+            Ok("*".to_string())
+        } else if *field_type == Type::Date {
+            let val = i64::from_u64(val);
+            format_date(val)
         } else {
-            f64_from_fastfield_u64(val, field_type).to_string()
+            Ok(f64_from_fastfield_u64(val, field_type).to_string())
         }
     };
 
-    format!("{}-{}", to_str(range.start, true), to_str(range.end, false))
+    Ok(format!(
+        "{}-{}",
+        to_str(range.start, true)?,
+        to_str(range.end, false)?
+    ))
 }
 
-pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> Key {
-    Key::Str(range_to_string(range, field_type))
+pub(crate) fn range_to_key(range: &Range<u64>, field_type: &Type) -> crate::Result<Key> {
+    Ok(Key::Str(range_to_string(range, field_type)?))
 }
 
 #[cfg(test)]
 mod tests {
     use fastfield_codecs::MonotonicallyMappableToU64;
+    use serde_json::Value;
 
     use super::*;
     use crate::aggregation::agg_req::{
         Aggregation, Aggregations, BucketAggregation, BucketAggregationType,
     };
-    use crate::aggregation::tests::{exec_request_with_query, get_test_index_with_num_docs};
+    use crate::aggregation::tests::{
+        exec_request, exec_request_with_query, get_test_index_2_segments,
+        get_test_index_with_num_docs,
+    };
 
     pub fn get_collector_from_ranges(
         ranges: Vec<RangeAggregationRange>,
@@ -567,6 +582,77 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn range_date_test_single_segment() -> crate::Result<()> {
+        range_date_test_with_opt(true)
+    }
+
+    #[test]
+    fn range_date_test_multi_segment() -> crate::Result<()> {
+        range_date_test_with_opt(false)
+    }
+
+    fn range_date_test_with_opt(merge_segments: bool) -> crate::Result<()> {
+        let index = get_test_index_2_segments(merge_segments)?;
+
+        let agg_req: Aggregations = vec![(
+            "date_ranges".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Range(RangeAggregation {
+                    field: "date".to_string(),
+                    ranges: vec![
+                        RangeAggregationRange {
+                            key: None,
+                            from: None,
+                            to: Some(1546300800000000.0f64),
+                        },
+                        RangeAggregationRange {
+                            key: None,
+                            from: Some(1546300800000000.0f64),
+                            to: Some(1546387200000000.0f64),
+                        },
+                    ],
+                    keyed: false,
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
+        let agg_res = exec_request(agg_req, &index)?;
+
+        let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?;
+
+        assert_eq!(
+            res["date_ranges"]["buckets"][0]["from_as_string"],
+            Value::Null
+        );
+        assert_eq!(
+            res["date_ranges"]["buckets"][0]["key"],
+            "*-2019-01-01T00:00:00Z"
+        );
+        assert_eq!(
+            res["date_ranges"]["buckets"][1]["from_as_string"],
+            "2019-01-01T00:00:00Z"
+        );
+        assert_eq!(
+            res["date_ranges"]["buckets"][1]["to_as_string"],
+            "2019-01-02T00:00:00Z"
+        );
+        assert_eq!(
+            res["date_ranges"]["buckets"][2]["from_as_string"],
+            "2019-01-02T00:00:00Z"
+        );
+        assert_eq!(
+            res["date_ranges"]["buckets"][2]["to_as_string"],
+            Value::Null
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn range_custom_key_keyed_buckets_test() -> crate::Result<()> {
         let index = get_test_index_with_num_docs(false, 100)?;
diff --git a/src/aggregation/date.rs b/src/aggregation/date.rs
new file mode 100644
index 0000000000..b80daa9c61
--- /dev/null
+++ b/src/aggregation/date.rs
@@ -0,0 +1,18 @@
+use time::format_description::well_known::Rfc3339;
+use time::OffsetDateTime;
+
+use crate::TantivyError;
+
+pub(crate) fn format_date(val: i64) -> crate::Result<String> {
+    let datetime =
+        OffsetDateTime::from_unix_timestamp_nanos(1_000 * (val as i128)).map_err(|err| {
+            TantivyError::InvalidArgument(format!(
+                "Could not convert {:?} to OffsetDateTime, err {:?}",
+                val, err
+            ))
+        })?;
+    let key_as_string = datetime
+        .format(&Rfc3339)
+        .map_err(|_err| TantivyError::InvalidArgument("Could not serialize date".to_string()))?;
+    Ok(key_as_string)
+}
diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs
index e0117228ac..cdeaa92e3d 100644
--- a/src/aggregation/intermediate_agg_result.rs
+++ b/src/aggregation/intermediate_agg_result.rs
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
 
 use super::agg_req::{
     Aggregations, AggregationsInternal, BucketAggregationInternal, BucketAggregationType,
-    MetricAggregation,
+    MetricAggregation, RangeAggregation,
 };
 use super::agg_result::{AggregationResult, BucketResult, RangeBucketEntry};
 use super::bucket::{
@@ -19,10 +19,11 @@
 };
 use super::metric::{IntermediateAverage, IntermediateStats};
 use super::segment_agg_result::SegmentMetricResultCollector;
-use super::{Key, SerializedKey, VecWithNames};
+use super::{format_date, Key, SerializedKey, VecWithNames};
 use crate::aggregation::agg_result::{AggregationResults, BucketEntries, BucketEntry};
 use crate::aggregation::bucket::TermsAggregationInternal;
 use crate::schema::Schema;
+use crate::TantivyError;
 
 /// Contains the intermediate aggregation result, which is optimized to be merged with other
 /// intermediate results.
@@ -282,7 +283,14 @@ impl IntermediateBucketResult {
                 let mut buckets: Vec<RangeBucketEntry> = range_res
                     .buckets
                     .into_iter()
-                    .map(|(_, bucket)| bucket.into_final_bucket_entry(&req.sub_aggregation, schema))
+                    .map(|(_, bucket)| {
+                        bucket.into_final_bucket_entry(
+                            &req.sub_aggregation,
+                            schema,
+                            req.as_range()
+                                .expect("unexpected aggregation, expected range aggregation"),
+                        )
+                    })
                     .collect::<crate::Result<Vec<_>>>()?;
 
                 buckets.sort_by(|left, right| {
@@ -588,8 +596,9 @@ impl IntermediateRangeBucketEntry {
         self,
         req: &AggregationsInternal,
         schema: &Schema,
+        range_req: &RangeAggregation,
     ) -> crate::Result<RangeBucketEntry> {
-        Ok(RangeBucketEntry {
+        let mut range_bucket_entry = RangeBucketEntry {
             key: self.key,
             doc_count: self.doc_count,
             sub_aggregation: self
@@ -597,7 +606,27 @@ impl IntermediateRangeBucketEntry {
                 .into_final_bucket_result_internal(req, schema)?,
             to: self.to,
             from: self.from,
-        })
+            to_as_string: None,
+            from_as_string: None,
+        };
+
+        // If the range aggregation is on a date field, we add the
+        // `from_as_string`/`to_as_string` fields, formatted as RFC 3339.
+        let field = schema
+            .get_field(&range_req.field)
+            .ok_or_else(|| TantivyError::FieldNotFound(range_req.field.to_string()))?;
+        if schema.get_field_entry(field).field_type().is_date() {
+            if let Some(val) = range_bucket_entry.to {
+                let key_as_string = format_date(val as i64)?;
+                range_bucket_entry.to_as_string = Some(key_as_string);
+            }
+            if let Some(val) = range_bucket_entry.from {
+                let key_as_string = format_date(val as i64)?;
+                range_bucket_entry.from_as_string = Some(key_as_string);
+            }
+        }
+
+        Ok(range_bucket_entry)
     }
 }
diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs
index d095be976e..a6bd4478c4 100644
--- a/src/aggregation/mod.rs
+++ b/src/aggregation/mod.rs
@@ -158,6 +158,7 @@ mod agg_req_with_accessor;
 pub mod agg_result;
 pub mod bucket;
 mod collector;
+mod date;
 pub mod intermediate_agg_result;
 pub mod metric;
 mod segment_agg_result;
@@ -168,6 +169,7 @@ pub use collector::{
     AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector,
     MAX_BUCKET_COUNT,
 };
+pub(crate) use date::format_date;
 use fastfield_codecs::MonotonicallyMappableToU64;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
@@ -284,12 +286,11 @@ impl Display for Key {
 /// Inverse of `to_fastfield_u64`. Used to convert to `f64` for metrics.
 ///
 /// # Panics
-/// Only `u64`, `f64`, and `i64` are supported.
+/// Only `u64`, `f64`, `date`, and `i64` are supported.
 pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 {
     match field_type {
         Type::U64 => val as f64,
-        Type::I64 => i64::from_u64(val) as f64,
-        Type::Date => i64::from_u64(val) as f64,
+        Type::I64 | Type::Date => i64::from_u64(val) as f64,
         Type::F64 => f64::from_u64(val),
         _ => {
             panic!("unexpected type {:?}. This should not happen", field_type)
@@ -297,10 +298,9 @@ pub(crate) fn f64_from_fastfield_u64(val: u64, field_type: &Type) -> f64 {
     }
 }
 
-/// Converts the `f64` value to fast field value space.
+/// Converts the `f64` value to fast field value space, which is always u64.
 ///
-/// If the fast field has `u64`, values are stored as `u64` in the fast field.
-/// A `f64` value of e.g. `2.0` therefore needs to be converted to `1u64`.
+/// If the fast field has `u64`, values are stored unchanged as `u64` in the fast field.
 ///
 /// If the fast field has `f64` values are converted and stored to `u64` using a
 /// monotonic mapping.
@@ -310,7 +310,7 @@
 pub(crate) fn f64_to_fastfield_u64(val: f64, field_type: &Type) -> Option<u64> {
     match field_type {
         Type::U64 => Some(val as u64),
-        Type::I64 => Some((val as i64).to_u64()),
+        Type::I64 | Type::Date => Some((val as i64).to_u64()),
        Type::F64 => Some(val.to_u64()),
         _ => None,
     }
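
Note on the date formatting: tantivy stores date fast field values as i64 microseconds since the Unix epoch, and `format_date` widens them to nanoseconds before rendering RFC 3339. Below is a minimal standalone sketch of the same conversion; it assumes the `time` crate with its `formatting` feature enabled, and the helper name `format_date_micros` plus the plain `Result<String, String>` error type are illustrative stand-ins rather than tantivy's API:

    use time::format_description::well_known::Rfc3339;
    use time::OffsetDateTime;

    // Mirrors the patch's `format_date`: fast field dates are i64 microseconds
    // since the Unix epoch, so widen to nanoseconds before constructing the
    // OffsetDateTime, then render it as an RFC 3339 string.
    fn format_date_micros(micros: i64) -> Result<String, String> {
        let datetime = OffsetDateTime::from_unix_timestamp_nanos(1_000 * (micros as i128))
            .map_err(|err| format!("could not convert {micros} to OffsetDateTime: {err}"))?;
        datetime
            .format(&Rfc3339)
            .map_err(|err| format!("could not serialize date: {err}"))
    }

    fn main() {
        // 1_546_300_800_000_000 µs is the bucket boundary used in the new test.
        assert_eq!(
            format_date_micros(1_546_300_800_000_000).unwrap(),
            "2019-01-01T00:00:00Z"
        );
    }

This is also why the first bucket's key in the new test serializes as "*-2019-01-01T00:00:00Z": `range_to_string` prints "*" for the open end of the range and routes the date end through the same formatting path.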