Skip to content

Commit

Permalink
Add dictionary cases to merge bench (#3384)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Sep 7, 2022
1 parent 79922b4 commit c359018
Showing 1 changed file with 100 additions and 3 deletions.
103 changes: 100 additions & 3 deletions datafusion/core/benches/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
use std::sync::Arc;

use arrow::array::DictionaryArray;
use arrow::datatypes::Int32Type;
use arrow::{
array::{Float64Array, Int64Array, StringArray, UInt64Array},
compute::{self, SortOptions, TakeOptions},
Expand Down Expand Up @@ -113,9 +115,13 @@ const INPUT_SIZE: u64 = 100000;
lazy_static! {
static ref I64_STREAMS: Vec<Vec<RecordBatch>> = i64_streams();
static ref F64_STREAMS: Vec<Vec<RecordBatch>> = f64_streams();
// TODO: add dictionary encoded values

static ref UTF8_LOW_CARDINALITY_STREAMS: Vec<Vec<RecordBatch>> = utf8_low_cardinality_streams();
static ref UTF8_HIGH_CARDINALITY_STREAMS: Vec<Vec<RecordBatch>> = utf8_high_cardinality_streams();

static ref DICTIONARY_STREAMS: Vec<Vec<RecordBatch>> = dictionary_streams();
static ref DICTIONARY_TUPLE_STREAMS: Vec<Vec<RecordBatch>> = dictionary_tuple_streams();
static ref MIXED_DICTIONARY_TUPLE_STREAMS: Vec<Vec<RecordBatch>> = mixed_dictionary_tuple_streams();
// * (string(low), string(low), string(high)) -- tpch q1 + iox
static ref UTF8_TUPLE_STREAMS: Vec<Vec<RecordBatch>> = utf8_tuple_streams();
// * (f64, string, string, int) -- tpch q2
Expand Down Expand Up @@ -154,6 +160,22 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || case.run())
});

c.bench_function("merge utf8 dictionary", |b| {
let case = MergeBenchCase::new(&DICTIONARY_STREAMS);

b.iter(move || case.run())
});

c.bench_function("merge utf8 dictionary tuple", |b| {
let case = MergeBenchCase::new(&DICTIONARY_TUPLE_STREAMS);
b.iter(move || case.run())
});

c.bench_function("merge mixed utf8 dictionary tuple", |b| {
let case = MergeBenchCase::new(&MIXED_DICTIONARY_TUPLE_STREAMS);
b.iter(move || case.run())
});

