From ca20bfa7769747e6bb33c7b32a392e93a3491b82 Mon Sep 17 00:00:00 2001
From: PSeitz
Date: Thu, 2 Mar 2023 12:17:35 +0800
Subject: [PATCH] add date_histogram (#1900)

* add date_histogram
* add return result
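
The date_histogram bucket aggregation accepts Elasticsearch-compatible
requests. `fixed_interval` takes a number followed by a unit (`ms`, `m`,
`h`, `d`), `offset` takes the same syntax with an optional `+`/`-` sign,
and bucket keys are returned as timestamps in microsecond precision.
Example request and response, taken from the new tests:

    {
      "sales_over_time": {
        "date_histogram": {
          "field": "date",
          "fixed_interval": "30d",
          "offset": "-4d"
        }
      }
    }

    {
      "sales_over_time": {
        "buckets": [
          {
            "key_as_string": "2015-01-01T00:00:00Z",
            "key": 1420070400000000.0,
            "doc_count": 4
          }
        ]
      }
    }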
---
 src/aggregation/agg_req.rs                    |  15 +-
 src/aggregation/agg_req_with_accessor.rs      |   8 +-
 .../bucket/histogram/date_histogram.rs        | 372 +++++++++++++++++-
 src/aggregation/bucket/histogram/histogram.rs |   1 +
 src/aggregation/bucket/histogram/mod.rs       |   4 +-
 src/aggregation/error.rs                      |   9 +
 src/aggregation/intermediate_agg_result.rs    |  18 +-
 src/aggregation/mod.rs                        |   2 +
 src/aggregation/segment_agg_result.rs         |   8 +
 src/error.rs                                  |   4 +
 10 files changed, 410 insertions(+), 31 deletions(-)
 create mode 100644 src/aggregation/error.rs

diff --git a/src/aggregation/agg_req.rs b/src/aggregation/agg_req.rs
index 85269cf95f..45c6b2a611 100644
--- a/src/aggregation/agg_req.rs
+++ b/src/aggregation/agg_req.rs
@@ -50,7 +50,7 @@ use std::collections::{HashMap, HashSet};
 use serde::{Deserialize, Serialize};
 
 pub use super::bucket::RangeAggregation;
-use super::bucket::{HistogramAggregation, TermsAggregation};
+use super::bucket::{DateHistogramAggregationReq, HistogramAggregation, TermsAggregation};
 use super::metric::{
     AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, StatsAggregation,
     SumAggregation,
@@ -110,10 +110,13 @@ impl BucketAggregationInternal {
             _ => None,
         }
     }
-    pub(crate) fn as_histogram(&self) -> Option<&HistogramAggregation> {
+    pub(crate) fn as_histogram(&self) -> crate::Result<Option<HistogramAggregation>> {
         match &self.bucket_agg {
-            BucketAggregationType::Histogram(histogram) => Some(histogram),
-            _ => None,
+            BucketAggregationType::Histogram(histogram) => Ok(Some(histogram.clone())),
+            BucketAggregationType::DateHistogram(histogram) => {
+                Ok(Some(histogram.to_histogram_req()?))
+            }
+            _ => Ok(None),
         }
     }
     pub(crate) fn as_term(&self) -> Option<&TermsAggregation> {
@@ -196,6 +199,9 @@ pub enum BucketAggregationType {
     /// Put data into buckets of user-defined ranges.
     #[serde(rename = "histogram")]
     Histogram(HistogramAggregation),
+    /// Put data into buckets of fixed date intervals.
+    #[serde(rename = "date_histogram")]
+    DateHistogram(DateHistogramAggregationReq),
     /// Put data into buckets of terms.
     #[serde(rename = "terms")]
     Terms(TermsAggregation),
@@ -207,6 +213,7 @@ impl BucketAggregationType {
             BucketAggregationType::Terms(terms) => terms.field.as_str(),
             BucketAggregationType::Range(range) => range.field.as_str(),
             BucketAggregationType::Histogram(histogram) => histogram.field.as_str(),
+            BucketAggregationType::DateHistogram(histogram) => histogram.field.as_str(),
         }
     }
 }
diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs
index 0d87206051..aba7d2067f 100644
--- a/src/aggregation/agg_req_with_accessor.rs
+++ b/src/aggregation/agg_req_with_accessor.rs
@@ -6,7 +6,9 @@ use std::sync::atomic::AtomicU32;
 use columnar::{Column, ColumnType, StrColumn};
 
 use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
-use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
+use super::bucket::{
+    DateHistogramAggregationReq, HistogramAggregation, RangeAggregation, TermsAggregation,
+};
 use super::metric::{
     AverageAggregation, CountAggregation, MaxAggregation, MinAggregation, StatsAggregation,
     SumAggregation,
@@ -62,6 +64,10 @@ impl BucketAggregationWithAccessor {
             BucketAggregationType::Histogram(HistogramAggregation {
                 field: field_name, ..
             }) => get_ff_reader_and_validate(reader, field_name)?,
+            BucketAggregationType::DateHistogram(DateHistogramAggregationReq {
+                field: field_name,
+                ..
+            }) => get_ff_reader_and_validate(reader, field_name)?,
             BucketAggregationType::Terms(TermsAggregation {
                 field: field_name, ..
             }) => {
diff --git a/src/aggregation/bucket/histogram/date_histogram.rs b/src/aggregation/bucket/histogram/date_histogram.rs
index f01c7e04f4..37c159f8bf 100644
--- a/src/aggregation/bucket/histogram/date_histogram.rs
+++ b/src/aggregation/bucket/histogram/date_histogram.rs
@@ -1,5 +1,9 @@
 use serde::{Deserialize, Serialize};
+use thiserror::Error;
 
+use super::{HistogramAggregation, HistogramBounds};
+use crate::aggregation::AggregationError;
+
 /// DateHistogramAggregation is similar to `HistogramAggregation`, but it can only be used with date
 /// type.
@@ -29,8 +33,16 @@ use serde::{Deserialize, Serialize};
 /// See [`BucketEntry`](crate::aggregation::agg_result::BucketEntry)
 #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
 pub struct DateHistogramAggregationReq {
+    #[doc(hidden)]
+    /// Only for validation
+    interval: Option<String>,
+    #[doc(hidden)]
+    /// Only for validation
+    date_interval: Option<String>,
     /// The field to aggregate on.
     pub field: String,
+    /// The format with which to render dates. Currently unsupported.
+    pub format: Option<String>,
     /// The interval to chunk your data range. Each bucket spans a value range of
     /// [0..fixed_interval). Accepted values
 ///
@@ -55,29 +67,132 @@ pub struct DateHistogramAggregationReq {
     /// Intervals implicitly define an absolute grid of buckets `[interval * k, interval * (k +
     /// 1))`.
     pub offset: Option<String>,
+    /// The minimum number of documents in a bucket to be returned. Defaults to 0.
+    pub min_doc_count: Option<u64>,
+    /// Limits the data range to the `[min, max]` closed interval.
+    ///
+    /// This can be used to filter out values that are not in the data range.
+    ///
+    /// `hard_bounds` only limits the buckets; to force a range, set both `extended_bounds` and
+    /// `hard_bounds` to the same range.
+    ///
+    /// Needs to be provided as a timestamp in microsecond precision.
+    ///
+    /// ## Example
+    /// ```json
+    /// {
+    ///     "sales_over_time": {
+    ///        "date_histogram": {
+    ///            "field": "dates",
+    ///            "fixed_interval": "1d",
+    ///            "hard_bounds": {
+    ///                "min": 0,
+    ///                "max": 1420502400000000
+    ///            }
+    ///        }
+    ///    }
+    /// }
+    /// ```
+    pub hard_bounds: Option<HistogramBounds>,
+    /// Can be set to extend your bounds. The range of the buckets is by default defined by the
+    /// data range of the values of the documents. As the name suggests, this can only be used to
+    /// extend the value range. If the bounds for min or max do not extend the range, the value
+    /// has no effect on the returned buckets.
+    ///
+    /// Cannot be set in conjunction with min_doc_count > 0, since the empty buckets from extended
+    /// bounds would not be returned.
+    pub extended_bounds: Option<HistogramBounds>,
+    /// Whether to return the buckets as a hash map.
     #[serde(default)]
     pub keyed: bool,
 }
 
 impl DateHistogramAggregationReq {
+    pub(crate) fn to_histogram_req(&self) -> crate::Result<HistogramAggregation> {
+        self.validate()?;
+        Ok(HistogramAggregation {
+            field: self.field.to_string(),
+            interval: parse_into_microseconds(&self.fixed_interval)? as f64,
+            offset: self
+                .offset
+                .as_ref()
+                .map(|offset| parse_offset_into_microseconds(offset))
+                .transpose()?
+                .map(|el| el as f64),
+            min_doc_count: self.min_doc_count,
+            hard_bounds: None,
+            extended_bounds: None,
+            keyed: self.keyed,
+        })
+    }
+    fn validate(&self) -> crate::Result<()> {
+        if self.interval.is_some() {
+            return Err(crate::TantivyError::InvalidArgument(format!(
+                "`interval` parameter {:?} in date histogram is unsupported, only \
+                 `fixed_interval` is supported",
+                self.interval
+            )));
+        }
+        if self.format.is_some() {
+            return Err(crate::TantivyError::InvalidArgument(
+                "`format` parameter in date histogram is unsupported".to_string(),
+            ));
+        }
+
+        if self.date_interval.is_some() {
+            return Err(crate::TantivyError::InvalidArgument(
+                "`date_interval` in date histogram is unsupported, only `fixed_interval` is \
+                 supported"
+                    .to_string(),
+            ));
+        }
+
+        parse_into_microseconds(&self.fixed_interval)?;
+        Ok(())
+    }
 }
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Error)]
 /// Errors when parsing the fixed interval for `DateHistogramAggregationReq`.
 pub enum DateHistogramParseError {
     /// Unit not recognized in passed String
+    #[error("Unit not recognized in passed String: {0:?}")]
     UnitNotRecognized(String),
     /// Number not found in passed String
+    #[error("Number not found in passed String: {0:?}")]
     NumberMissing(String),
     /// Unit not found in passed String
+    #[error("Unit not found in passed String: {0:?}")]
     UnitMissing(String),
+    /// Offset invalid
+    #[error("Passed offset is invalid: {0:?}")]
+    InvalidOffset(String),
 }
 
-fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError> {
+fn parse_offset_into_microseconds(input: &str) -> Result<i64, AggregationError> {
+    let is_sign = |byte| &[byte] == b"-" || &[byte] == b"+";
+    if input.is_empty() {
+        return Err(DateHistogramParseError::InvalidOffset(input.to_string()).into());
+    }
+
+    let has_sign = is_sign(input.as_bytes()[0]);
+    if has_sign {
+        let (sign, input) = input.split_at(1);
+        let val = parse_into_microseconds(input)?;
+        if sign == "-" {
+            Ok(-val)
+        } else {
+            Ok(val)
+        }
+    } else {
+        parse_into_microseconds(input)
+    }
+}
+
+fn parse_into_microseconds(input: &str) -> Result<i64, AggregationError> {
     let split_boundary = input
         .as_bytes()
         .iter()
@@ -85,12 +200,12 @@ fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError>
         .count();
     let (number, unit) = input.split_at(split_boundary);
     if number.is_empty() {
-        return Err(DateHistogramParseError::NumberMissing(input.to_string()));
+        return Err(DateHistogramParseError::NumberMissing(input.to_string()).into());
     }
     if unit.is_empty() {
-        return Err(DateHistogramParseError::UnitMissing(input.to_string()));
+        return Err(DateHistogramParseError::UnitMissing(input.to_string()).into());
     }
-    let number: u64 = number
+    let number: i64 = number
         .parse()
         // Technically this should never happen, but there was a bug
         // here and being defensive does not hurt.
@@ -102,36 +217,261 @@ fn parse_into_milliseconds(input: &str) -> Result<u64, DateHistogramParseError>
         "m" => 60 * 1000,
         "h" => 60 * 60 * 1000,
         "d" => 24 * 60 * 60 * 1000,
-        _ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string())),
+        _ => return Err(DateHistogramParseError::UnitNotRecognized(unit.to_string()).into()),
     };
 
-    Ok(number * multiplier_from_unit)
+    Ok(number * multiplier_from_unit * 1000)
 }
 
 #[cfg(test)]
 mod tests {
+    use pretty_assertions::assert_eq;
+    use serde_json::json;
+
     use super::*;
+    use crate::aggregation::agg_req::Aggregations;
+    use crate::aggregation::tests::exec_request;
+    use crate::indexer::NoMergePolicy;
+    use crate::schema::{Schema, FAST};
+    use crate::Index;
+
+    #[test]
+    fn test_parse_into_microseconds() {
+        assert_eq!(parse_into_microseconds("1m").unwrap(), 60_000_000);
+        assert_eq!(parse_into_microseconds("2m").unwrap(), 120_000_000);
+        assert_eq!(
+            parse_into_microseconds("2y").unwrap_err(),
+            DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
+        );
+        assert_eq!(
+            parse_into_microseconds("2000").unwrap_err(),
+            DateHistogramParseError::UnitMissing("2000".to_string()).into()
+        );
+        assert_eq!(
+            parse_into_microseconds("ms").unwrap_err(),
+            DateHistogramParseError::NumberMissing("ms".to_string()).into()
+        );
+    }
 
     #[test]
-    fn test_parse_into_milliseconds() {
-        assert_eq!(parse_into_milliseconds("1m").unwrap(), 60_000);
-        assert_eq!(parse_into_milliseconds("2m").unwrap(), 120_000);
+    fn test_parse_offset_into_microseconds() {
+        assert_eq!(parse_offset_into_microseconds("1m").unwrap(), 60_000_000);
+        assert_eq!(parse_offset_into_microseconds("+1m").unwrap(), 60_000_000);
+        assert_eq!(parse_offset_into_microseconds("-1m").unwrap(), -60_000_000);
+        assert_eq!(parse_offset_into_microseconds("2m").unwrap(), 120_000_000);
+        assert_eq!(parse_offset_into_microseconds("+2m").unwrap(), 120_000_000);
+        assert_eq!(parse_offset_into_microseconds("-2m").unwrap(), -120_000_000);
+        assert_eq!(parse_offset_into_microseconds("-2ms").unwrap(), -2_000);
         assert_eq!(
-            parse_into_milliseconds("2y").unwrap_err(),
-            DateHistogramParseError::UnitNotRecognized("y".to_string())
+            parse_offset_into_microseconds("2y").unwrap_err(),
+            DateHistogramParseError::UnitNotRecognized("y".to_string()).into()
         );
         assert_eq!(
-            parse_into_milliseconds("2000").unwrap_err(),
-            DateHistogramParseError::UnitMissing("2000".to_string())
+            parse_offset_into_microseconds("2000").unwrap_err(),
+            DateHistogramParseError::UnitMissing("2000".to_string()).into()
         );
         assert_eq!(
-            parse_into_milliseconds("ms").unwrap_err(),
-            DateHistogramParseError::NumberMissing("ms".to_string())
+            parse_offset_into_microseconds("ms").unwrap_err(),
+            DateHistogramParseError::NumberMissing("ms".to_string()).into()
         );
     }
 
     #[test]
-    fn test_parse_into_milliseconds_do_not_accept_non_ascii() {
-        assert!(parse_into_milliseconds("1ｍ").is_err());
+    fn test_parse_into_microseconds_do_not_accept_non_ascii() {
+        assert!(parse_into_microseconds("1ｍ").is_err());
+    }
+
+    pub fn get_test_index_from_docs(
+        merge_segments: bool,
+        segment_and_docs: &[Vec<&str>],
+    ) -> crate::Result<Index> {
+        let mut schema_builder = Schema::builder();
+        schema_builder.add_date_field("date", FAST);
+        schema_builder.add_text_field("text", FAST);
+        let schema = schema_builder.build();
+        let index = Index::create_in_ram(schema.clone());
+        {
+            let mut index_writer = index.writer_with_num_threads(1, 30_000_000)?;
+            index_writer.set_merge_policy(Box::new(NoMergePolicy));
+            for values in segment_and_docs {
+                for doc_str in values {
+                    let doc = schema.parse_document(doc_str)?;
+                    index_writer.add_document(doc)?;
+                }
+                // writing the segment
+                index_writer.commit()?;
+            }
+        }
+        if merge_segments {
+            let segment_ids = index
+                .searchable_segment_ids()
+                .expect("Searchable segments failed.");
+            if segment_ids.len() > 1 {
+                let mut index_writer = index.writer_for_tests()?;
+                index_writer.merge(&segment_ids).wait()?;
+                index_writer.wait_merging_threads()?;
+            }
+        }
+
+        Ok(index)
+    }
+
+    #[test]
+    fn histogram_test_date_force_merge_segments() -> crate::Result<()> {
+        histogram_test_date_merge_segments(true)
+    }
+
+    #[test]
+    fn histogram_test_date() -> crate::Result<()> {
+        histogram_test_date_merge_segments(false)
+    }
+
+    fn histogram_test_date_merge_segments(merge_segments: bool) -> crate::Result<()> {
+        let docs = vec![
+            vec![r#"{ "date": "2015-01-01T12:10:30Z", "text": "aaa" }"#],
+            vec![r#"{ "date": "2015-01-01T11:11:30Z", "text": "bbb" }"#],
+            vec![r#"{ "date": "2015-01-02T00:00:00Z", "text": "bbb" }"#],
+            vec![r#"{ "date": "2015-01-06T00:00:00Z", "text": "ccc" }"#],
+        ];
+
+        let index = get_test_index_from_docs(merge_segments, &docs)?;
+        // 30day + offset
+        let elasticsearch_compatible_json = json!(
+            {
+                "sales_over_time": {
+                    "date_histogram": {
+                        "field": "date",
+                        "fixed_interval": "30d",
+                        "offset": "-4d"
+                    }
+                }
+            }
+        );
+
+        let agg_req: Aggregations =
+            serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
+                .unwrap();
+        let res = exec_request(agg_req, &index)?;
+        let expected_res = json!({
+            "sales_over_time": {
+                "buckets": [
+                    {
+                        "key_as_string": "2015-01-01T00:00:00Z",
+                        "key": 1420070400000000.0,
+                        "doc_count": 4
+                    }
+                ]
+            }
+        });
+        assert_eq!(res, expected_res);
+
+        // 30day + offset + sub_agg
+        let elasticsearch_compatible_json = json!(
+            {
+                "sales_over_time": {
+                    "date_histogram": {
+                        "field": "date",
+                        "fixed_interval": "30d",
+                        "offset": "-4d"
+                    },
+                    "aggs": {
+                        "texts": {
+                            "terms": { "field": "text" }
+                        }
+                    }
+                }
+            }
+        );
+
+        let agg_req: Aggregations =
+            serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
+                .unwrap();
+        let res = exec_request(agg_req, &index)?;
+        println!("{}", serde_json::to_string_pretty(&res).unwrap());
+        let expected_res = json!({
+            "sales_over_time": {
+                "buckets": [
+                    {
+                        "key_as_string": "2015-01-01T00:00:00Z",
+                        "key": 1420070400000000.0,
+                        "doc_count": 4,
+                        "texts": {
+                            "buckets": [
+                                {
+                                    "doc_count": 2,
+                                    "key": "bbb"
+                                },
+                                {
+                                    "doc_count": 1,
+                                    "key": "ccc"
+                                },
+                                {
+                                    "doc_count": 1,
+                                    "key": "aaa"
+                                }
+                            ],
+                            "doc_count_error_upper_bound": 0,
+                            "sum_other_doc_count": 0
+                        }
+                    }
+                ]
+            }
+        });
+        assert_eq!(res, expected_res);
+
+        // 1day
+        let elasticsearch_compatible_json = json!(
+            {
+                "sales_over_time": {
+                    "date_histogram": {
+                        "field": "date",
+                        "fixed_interval": "1d"
+                    }
+                }
+            }
+        );
+
+        let agg_req: Aggregations =
+            serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap())
+                .unwrap();
+        let res = exec_request(agg_req, &index)?;
+        let expected_res = json!({
+            "sales_over_time": {
+                "buckets": [
+                    {
+                        "doc_count": 2,
+                        "key": 1420070400000000.0,
+                        "key_as_string": "2015-01-01T00:00:00Z"
+                    },
+                    {
+                        "doc_count": 1,
+                        "key": 1420156800000000.0,
+                        "key_as_string": "2015-01-02T00:00:00Z"
+                    },
+                    {
+                        "doc_count": 0,
+                        "key": 1420243200000000.0,
+                        "key_as_string": "2015-01-03T00:00:00Z"
+                    },
+                    {
+                        "doc_count": 0,
+                        "key": 1420329600000000.0,
+                        "key_as_string": "2015-01-04T00:00:00Z"
+                    },
+                    {
+                        "doc_count": 0,
+                        "key": 1420416000000000.0,
+                        "key_as_string": "2015-01-05T00:00:00Z"
+                    },
+                    {
+                        "doc_count": 1,
+                        "key": 1420502400000000.0,
+                        "key_as_string": "2015-01-06T00:00:00Z"
+                    }
+                ]
+            }
+        });
+        assert_eq!(res, expected_res);
+
+        Ok(())
+    }
 }
diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs
index 3af15248a0..849a27e361 100644
--- a/src/aggregation/bucket/histogram/histogram.rs
+++ b/src/aggregation/bucket/histogram/histogram.rs
@@ -394,6 +394,7 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
     // extended_bounds from the request
     let min_max = minmax(buckets.iter().map(|bucket| bucket.key));
 
+    // TODO: add memory check
     let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max);
 
     let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(sub_aggregation);
diff --git a/src/aggregation/bucket/histogram/mod.rs b/src/aggregation/bucket/histogram/mod.rs
index 596ce57c1d..82f695caae 100644
--- a/src/aggregation/bucket/histogram/mod.rs
+++ b/src/aggregation/bucket/histogram/mod.rs
@@ -1,4 +1,4 @@
-// mod date_histogram;
+mod date_histogram;
 mod histogram;
-// pub use date_histogram::*;
+pub use date_histogram::*;
 pub use histogram::*;
diff --git a/src/aggregation/error.rs b/src/aggregation/error.rs
new file mode 100644
index 0000000000..b04d07861f
--- /dev/null
+++ b/src/aggregation/error.rs
@@ -0,0 +1,11 @@
+use thiserror::Error;
+
+use super::bucket::DateHistogramParseError;
+
+/// Errors that may occur while handling aggregations.
+#[derive(Debug, Clone, PartialEq, Eq, Error)]
+pub enum AggregationError {
+    /// Failed to parse the fixed interval of a date histogram.
+    #[error("Date histogram parse error: {0:?}")]
+    DateHistogramParseError(#[from] DateHistogramParseError),
+}
diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs
index b07b02f029..25e86c1f27 100644
--- a/src/aggregation/intermediate_agg_result.rs
+++ b/src/aggregation/intermediate_agg_result.rs
@@ -329,15 +329,17 @@ impl IntermediateBucketResult {
                 column_type,
                 buckets,
             } => {
+                let histogram_req = &req
+                    .as_histogram()?
+ .expect("unexpected aggregation, expected histogram aggregation"); let buckets = intermediate_histogram_buckets_to_final_buckets( buckets, column_type, - req.as_histogram() - .expect("unexpected aggregation, expected histogram aggregation"), + histogram_req, &req.sub_aggregation, )?; - let buckets = if req.as_histogram().unwrap().keyed { + let buckets = if histogram_req.keyed { let mut bucket_map = FxHashMap::with_capacity_and_hasher(buckets.len(), Default::default()); for bucket in buckets { @@ -361,10 +363,12 @@ impl IntermediateBucketResult { match req { BucketAggregationType::Terms(_) => IntermediateBucketResult::Terms(Default::default()), BucketAggregationType::Range(_) => IntermediateBucketResult::Range(Default::default()), - BucketAggregationType::Histogram(_) => IntermediateBucketResult::Histogram { - buckets: vec![], - column_type: None, - }, + BucketAggregationType::Histogram(_) | BucketAggregationType::DateHistogram(_) => { + IntermediateBucketResult::Histogram { + buckets: vec![], + column_type: None, + } + } } } fn merge_fruits(&mut self, other: IntermediateBucketResult) { diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index f3b327b614..a995dbffa8 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -162,6 +162,7 @@ pub mod bucket; mod buf_collector; mod collector; mod date; +mod error; pub mod intermediate_agg_result; pub mod metric; mod segment_agg_result; @@ -177,6 +178,7 @@ pub use collector::{ }; use columnar::{ColumnType, MonotonicallyMappableToU64}; pub(crate) use date::format_date; +pub use error::AggregationError; use itertools::Itertools; use serde::{Deserialize, Serialize}; diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index b6e7fe2f36..b91a514373 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -144,6 +144,14 @@ pub(crate) fn build_bucket_segment_agg_collector( accessor_idx, )?)) } + BucketAggregationType::DateHistogram(histogram) => { + Ok(Box::new(SegmentHistogramCollector::from_req_and_validate( + &histogram.to_histogram_req()?, + &req.sub_aggregation, + req.field_type, + accessor_idx, + )?)) + } } } diff --git a/src/error.rs b/src/error.rs index ec3ceb87f1..816074a19c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,6 +6,7 @@ use std::{fmt, io}; use thiserror::Error; +use crate::aggregation::AggregationError; use crate::directory::error::{ Incompatibility, LockError, OpenDirectoryError, OpenReadError, OpenWriteError, }; @@ -53,6 +54,9 @@ impl fmt::Debug for DataCorruption { /// The library's error enum #[derive(Debug, Clone, Error)] pub enum TantivyError { + /// Error when handling aggregations. + #[error("AggregationError {0:?}")] + AggregationError(#[from] AggregationError), /// Failed to open the directory. #[error("Failed to open the directory: '{0:?}'")] OpenDirectoryError(#[from] OpenDirectoryError),