feat: add support for array_contains expression #1163

Merged 24 commits on Jan 2, 2025
Commits
eaa6cf6
feat: add support for array_contains expression
dharanad Dec 11, 2024
edcc168
test: add unit test for array_contains function
dharanad Dec 11, 2024
feaa492
Removes unnecessary case expression for handling null values
dharanad Dec 11, 2024
210f380
chore: Move more expressions from core crate to spark-expr crate (#1152)
andygrove Dec 9, 2024
47d2431
remove dead code (#1155)
andygrove Dec 10, 2024
ef1131d
fix: Spark 4.0-preview1 SPARK-47120 (#1156)
kazuyukitanimura Dec 11, 2024
6587964
chore: Move string kernels and expressions to spark-expr crate (#1164)
andygrove Dec 12, 2024
c6aa6be
chore: Move remaining expressions to spark-expr crate + some minor re…
andygrove Dec 12, 2024
6feefbd
chore: Add ignored tests for reading complex types from Parquet (#1167)
andygrove Dec 12, 2024
9d446cd
feat: Add Spark-compatible implementation of SchemaAdapterFactory (#1…
andygrove Dec 17, 2024
8d7d005
fix: Document enabling comet explain plan usage in Spark (4.0) (#1176)
parthchandra Dec 17, 2024
d464ba5
test: enabling Spark tests with offHeap requirement (#1177)
kazuyukitanimura Dec 18, 2024
baa1109
feat: Improve shuffle metrics (second attempt) (#1175)
andygrove Dec 18, 2024
698cd1f
fix: stddev_pop should not directly return 0.0 when count is 1.0 (#1184)
viirya Dec 19, 2024
6a74832
feat: Make native shuffle compression configurable and respect `spark…
andygrove Dec 20, 2024
62e8cdd
minor: move shuffle classes from common to spark (#1193)
andygrove Dec 22, 2024
8ee39df
minor: refactor decodeBatches to make private in broadcast exchange (…
andygrove Dec 22, 2024
8891549
minor: refactor prepare_output so that it does not require an Executi…
andygrove Dec 22, 2024
d7f5545
fix: fix missing explanation for then branch in case when (#1200)
rluvaton Dec 27, 2024
2dc6d3f
minor: remove unused source files (#1202)
andygrove Dec 28, 2024
02f817c
chore: Upgrade to DataFusion 44.0.0-rc2 (#1154)
andygrove Dec 28, 2024
d2da499
Merge branch 'main' into support-array_contains
dharanad Jan 1, 2025
624f5ce
update UT
dharanad Jan 2, 2025
7655b9d
fix typo in UT
dharanad Jan 2, 2025
15 changes: 15 additions & 0 deletions native/core/src/execution/planner.rs
@@ -98,6 +98,7 @@ use datafusion_expr::{
AggregateUDF, ScalarUDF, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunctionDefinition,
};
use datafusion_functions_nested::array_has::ArrayHas;
use datafusion_physical_expr::expressions::{Literal, StatsType};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::LexOrdering;
@@ -719,6 +720,20 @@ impl PhysicalPlanner {
expr.legacy_negative_index,
)))
}
ExprStruct::ArrayContains(expr) => {
let src_array_expr =
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
let key_expr =
self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
let args = vec![Arc::clone(&src_array_expr), key_expr];
let array_has_expr = Arc::new(ScalarFunctionExpr::new(
"array_has",
Arc::new(ScalarUDF::new_from_impl(ArrayHas::new())),
args,
DataType::Boolean,
));
Ok(array_has_expr)
}
expr => Err(ExecutionError::GeneralError(format!(
"Not implemented: {:?}",
expr
1 change: 1 addition & 0 deletions native/proto/src/proto/expr.proto
@@ -84,6 +84,7 @@ message Expr {
GetArrayStructFields get_array_struct_fields = 57;
BinaryExpr array_append = 58;
ArrayInsert array_insert = 59;
BinaryExpr array_contains = 60;
}
}

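Note that `array_contains` reuses the generic `BinaryExpr` message, following the precedent of `array_append` on the line above, rather than introducing a dedicated message. For orientation, a `BinaryExpr` of this kind carries just the two child expressions — a sketch only; the real definition lives elsewhere in `expr.proto`, and the field numbers here are assumptions:

```protobuf
// Illustrative shape only; field numbers are assumed, not copied
// from the actual expr.proto.
message BinaryExpr {
  Expr left = 1;
  Expr right = 2;
}
```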
@@ -2266,6 +2266,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
None
}
case expr if expr.prettyName == "array_contains" =>
createBinaryExpr(
expr.children(0),
expr.children(1),
inputs,
(builder, binaryExpr) => builder.setArrayContains(binaryExpr))
case _ if expr.prettyName == "array_append" =>
createBinaryExpr(
expr.children(0),
12 changes: 12 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2517,4 +2517,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
checkSparkAnswer(df.select("arrUnsupportedArgs"))
}
}

test("array_contains") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "test.parquet")
makeParquetFileAllTypes(path, dictionaryEnabled = false, n = 10000)
spark.read.parquet(path.toString).createOrReplaceTempView("t1");
checkSparkAnswerAndOperator(
spark.sql("SELECT array_contains(array(_2, _3, _4), _2) FROM t1"))
checkSparkAnswerAndOperator(
spark.sql("SELECT array_contains((CASE WHEN _2 =_3 THEN array(_4) END), _4) FROM t1"));
}
}
}
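The second query above (wrapping the array in a `CASE WHEN`) exercises the null handling that the earlier commit "Removes unnecessary case expression for handling null values" touched on. As a standalone illustration, here is a plain-Rust model of `array_contains` under SQL's three-valued logic, where `None` stands in for NULL at both the array and element level. This is a sketch of the behavior documented for Spark's `ArrayContains` (NULL array or NULL key yields NULL; a miss in an array that holds a NULL element also yields NULL), not code from this PR:

```rust
// Models SQL three-valued logic for array_contains.
// Outer Option = NULL-ness of the array / key; inner Option = NULL elements.
fn array_contains<T: PartialEq>(arr: &Option<Vec<Option<T>>>, key: &Option<T>) -> Option<bool> {
    match (arr, key) {
        // NULL array or NULL key -> NULL
        (None, _) | (_, None) => None,
        (Some(elems), Some(k)) => {
            if elems.iter().any(|e| e.as_ref() == Some(k)) {
                Some(true) // value present -> true
            } else if elems.iter().any(|e| e.is_none()) {
                None // absent, but the array holds a NULL element -> NULL
            } else {
                Some(false) // absent -> false
            }
        }
    }
}
```

On the native side, DataFusion's `array_has` handles the NULL-array and NULL-key cases itself, which is why the planner diff above can pass the children straight through without a wrapping case expression.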