-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Round robin polling between tied winners in sort preserving merge (#1…
…3133) * first draft Signed-off-by: jayzhan211 <[email protected]> * add data Signed-off-by: jayzhan211 <[email protected]> * fix benchmark Signed-off-by: jayzhan211 <[email protected]> * add more bencmark data Signed-off-by: jayzhan211 <[email protected]> * fix benchmark Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * get max size Signed-off-by: jayzhan211 <[email protected]> * add license Signed-off-by: jayzhan211 <[email protected]> * rm code for merge Signed-off-by: jayzhan211 <[email protected]> * cleanup Signed-off-by: jayzhan211 <[email protected]> * cleanup Signed-off-by: jayzhan211 <[email protected]> * update poll count only we have tie Signed-off-by: jayzhan211 <[email protected]> * upd comment Signed-off-by: jayzhan211 <[email protected]> * fix logic Signed-off-by: jayzhan211 <[email protected]> * configurable Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * add mem limit test Signed-off-by: jayzhan211 <[email protected]> * rm test Signed-off-by: jayzhan211 <[email protected]> * escape bracket Signed-off-by: jayzhan211 <[email protected]> * add test Signed-off-by: jayzhan211 <[email protected]> * rm per consumer record Signed-off-by: jayzhan211 <[email protected]> * repartition limit Signed-off-by: jayzhan211 <[email protected]> * add benchmark Signed-off-by: jayzhan211 <[email protected]> * cleanup Signed-off-by: jayzhan211 <[email protected]> * benchmark with parameter Signed-off-by: jayzhan211 <[email protected]> * only calculate consumer pool if the limit is set Signed-off-by: jayzhan211 <[email protected]> * combine eq and gt Signed-off-by: jayzhan211 <[email protected]> * review part 1 * Update merge.rs * upd doc Signed-off-by: jayzhan211 <[email protected]> * no need index comparison Signed-off-by: jayzhan211 <[email protected]> * combine handle tie and eq check Signed-off-by: jayzhan211 <[email protected]> * upd doc Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * add more comment Signed-off-by: jayzhan211 <[email protected]> * remove flag Signed-off-by: jayzhan211 <[email protected]> * upd comment Signed-off-by: jayzhan211 <[email protected]> * Revert "remove flag" This reverts commit 8d6c0a6. * Revert "upd comment" This reverts commit a18cba8. * add more comment Signed-off-by: jayzhan211 <[email protected]> * add more comment Signed-off-by: jayzhan211 <[email protected]> * fmt Signed-off-by: jayzhan211 <[email protected]> * simpliy mem pool Signed-off-by: jayzhan211 <[email protected]> * clippy Signed-off-by: jayzhan211 <[email protected]> * Update merge.rs * minor * add comment Signed-off-by: jayzhan211 <[email protected]> --------- Signed-off-by: jayzhan211 <[email protected]> Co-authored-by: berkaysynnada <[email protected]>
- Loading branch information
1 parent
4975829
commit f23360f
Showing
6 changed files
with
501 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use std::sync::Arc; | ||
|
||
use arrow::record_batch::RecordBatch; | ||
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray}; | ||
use datafusion_execution::TaskContext; | ||
use datafusion_physical_expr::expressions::col; | ||
use datafusion_physical_expr::PhysicalSortExpr; | ||
use datafusion_physical_plan::memory::MemoryExec; | ||
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; | ||
use datafusion_physical_plan::{collect, ExecutionPlan}; | ||
|
||
use criterion::async_executor::FuturesExecutor; | ||
use criterion::{black_box, criterion_group, criterion_main, Criterion}; | ||
|
||
fn generate_spm_for_round_robin_tie_breaker( | ||
has_same_value: bool, | ||
enable_round_robin_repartition: bool, | ||
batch_count: usize, | ||
partition_count: usize, | ||
) -> SortPreservingMergeExec { | ||
let row_size = 256; | ||
let rb = if has_same_value { | ||
let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size])); | ||
let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size])); | ||
let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size])); | ||
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap() | ||
} else { | ||
let v = (0i32..row_size as i32).collect::<Vec<_>>(); | ||
let a: ArrayRef = Arc::new(Int32Array::from(v)); | ||
|
||
// Use alphanumeric characters | ||
let charset: Vec<char> = | ||
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" | ||
.chars() | ||
.collect(); | ||
|
||
let mut strings = Vec::new(); | ||
for i in 0..256 { | ||
let mut s = String::new(); | ||
s.push(charset[i % charset.len()]); | ||
s.push(charset[(i / charset.len()) % charset.len()]); | ||
strings.push(Some(s)); | ||
} | ||
|
||
let b: ArrayRef = Arc::new(StringArray::from_iter(strings)); | ||
|
||
let v = (0i64..row_size as i64).collect::<Vec<_>>(); | ||
let c: ArrayRef = Arc::new(Int64Array::from_iter(v)); | ||
RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap() | ||
}; | ||
|
||
let rbs = (0..batch_count).map(|_| rb.clone()).collect::<Vec<_>>(); | ||
let partitiones = vec![rbs.clone(); partition_count]; | ||
|
||
let schema = rb.schema(); | ||
let sort = vec![ | ||
PhysicalSortExpr { | ||
expr: col("b", &schema).unwrap(), | ||
options: Default::default(), | ||
}, | ||
PhysicalSortExpr { | ||
expr: col("c", &schema).unwrap(), | ||
options: Default::default(), | ||
}, | ||
]; | ||
|
||
let exec = MemoryExec::try_new(&partitiones, schema, None).unwrap(); | ||
SortPreservingMergeExec::new(sort, Arc::new(exec)) | ||
.with_round_robin_repartition(enable_round_robin_repartition) | ||
} | ||
|
||
fn run_bench( | ||
c: &mut Criterion, | ||
has_same_value: bool, | ||
enable_round_robin_repartition: bool, | ||
batch_count: usize, | ||
partition_count: usize, | ||
description: &str, | ||
) { | ||
let task_ctx = TaskContext::default(); | ||
let task_ctx = Arc::new(task_ctx); | ||
|
||
let spm = Arc::new(generate_spm_for_round_robin_tie_breaker( | ||
has_same_value, | ||
enable_round_robin_repartition, | ||
batch_count, | ||
partition_count, | ||
)) as Arc<dyn ExecutionPlan>; | ||
|
||
c.bench_function(description, |b| { | ||
b.to_async(FuturesExecutor) | ||
.iter(|| black_box(collect(Arc::clone(&spm), Arc::clone(&task_ctx)))) | ||
}); | ||
} | ||
|
||
fn criterion_benchmark(c: &mut Criterion) { | ||
let params = [ | ||
(true, false, "low_card_without_tiebreaker"), // low cardinality, no tie breaker | ||
(true, true, "low_card_with_tiebreaker"), // low cardinality, with tie breaker | ||
(false, false, "high_card_without_tiebreaker"), // high cardinality, no tie breaker | ||
(false, true, "high_card_with_tiebreaker"), // high cardinality, with tie breaker | ||
]; | ||
|
||
let batch_counts = [1, 25, 625]; | ||
let partition_counts = [2, 8, 32]; | ||
|
||
for &(has_same_value, enable_round_robin_repartition, cardinality_label) in ¶ms { | ||
for &batch_count in &batch_counts { | ||
for &partition_count in &partition_counts { | ||
let description = format!( | ||
"{}_batch_count_{}_partition_count_{}", | ||
cardinality_label, batch_count, partition_count | ||
); | ||
run_bench( | ||
c, | ||
has_same_value, | ||
enable_round_robin_repartition, | ||
batch_count, | ||
partition_count, | ||
&description, | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
criterion_group!(benches, criterion_benchmark); | ||
criterion_main!(benches); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.