c.bench_function("merge mixed tuple", |b| {
let case = MergeBenchCase::new(&MIXED_TUPLE_STREAMS);

Expand Down Expand Up @@ -330,6 +352,79 @@ fn mixed_tuple_streams() -> Vec<Vec<RecordBatch>> {
split_batch(batch)
}

/// Create the input streams for a single dictionary-encoded column: (utf8_dict)
///
/// The low cardinality strings produced by `DataGenerator` are collected into
/// a `DictionaryArray<Int32Type>` stored in a column named `dict`, and the
/// resulting batch is handed to `split_batch`.
///
/// # Panics
///
/// Panics if the `RecordBatch` cannot be constructed.
fn dictionary_streams() -> Vec<Vec<RecordBatch>> {
    let strings = DataGenerator::new().utf8_low_cardinality_values();

    // Borrow each optional string as `Option<&str>` while building the array
    let encoded = strings
        .iter()
        .map(|value| value.as_deref())
        .collect::<DictionaryArray<Int32Type>>();

    split_batch(
        RecordBatch::try_from_iter(vec![("dict", Arc::new(encoded) as _)]).unwrap(),
    )
}

/// Create the input streams for three dictionary-encoded columns:
/// (utf8_dict, utf8_dict, utf8_dict)
///
/// Three sets of low cardinality strings are drawn from one `DataGenerator`,
/// combined row by row, sorted lexicographically as `(a, b, c)` rows and then
/// split back into the columns `a`, `b` and `c`. Each column is stored as a
/// `DictionaryArray<Int32Type>` before the batch is handed to `split_batch`.
///
/// # Panics
///
/// Panics if the `RecordBatch` cannot be constructed.
fn dictionary_tuple_streams() -> Vec<Vec<RecordBatch>> {
    let mut generator = DataGenerator::new();

    // Draw the columns one after another so the generator is used in a fixed order
    let a_values = generator.utf8_low_cardinality_values();
    let b_values = generator.utf8_low_cardinality_values();
    let c_values = generator.utf8_low_cardinality_values();

    let mut rows: Vec<_> = a_values
        .into_iter()
        .zip(b_values)
        .zip(c_values)
        .map(|((a, b), c)| (a, b, c))
        .collect();
    rows.sort_unstable();

    // Transpose the sorted rows back into one vector per column
    let mut col_a = Vec::with_capacity(rows.len());
    let mut col_b = Vec::with_capacity(rows.len());
    let mut col_c = Vec::with_capacity(rows.len());
    for (a, b, c) in rows {
        col_a.push(a);
        col_b.push(b);
        col_c.push(c);
    }

    let encode = |column: &[Option<Arc<str>>]| -> DictionaryArray<Int32Type> {
        column.iter().map(Option::as_deref).collect()
    };

    let batch = RecordBatch::try_from_iter(vec![
        ("a", Arc::new(encode(&col_a)) as _),
        ("b", Arc::new(encode(&col_b)) as _),
        ("c", Arc::new(encode(&col_c)) as _),
    ])
    .unwrap();

    split_batch(batch)
}

/// Create the input streams for three dictionary-encoded columns followed by
/// an integer column: (utf8_dict, utf8_dict, utf8_dict, i64)
///
/// Three sets of low cardinality strings and one set of `i64` values are
/// drawn from one `DataGenerator`, combined row by row, sorted
/// lexicographically as `(a, b, c, d)` rows and then split back into the
/// columns `a`, `b`, `c` and `d`. The string columns are stored as
/// `DictionaryArray<Int32Type>` and `d` as an `Int64Array` before the batch
/// is handed to `split_batch`.
///
/// # Panics
///
/// Panics if the `RecordBatch` cannot be constructed.
fn mixed_dictionary_tuple_streams() -> Vec<Vec<RecordBatch>> {
    let mut generator = DataGenerator::new();

    // Draw the columns one after another so the generator is used in a fixed order
    let a_values = generator.utf8_low_cardinality_values();
    let b_values = generator.utf8_low_cardinality_values();
    let c_values = generator.utf8_low_cardinality_values();
    let d_values = generator.i64_values();

    let mut rows: Vec<_> = a_values
        .into_iter()
        .zip(b_values)
        .zip(c_values)
        .zip(d_values)
        .map(|(((a, b), c), d)| (a, b, c, d))
        .collect();
    rows.sort_unstable();

    // Transpose the sorted rows back into one vector per column
    let mut col_a = Vec::with_capacity(rows.len());
    let mut col_b = Vec::with_capacity(rows.len());
    let mut col_c = Vec::with_capacity(rows.len());
    let mut col_d = Vec::with_capacity(rows.len());
    for (a, b, c, d) in rows {
        col_a.push(a);
        col_b.push(b);
        col_c.push(c);
        col_d.push(d);
    }

    let encode = |column: &[Option<Arc<str>>]| -> DictionaryArray<Int32Type> {
        column.iter().map(Option::as_deref).collect()
    };
    let d: Int64Array = col_d.into_iter().collect();

    let batch = RecordBatch::try_from_iter(vec![
        ("a", Arc::new(encode(&col_a)) as _),
        ("b", Arc::new(encode(&col_b)) as _),
        ("c", Arc::new(encode(&col_c)) as _),
        ("d", Arc::new(d) as _),
    ])
    .unwrap();

    split_batch(batch)
}

/// Encapsulates creating data for this test
struct DataGenerator {
rng: StdRng,
Expand Down Expand Up @@ -363,13 +458,15 @@ impl DataGenerator {

/// array of low cardinality (100 distinct) values
fn utf8_low_cardinality_values(&mut self) -> Vec<Option<Arc<str>>> {
let strings = (0..100).map(|s| format!("value{}", s)).collect::<Vec<_>>();
let strings = (0..100)
.map(|s| format!("value{}", s).into())
.collect::<Vec<_>>();

// pick from the 100 strings randomly
let mut input = (0..INPUT_SIZE)
.map(|_| {
let idx = self.rng.gen_range(0..strings.len());
let s = Arc::from(strings[idx].as_str());
let s = Arc::clone(&strings[idx]);
Some(s)
})
.collect::<Vec<_>>();
Expand Down

0 comments on commit c359018

Please sign in to comment.