diff --git a/datafusion/core/benches/merge.rs b/datafusion/core/benches/merge.rs index 8a5b421953ac..0b993ca64441 100644 --- a/datafusion/core/benches/merge.rs +++ b/datafusion/core/benches/merge.rs @@ -69,6 +69,8 @@ use std::sync::Arc; +use arrow::array::DictionaryArray; +use arrow::datatypes::Int32Type; use arrow::{ array::{Float64Array, Int64Array, StringArray, UInt64Array}, compute::{self, SortOptions, TakeOptions}, @@ -113,9 +115,13 @@ const INPUT_SIZE: u64 = 100000; lazy_static! { static ref I64_STREAMS: Vec> = i64_streams(); static ref F64_STREAMS: Vec> = f64_streams(); - // TODO: add dictionay encoded values + static ref UTF8_LOW_CARDINALITY_STREAMS: Vec> = utf8_low_cardinality_streams(); static ref UTF8_HIGH_CARDINALITY_STREAMS: Vec> = utf8_high_cardinality_streams(); + + static ref DICTIONARY_STREAMS: Vec> = dictionary_streams(); + static ref DICTIONARY_TUPLE_STREAMS: Vec> = dictionary_tuple_streams(); + static ref MIXED_DICTIONARY_TUPLE_STREAMS: Vec> = mixed_dictionary_tuple_streams(); // * (string(low), string(low), string(high)) -- tpch q1 + iox static ref UTF8_TUPLE_STREAMS: Vec> = utf8_tuple_streams(); // * (f64, string, string, int) -- tpch q2 @@ -154,6 +160,22 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(move || case.run()) }); + c.bench_function("merge utf8 dictionary", |b| { + let case = MergeBenchCase::new(&DICTIONARY_STREAMS); + + b.iter(move || case.run()) + }); + + c.bench_function("merge utf8 dictionary tuple", |b| { + let case = MergeBenchCase::new(&DICTIONARY_TUPLE_STREAMS); + b.iter(move || case.run()) + }); + + c.bench_function("merge mixed utf8 dictionary tuple", |b| { + let case = MergeBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS); + b.iter(move || case.run()) + }); + c.bench_function("merge mixed tuple", |b| { let case = MergeBenchCase::new(&MIXED_TUPLE_STREAMS); @@ -330,6 +352,79 @@ fn mixed_tuple_streams() -> Vec> { split_batch(batch) } +/// Create a batch of (utf8_dict) +fn dictionary_streams() -> Vec> { + let mut gen = DataGenerator::new(); + let values = gen.utf8_low_cardinality_values(); + let dictionary: DictionaryArray = + values.iter().map(Option::as_deref).collect(); + + let batch = + RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap(); + + split_batch(batch) +} + +/// Create a batch of (utf8_dict, utf8_dict, utf8_dict) +fn dictionary_tuple_streams() -> Vec> { + let mut gen = DataGenerator::new(); + let mut tuples: Vec<_> = gen + .utf8_low_cardinality_values() + .into_iter() + .zip(gen.utf8_low_cardinality_values()) + .zip(gen.utf8_low_cardinality_values()) + .collect(); + tuples.sort_unstable(); + + let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); + let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); + let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); + + let batch = RecordBatch::try_from_iter(vec![ + ("a", Arc::new(a) as _), + ("b", Arc::new(b) as _), + ("c", Arc::new(c) as _), + ]) + .unwrap(); + + split_batch(batch) +} + +/// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64) +fn mixed_dictionary_tuple_streams() -> Vec> { + let mut gen = DataGenerator::new(); + let mut tuples: Vec<_> = gen + .utf8_low_cardinality_values() + .into_iter() + .zip(gen.utf8_low_cardinality_values()) + .zip(gen.utf8_low_cardinality_values()) + .zip(gen.i64_values()) + .collect(); + tuples.sort_unstable(); + + let (tuples, d): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (tuples, c): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + let (a, b): (Vec<_>, Vec<_>) = tuples.into_iter().unzip(); + + let a: DictionaryArray = a.iter().map(Option::as_deref).collect(); + let b: DictionaryArray = b.iter().map(Option::as_deref).collect(); + let c: DictionaryArray = c.iter().map(Option::as_deref).collect(); + let d: Int64Array = d.into_iter().collect(); + + let batch = RecordBatch::try_from_iter(vec![ + ("a", Arc::new(a) as _), + ("b", Arc::new(b) as _), + ("c", Arc::new(c) as _), + ("d", Arc::new(d) as _), + ]) + .unwrap(); + + split_batch(batch) +} + /// Encapsulates creating data for this test struct DataGenerator { rng: StdRng, @@ -363,13 +458,15 @@ impl DataGenerator { /// array of low cardinality (100 distinct) values fn utf8_low_cardinality_values(&mut self) -> Vec>> { - let strings = (0..100).map(|s| format!("value{}", s)).collect::>(); + let strings = (0..100) + .map(|s| format!("value{}", s).into()) + .collect::>(); // pick from the 100 strings randomly let mut input = (0..INPUT_SIZE) .map(|_| { let idx = self.rng.gen_range(0..strings.len()); - let s = Arc::from(strings[idx].as_str()); + let s = Arc::clone(&strings[idx]); Some(s) }) .collect::>();