Skip to content

Commit

Permalink
perf: Optimize IfExpr by delegating to CaseExpr (apache#681)
Browse files Browse the repository at this point in the history
* Unify IF and CASE expressions

* revert test changes

* fix
  • Loading branch information
andygrove authored Jul 24, 2024
1 parent 01362b5 commit e2d838e
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 30 deletions.
17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,30 @@ chrono = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
chrono-tz = { workspace = true }
num = { workspace = true }
regex = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
criterion = "0.5.1"
rand = "0.8.5"

[lib]
name = "datafusion_comet_spark_expr"
path = "src/lib.rs"

[[bench]]
name = "cast_from_string"
harness = false

[[bench]]
name = "cast_numeric"
harness = false

[[bench]]
name = "conditional"
harness = false
91 changes: 91 additions & 0 deletions benches/cast_from_string.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_comet_spark_expr::{Cast, EvalMode};
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
let batch = create_utf8_batch();
let expr = Arc::new(Column::new("a", 0));
let timezone = "".to_string();
let cast_string_to_i8 = Cast::new(
expr.clone(),
DataType::Int8,
EvalMode::Legacy,
timezone.clone(),
);
let cast_string_to_i16 = Cast::new(
expr.clone(),
DataType::Int16,
EvalMode::Legacy,
timezone.clone(),
);
let cast_string_to_i32 = Cast::new(
expr.clone(),
DataType::Int32,
EvalMode::Legacy,
timezone.clone(),
);
let cast_string_to_i64 = Cast::new(expr, DataType::Int64, EvalMode::Legacy, timezone);

let mut group = c.benchmark_group("cast_string_to_int");
group.bench_function("cast_string_to_i8", |b| {
b.iter(|| cast_string_to_i8.evaluate(&batch).unwrap());
});
group.bench_function("cast_string_to_i16", |b| {
b.iter(|| cast_string_to_i16.evaluate(&batch).unwrap());
});
group.bench_function("cast_string_to_i32", |b| {
b.iter(|| cast_string_to_i32.evaluate(&batch).unwrap());
});
group.bench_function("cast_string_to_i64", |b| {
b.iter(|| cast_string_to_i64.evaluate(&batch).unwrap());
});
}

// Create UTF8 batch with strings representing ints, floats, nulls
fn create_utf8_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
let mut b = StringBuilder::new();
for i in 0..1000 {
if i % 10 == 0 {
b.append_null();
} else if i % 2 == 0 {
b.append_value(format!("{}", rand::random::<f64>()));
} else {
b.append_value(format!("{}", rand::random::<i64>()));
}
}
let array = b.finish();

RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap()
}

fn config() -> Criterion {
Criterion::default()
}

criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
criterion_main!(benches);
79 changes: 79 additions & 0 deletions benches/cast_numeric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{builder::Int32Builder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_comet_spark_expr::{Cast, EvalMode};
use datafusion_physical_expr::{expressions::Column, PhysicalExpr};
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
let batch = create_int32_batch();
let expr = Arc::new(Column::new("a", 0));
let timezone = "".to_string();
let cast_i32_to_i8 = Cast::new(
expr.clone(),
DataType::Int8,
EvalMode::Legacy,
timezone.clone(),
);
let cast_i32_to_i16 = Cast::new(
expr.clone(),
DataType::Int16,
EvalMode::Legacy,
timezone.clone(),
);
let cast_i32_to_i64 = Cast::new(expr, DataType::Int64, EvalMode::Legacy, timezone);

let mut group = c.benchmark_group("cast_int_to_int");
group.bench_function("cast_i32_to_i8", |b| {
b.iter(|| cast_i32_to_i8.evaluate(&batch).unwrap());
});
group.bench_function("cast_i32_to_i16", |b| {
b.iter(|| cast_i32_to_i16.evaluate(&batch).unwrap());
});
group.bench_function("cast_i32_to_i64", |b| {
b.iter(|| cast_i32_to_i64.evaluate(&batch).unwrap());
});
}

fn create_int32_batch() -> RecordBatch {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
let mut b = Int32Builder::new();
for i in 0..1000 {
if i % 10 == 0 {
b.append_null();
} else {
b.append_value(rand::random::<i32>());
}
}
let array = b.finish();

RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap()
}

fn config() -> Criterion {
Criterion::default()
}

criterion_group! {
name = benches;
config = config();
targets = criterion_benchmark
}
criterion_main!(benches);
139 changes: 139 additions & 0 deletions benches/conditional.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::builder::{Int32Builder, StringBuilder};
use arrow_schema::DataType;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_comet_spark_expr::IfExpr;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, CaseExpr};
use datafusion_physical_expr_common::expressions::column::Column;
use datafusion_physical_expr_common::expressions::Literal;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use std::sync::Arc;

fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(Column::new(name, index))
}

fn make_lit_i32(n: i32) -> Arc<dyn PhysicalExpr> {
Arc::new(Literal::new(ScalarValue::Int32(Some(n))))
}

fn make_null_lit() -> Arc<dyn PhysicalExpr> {
Arc::new(Literal::new(ScalarValue::Utf8(None)))
}

fn criterion_benchmark(c: &mut Criterion) {
// create input data
let mut c1 = Int32Builder::new();
let mut c2 = StringBuilder::new();
let mut c3 = StringBuilder::new();
for i in 0..1000 {
c1.append_value(i);
if i % 7 == 0 {
c2.append_null();
} else {
c2.append_value(&format!("string {i}"));
}
if i % 9 == 0 {
c3.append_null();
} else {
c3.append_value(&format!("other string {i}"));
}
}
let c1 = Arc::new(c1.finish());
let c2 = Arc::new(c2.finish());
let c3 = Arc::new(c3.finish());
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
Field::new("c3", DataType::Utf8, true),
]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![c1, c2, c3]).unwrap();

// use same predicate for all benchmarks
let predicate = Arc::new(BinaryExpr::new(
make_col("c1", 0),
Operator::LtEq,
make_lit_i32(500),
));

// CASE WHEN c1 <= 500 THEN 1 ELSE 0 END
c.bench_function("case_when: scalar or scalar", |b| {
let expr = Arc::new(
CaseExpr::try_new(
None,
vec![(predicate.clone(), make_lit_i32(1))],
Some(make_lit_i32(0)),
)
.unwrap(),
);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});
c.bench_function("if: scalar or scalar", |b| {
let expr = Arc::new(IfExpr::new(
predicate.clone(),
make_lit_i32(1),
make_lit_i32(0),
));
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

// CASE WHEN c1 <= 500 THEN c2 [ELSE NULL] END
c.bench_function("case_when: column or null", |b| {
let expr = Arc::new(
CaseExpr::try_new(None, vec![(predicate.clone(), make_col("c2", 1))], None).unwrap(),
);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});
c.bench_function("if: column or null", |b| {
let expr = Arc::new(IfExpr::new(
predicate.clone(),
make_col("c2", 1),
make_null_lit(),
));
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

// CASE WHEN c1 <= 500 THEN c2 ELSE c3 END
c.bench_function("case_when: expr or expr", |b| {
let expr = Arc::new(
CaseExpr::try_new(
None,
vec![(predicate.clone(), make_col("c2", 1))],
Some(make_col("c3", 2)),
)
.unwrap(),
);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});
c.bench_function("if: expr or expr", |b| {
let expr = Arc::new(IfExpr::new(
predicate.clone(),
make_col("c2", 1),
make_col("c3", 2),
));
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
Loading

0 comments on commit e2d838e

Please sign in to comment.