From c00628dd478b17f4cfda469af869666a40ff0d21 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 18 Sep 2023 13:10:28 +0100 Subject: [PATCH] Update arrow 47.0.0 --- .github/workflows/docs_pr.yaml | 1 - .github/workflows/rust.yml | 8 +- Cargo.toml | 11 +- datafusion-cli/Cargo.lock | 46 ++---- datafusion-cli/Cargo.toml | 7 + datafusion-examples/Cargo.toml | 4 +- datafusion/core/Cargo.toml | 3 - datafusion/core/tests/sql/select.rs | 1 - datafusion/optimizer/Cargo.toml | 1 - datafusion/physical-expr/Cargo.toml | 3 - .../physical-expr/src/expressions/binary.rs | 114 +++----------- .../physical-expr/src/expressions/datum.rs | 56 +++++++ .../physical-expr/src/expressions/like.rs | 143 ++---------------- .../physical-expr/src/expressions/mod.rs | 1 + datafusion/proto/Cargo.toml | 2 +- datafusion/substrait/Cargo.toml | 4 +- parquet-testing | 2 +- testing | 2 +- 18 files changed, 136 insertions(+), 273 deletions(-) create mode 100644 datafusion/physical-expr/src/expressions/datum.rs diff --git a/.github/workflows/docs_pr.yaml b/.github/workflows/docs_pr.yaml index 6669aa610080..c2f3dd684a23 100644 --- a/.github/workflows/docs_pr.yaml +++ b/.github/workflows/docs_pr.yaml @@ -47,7 +47,6 @@ jobs: uses: ./.github/actions/setup-builder with: rust-version: stable - # Note: this does not include dictionary_expressions to reduce codegen - name: Run doctests run: cargo test --doc --features avro,json - name: Verify Working Directory Clean diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 995916ef0d4e..6989d89878a3 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -65,7 +65,6 @@ jobs: - name: Check workspace in debug mode run: cargo check - # Note: this does not include dictionary_expressions to reduce codegen - name: Check workspace with all features run: cargo check --workspace --benches --features avro,json - name: Check Cargo.lock for datafusion-cli @@ -96,7 +95,7 @@ jobs: with: rust-version: stable - name: Run tests 
(excluding doctests) - run: cargo test --lib --tests --bins --features avro,json,dictionary_expressions,backtrace + run: cargo test --lib --tests --bins --features avro,json,backtrace - name: Verify Working Directory Clean run: git diff --exit-code @@ -177,7 +176,6 @@ jobs: uses: ./.github/actions/setup-builder with: rust-version: stable - # Note: this does not include dictionary_expressions to reduce codegen - name: Run doctests run: | cargo test --doc --features avro,json @@ -302,7 +300,7 @@ jobs: shell: bash run: | export PATH=$PATH:$HOME/d/protoc/bin - cargo test --lib --tests --bins --features avro,json,dictionary_expressions,backtrace + cargo test --lib --tests --bins --features avro,json,backtrace cd datafusion-cli cargo test --lib --tests --bins --all-features env: @@ -338,7 +336,7 @@ jobs: - name: Run tests (excluding doctests) shell: bash run: | - cargo test --lib --tests --bins --features avro,json,dictionary_expressions,backtrace + cargo test --lib --tests --bins --features avro,json,backtrace cd datafusion-cli cargo test --lib --tests --bins --all-features env: diff --git a/Cargo.toml b/Cargo.toml index 2141b73f8aec..308ab6deb9d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ rust-version = "1.70" version = "31.0.0" [workspace.dependencies] -arrow = { version = "46.0.0", features = ["prettyprint", "dyn_cmp_dict"] } +arrow = { version = "46.0.0", features = ["prettyprint"] } arrow-array = { version = "46.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "46.0.0", default-features = false } arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental"] } @@ -41,7 +41,6 @@ parquet = { version = "46.0.0", features = ["arrow", "async", "object_store"] } sqlparser = { version = "0.37.0", features = ["visitor"] } chrono = { version = "0.4.31", default-features = false } - [profile.release] codegen-units = 1 lto = true @@ -59,3 +58,11 @@ opt-level = 3 overflow-checks = false panic = 'unwind' rpath = 
false + +[patch.crates-io] +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index d7d1357a659d..27d6dac907e2 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -96,8 +96,7 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04a8801ebb147ad240b2d978d3ab9f73c9ccd4557ba6a03e7800496770ed10e0" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "ahash", "arrow-arith", @@ -118,8 +117,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "895263144bd4a69751cbe6a34a53f26626e19770b313a9fa792c415cd0e78f11" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-array", "arrow-buffer", @@ -133,8 +131,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "46.0.0" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "226fdc6c3a4ae154a74c24091d36a90b514f0ed7112f5b8322c1d8f354d8e20d" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "ahash", "arrow-buffer", @@ -150,8 +147,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4843af4dd679c2f35b69c572874da8fde33be53eb549a5fb128e7a4b763510" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "bytes", "half", @@ -161,8 +157,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e8b9990733a9b635f656efda3c9b8308c7a19695c9ec2c7046dd154f9b144b" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-array", "arrow-buffer", @@ -179,8 +174,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "646fbb4e11dd0afb8083e883f53117713b8caadb4413b3c9e63e3f535da3683c" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-array", "arrow-buffer", @@ -198,8 +192,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da900f31ff01a0a84da0572209be72b2b6f980f3ea58803635de47913191c188" +source = 
"git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-buffer", "arrow-schema", @@ -210,8 +203,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2707a8d7ee2d345d045283ece3ae43416175873483e5d96319c929da542a0b1f" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-array", "arrow-buffer", @@ -224,8 +216,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d1b91a63c356d14eedc778b76d66a88f35ac8498426bb0799a769a49a74a8b4" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-array", "arrow-buffer", @@ -244,8 +235,7 @@ dependencies = [ [[package]] name = "arrow-ord" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "584325c91293abbca7aaaabf8da9fe303245d641f5f4a18a6058dc68009c7ebf" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-array", "arrow-buffer", @@ -259,8 +249,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e32afc1329f7b372463b21c6ca502b07cf237e1ed420d87706c1770bb0ebd38" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "ahash", "arrow-array", @@ -274,15 +263,14 @@ dependencies = [ [[package]] name = 
"arrow-schema" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b104f5daa730f00fde22adc03a12aa5a2ae9ccbbf99cbd53d284119ddc90e03d" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" [[package]] name = "arrow-select" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b3ca55356d1eae07cf48808d8c462cea674393ae6ad1e0b120f40b422eb2b4" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ + "ahash", "arrow-array", "arrow-buffer", "arrow-data", @@ -293,8 +281,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1433ce02590cae68da0a18ed3a3ed868ffac2c6f24c533ddd2067f7ee04b4a" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "arrow-array", "arrow-buffer", @@ -2304,8 +2291,7 @@ dependencies = [ [[package]] name = "parquet" version = "46.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad2cba786ae07da4d73371a88b9e0f9d3ffac1a9badc83922e0e15814f5c5fa" +source = "git+https://github.com/apache/arrow-rs.git?rev=175c7765939c0738defc736426c0b0a93b00bfa8#175c7765939c0738defc736426c0b0a93b00bfa8" dependencies = [ "ahash", "arrow-array", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index e77ad7d85ebe..7502a3d9df39 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -50,3 +50,10 @@ assert_cmd = "2.0" ctor = "0.2.0" predicates = "3.0" rstest = "0.17" + +[patch.crates-io] +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = 
"175c7765939c0738defc736426c0b0a93b00bfa8" } +arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "175c7765939c0738defc736426c0b0a93b00bfa8" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 22cc3f4df425..d928f0177dc3 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -47,12 +47,12 @@ log = "0.4" mimalloc = { version = "0.1", default-features = false } num_cpus = "1.13.0" object_store = { version = "0.7.0", features = ["aws"] } -prost = { version = "0.11", default-features = false } +prost = { version = "0.12", default-features = false } prost-derive = { version = "0.11", default-features = false } serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.82" tempfile = "3" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } -tonic = "0.9" +tonic = "0.10" url = "2.2" uuid = "1.2" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 99ffbb8be8e8..8c8375001fca 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -40,9 +40,6 @@ backtrace = ["datafusion-common/backtrace"] compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"] default = ["crypto_expressions", "encoding__expressions", "regex_expressions", "unicode_expressions", "compression"] -# Enables support for non-scalar, binary operations on dictionaries -# Note: this results in significant additional codegen 
-dictionary_expressions = ["datafusion-physical-expr/dictionary_expressions", "datafusion-optimizer/dictionary_expressions"] encoding__expressions = ["datafusion-physical-expr/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index c05dc31cbe0f..ae4601fcd826 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -188,7 +188,6 @@ async fn query_nested_get_indexed_field_on_struct() -> Result<()> { } #[tokio::test] -#[cfg(feature = "dictionary_expressions")] async fn query_on_string_dictionary() -> Result<()> { // Test to ensure DataFusion can operate on dictionary types // Use StringDictionary (32 bit indexes = keys) diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index f42a67c0d782..38443c9297f9 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -35,7 +35,6 @@ path = "src/lib.rs" [features] crypto_expressions = ["datafusion-physical-expr/crypto_expressions"] default = ["unicode_expressions", "crypto_expressions", "regex_expressions"] -dictionary_expressions = ["datafusion-physical-expr/dictionary_expressions"] regex_expressions = ["datafusion-physical-expr/regex_expressions"] unicode_expressions = ["datafusion-physical-expr/unicode_expressions"] diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 777918bfac6a..70e3483483b4 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -35,9 +35,6 @@ path = "src/lib.rs" [features] crypto_expressions = ["md-5", "sha2", "blake2", "blake3"] default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "encoding_expressions"] -# Enables support for non-scalar, binary operations on dictionaries -# Note: this results in significant additional codegen 
-dictionary_expressions = ["arrow/dyn_cmp_dict"] encoding_expressions = ["base64", "hex"] regex_expressions = ["regex"] unicode_expressions = ["unicode-segmentation"] diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index f235ea9fcdaf..30af6ac80cd1 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -38,12 +38,12 @@ use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar; use arrow::compute::kernels::concat_elements::concat_elements_utf8; use arrow::datatypes::*; use arrow::record_batch::RecordBatch; -use arrow_array::Datum; use datafusion_common::cast::as_boolean_array; use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::type_coercion::binary::get_result_type; use datafusion_expr::{ColumnarValue, Operator}; +use crate::expressions::datum::{apply, apply_cmp}; use kernels::{ bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar, bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn, @@ -132,32 +132,6 @@ macro_rules! compute_utf8_op { }}; } -/// Invoke a compute kernel on a data array and a scalar value -macro_rules! compute_utf8_op_scalar { - ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident, $OP_TYPE:expr) => {{ - let ll = $LEFT - .as_any() - .downcast_ref::<$DT>() - .expect("compute_op failed to downcast left side array"); - if let ScalarValue::Utf8(Some(string_value)) - | ScalarValue::LargeUtf8(Some(string_value)) = $RIGHT - { - Ok(Arc::new(paste::expr! {[<$OP _utf8_scalar>]}( - &ll, - &string_value, - )?)) - } else if $RIGHT.is_null() { - Ok(Arc::new(new_null_array($OP_TYPE, $LEFT.len()))) - } else { - internal_err!( - "compute_utf8_op_scalar for '{}' failed to cast literal value {}", - stringify!($OP), - $RIGHT - ) - } - }}; -} - macro_rules! 
binary_string_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { @@ -284,35 +258,37 @@ impl PhysicalExpr for BinaryExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let left_value = self.left.evaluate(batch)?; - let right_value = self.right.evaluate(batch)?; - let left_data_type = left_value.data_type(); - let right_data_type = right_value.data_type(); + use arrow::compute::kernels::numeric::*; + + let lhs = self.left.evaluate(batch)?; + let rhs = self.right.evaluate(batch)?; + let left_data_type = lhs.data_type(); + let right_data_type = rhs.data_type(); let schema = batch.schema(); let input_schema = schema.as_ref(); - if self.is_datum_operator() { - return match (&left_value, &right_value) { - (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { - self.evaluate_datum(&left.as_ref(), &right.as_ref()) - } - (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { - self.evaluate_datum(&left.to_scalar(), &right.as_ref()) - } - (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { - self.evaluate_datum(&left.as_ref(), &right.to_scalar()) - } - (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { - self.evaluate_datum(&left.to_scalar(), &right.to_scalar()) - } - }; + match self.op { + Operator::Plus => return apply(&lhs, &rhs, add_wrapping), + Operator::Minus => return apply(&lhs, &rhs, sub_wrapping), + Operator::Multiply => return apply(&lhs, &rhs, mul_wrapping), + Operator::Divide => return apply(&lhs, &rhs, div), + Operator::Modulo => return apply(&lhs, &rhs, rem), + Operator::Eq => return apply_cmp(&lhs, &rhs, eq), + Operator::NotEq => return apply_cmp(&lhs, &rhs, neq), + Operator::Lt => return apply_cmp(&lhs, &rhs, lt), + Operator::Gt => return apply_cmp(&lhs, &rhs, gt), + Operator::LtEq => return apply_cmp(&lhs, &rhs, lt_eq), + Operator::GtEq => return apply_cmp(&lhs, &rhs, gt_eq), + Operator::IsDistinctFrom => return apply_cmp(&lhs, &rhs, distinct), + 
Operator::IsNotDistinctFrom => return apply_cmp(&lhs, &rhs, not_distinct), + _ => {} } let result_type = self.data_type(input_schema)?; // Attempt to use special kernels if one input is scalar and the other is an array - let scalar_result = match (&left_value, &right_value) { + let scalar_result = match (&lhs, &rhs) { (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => { // if left is array and right is literal - use scalar operations self.evaluate_array_scalar(array, scalar.clone())?.map(|r| { @@ -328,8 +304,8 @@ impl PhysicalExpr for BinaryExpr { // if both arrays or both literals - extract arrays and continue execution let (left, right) = ( - left_value.into_array(batch.num_rows()), - right_value.into_array(batch.num_rows()), + lhs.into_array(batch.num_rows()), + rhs.into_array(batch.num_rows()), ); self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type) .map(|a| ColumnarValue::Array(a)) @@ -450,46 +426,6 @@ fn to_result_type_array( } impl BinaryExpr { - fn is_datum_operator(&self) -> bool { - use Operator::*; - self.op.is_numerical_operators() - || matches!( - self.op, - Lt | LtEq | Gt | GtEq | Eq | NotEq | IsDistinctFrom | IsNotDistinctFrom - ) - } - - /// Evaluate the expression using [`Datum`] - fn evaluate_datum( - &self, - left: &dyn Datum, - right: &dyn Datum, - ) -> Result { - use arrow::compute::kernels::numeric::*; - let array = match self.op { - Operator::Plus => add_wrapping(left, right)?, - Operator::Minus => sub_wrapping(left, right)?, - Operator::Multiply => mul_wrapping(left, right)?, - Operator::Divide => div(left, right)?, - Operator::Modulo => rem(left, right)?, - Operator::Eq => Arc::new(eq(left, right)?), - Operator::NotEq => Arc::new(neq(left, right)?), - Operator::Lt => Arc::new(lt(left, right)?), - Operator::Gt => Arc::new(gt(left, right)?), - Operator::LtEq => Arc::new(lt_eq(left, right)?), - Operator::GtEq => Arc::new(gt_eq(left, right)?), - Operator::IsDistinctFrom => Arc::new(distinct(left, 
right)?), - Operator::IsNotDistinctFrom => Arc::new(not_distinct(left, right)?), - _ => unreachable!(), - }; - - if left.get().1 && right.get().1 { - let scalar = ScalarValue::try_from_array(array.as_ref(), 0)?; - return Ok(ColumnarValue::Scalar(scalar)); - } - Ok(ColumnarValue::Array(array)) - } - /// Evaluate the expression of the left input is an array and /// right is literal - use scalar operations fn evaluate_array_scalar( diff --git a/datafusion/physical-expr/src/expressions/datum.rs b/datafusion/physical-expr/src/expressions/datum.rs new file mode 100644 index 000000000000..326d5a4e82a3 --- /dev/null +++ b/datafusion/physical-expr/src/expressions/datum.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::array::{ArrayRef, Datum}; +use arrow::error::ArrowError; +use arrow_array::BooleanArray; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::ColumnarValue; +use std::sync::Arc; + +/// Applies a binary [`Datum`] kernel `f` to `lhs` and `rhs` +pub(crate) fn apply( + lhs: &ColumnarValue, + rhs: &ColumnarValue, + f: impl Fn(&dyn Datum, &dyn Datum) -> Result, +) -> Result { + match (&lhs, &rhs) { + (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { + Ok(ColumnarValue::Array(f(&left.as_ref(), &right.as_ref())?)) + } + (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { + Ok(ColumnarValue::Array(f(&left.to_scalar(), &right.as_ref())?)) + } + (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { + Ok(ColumnarValue::Array(f(&left.as_ref(), &right.to_scalar())?)) + } + (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { + let array = f(&left.to_scalar(), &right.to_scalar())?; + let scalar = ScalarValue::try_from_array(array.as_ref(), 0)?; + Ok(ColumnarValue::Scalar(scalar)) + } + } +} + +/// Applies a binary [`Datum`] comparison kernel `f` to `lhs` and `rhs` +pub(crate) fn apply_cmp( + lhs: &ColumnarValue, + rhs: &ColumnarValue, + f: impl Fn(&dyn Datum, &dyn Datum) -> Result, +) -> Result { + apply(lhs, rhs, |l, r| Ok(Arc::new(f(l, r)?))) +} diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index b3d366142ad3..e833eabbfff2 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -20,18 +20,10 @@ use std::{any::Any, sync::Arc}; use crate::{physical_expr::down_cast_any_ref, PhysicalExpr}; -use arrow::compute::kernels::comparison::{ - ilike_utf8, like_utf8, nilike_utf8, nlike_utf8, -}; -use arrow::compute::kernels::comparison::{ - ilike_utf8_scalar, like_utf8_scalar, nilike_utf8_scalar, nlike_utf8_scalar, -}; -use arrow::{ - array::{new_null_array, Array, ArrayRef, 
LargeStringArray, StringArray}, - record_batch::RecordBatch, -}; +use crate::expressions::datum::apply_cmp; +use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Schema}; -use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; +use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_expr::ColumnarValue; // Like expression @@ -109,61 +101,15 @@ impl PhysicalExpr for LikeExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let expr_value = self.expr.evaluate(batch)?; - let pattern_value = self.pattern.evaluate(batch)?; - let expr_data_type = expr_value.data_type(); - let pattern_data_type = pattern_value.data_type(); - - match ( - &expr_value, - &expr_data_type, - &pattern_value, - &pattern_data_type, - ) { - // Types are equal => valid - (_, l, _, r) if l == r => {} - // Allow comparing a dictionary value with its corresponding scalar value - ( - ColumnarValue::Array(_), - DataType::Dictionary(_, dict_t), - ColumnarValue::Scalar(_), - scalar_t, - ) - | ( - ColumnarValue::Scalar(_), - scalar_t, - ColumnarValue::Array(_), - DataType::Dictionary(_, dict_t), - ) if dict_t.as_ref() == scalar_t => {} - _ => { - return internal_err!( - "Cannot evaluate {} expression with types {:?} and {:?}", - self.op_name(), - expr_data_type, - pattern_data_type - ); - } - } - - // Attempt to use special kernels if one input is scalar and the other is an array - let scalar_result = match (&expr_value, &pattern_value) { - (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => { - self.evaluate_array_scalar(array, scalar)? 
- } - (_, _) => None, // default to array implementation - }; - - if let Some(result) = scalar_result { - return result.map(|a| ColumnarValue::Array(a)); + use arrow::compute::*; + let lhs = self.expr.evaluate(batch)?; + let rhs = self.pattern.evaluate(batch)?; + match (self.negated, self.case_insensitive) { + (false, false) => apply_cmp(&lhs, &rhs, like), + (false, true) => apply_cmp(&lhs, &rhs, ilike), + (true, false) => apply_cmp(&lhs, &rhs, nlike), + (true, true) => apply_cmp(&lhs, &rhs, nilike), } - - // if both arrays or both literals - extract arrays and continue execution - let (expr, pattern) = ( - expr_value.into_array(batch.num_rows()), - pattern_value.into_array(batch.num_rows()), - ); - self.evaluate_array_array(expr, pattern) - .map(|a| ColumnarValue::Array(a)) } fn children(&self) -> Vec> { @@ -202,71 +148,6 @@ impl PartialEq for LikeExpr { } } -macro_rules! binary_string_array_op_scalar { - ($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{ - let result: Result> = match $LEFT.data_type() { - DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE), - DataType::LargeUtf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, LargeStringArray, $OP_TYPE), - other => internal_err!( - "Data type {:?} not supported for scalar operation '{}' on string array", - other, stringify!($OP) - ), - }; - Some(result) - }}; -} - -impl LikeExpr { - /// Evaluate the expression if the input is an array and - /// pattern is literal - use scalar operations - fn evaluate_array_scalar( - &self, - array: &dyn Array, - scalar: &ScalarValue, - ) -> Result>> { - let scalar_result = match (self.negated, self.case_insensitive) { - (false, false) => binary_string_array_op_scalar!( - array, - scalar.clone(), - like, - &DataType::Boolean - ), - (true, false) => binary_string_array_op_scalar!( - array, - scalar.clone(), - nlike, - &DataType::Boolean - ), - (false, true) => binary_string_array_op_scalar!( - array, - scalar.clone(), - ilike, - 
&DataType::Boolean - ), - (true, true) => binary_string_array_op_scalar!( - array, - scalar.clone(), - nilike, - &DataType::Boolean - ), - }; - Ok(scalar_result) - } - - fn evaluate_array_array( - &self, - left: Arc, - right: Arc, - ) -> Result { - match (self.negated, self.case_insensitive) { - (false, false) => binary_string_array_op!(left, right, like), - (true, false) => binary_string_array_op!(left, right, nlike), - (false, true) => binary_string_array_op!(left, right, ilike), - (true, true) => binary_string_array_op!(left, right, nilike), - } - } -} - /// Create a like expression, erroring if the argument types are not compatible. pub fn like( negated: bool, @@ -294,7 +175,7 @@ pub fn like( mod test { use super::*; use crate::expressions::col; - use arrow::array::BooleanArray; + use arrow::array::*; use arrow_schema::Field; use datafusion_common::cast::as_boolean_array; diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index cfc8ec2f0728..226dd32962ec 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -22,6 +22,7 @@ mod binary; mod case; mod cast; mod column; +mod datum; mod get_indexed_field; mod in_list; mod is_not_null; diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 9ba9d5484dec..1f3c4072f723 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -47,7 +47,7 @@ datafusion-common = { path = "../common", version = "31.0.0" } datafusion-expr = { path = "../expr", version = "31.0.0" } object_store = { version = "0.7.0" } pbjson = { version = "0.5", optional = true } -prost = "0.11.0" +prost = "0.12.0" serde = { version = "1.0", optional = true } serde_json = { version = "1.0", optional = true } diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index c654d8a6566a..b0795e04ee45 100644 --- a/datafusion/substrait/Cargo.toml +++ 
b/datafusion/substrait/Cargo.toml @@ -33,8 +33,8 @@ chrono = { workspace = true } datafusion = { version = "31.0.0", path = "../core" } itertools = "0.11" object_store = "0.7.0" -prost = "0.11" -prost-types = "0.11" +prost = "0.12" +prost-types = "0.12" substrait = "0.13.1" tokio = "1.17" diff --git a/parquet-testing b/parquet-testing index d79a0101d90d..a11fc8f148f8 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit d79a0101d90dfa3bbb10337626f57a3e8c4b5363 +Subproject commit a11fc8f148f8a7a89d9281cc0da3eb9d56095fbf diff --git a/testing b/testing index 2c84953c8c27..e81d0c6de359 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 2c84953c8c2779a0dc86ef9ebe8a6cd978125bfe +Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e