Skip to content

Commit

Permalink
[SPARK-50322][SQL] Fix parameterized identifier in a sub-query
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
In the PR, I propose to postpone parameters resolution till `UnresolvedWithCTERelations` is resolved.

### Why are the changes needed?
To fix the query failure:
```sql
execute immediate 'with v1 as (select * from tt1 where 1 = (Select * from identifier(:tab))) select * from v1' using 'tt1' as tab;
[UNBOUND_SQL_PARAMETER] Found the unbound parameter: tab. Please, fix `args` and provide a mapping of the parameter to either a SQL literal or collection constructor functions such as `map()`, `array()`, `struct()`. SQLSTATE: 42P02
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running new test:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.ParametersSuite
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48847 from MaxGekk/fix-parameter-subquery.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
MaxGekk committed Nov 15, 2024
1 parent cc81ed0 commit d317002
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, CreateMap, CreateNamedStruct, Expression, LeafExpression, Literal, MapFromArrays, MapFromEntries, SubqueryExpression, Unevaluable, VariableReference}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_IDENTIFIER_WITH_CTE, UNRESOLVED_WITH}
import org.apache.spark.sql.errors.QueryErrorsBase
import org.apache.spark.sql.types.DataType

Expand Down Expand Up @@ -189,7 +189,8 @@ object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
// We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
// relations are not children of `UnresolvedWith`.
case NameParameterizedQuery(child, argNames, argValues)
if !child.containsPattern(UNRESOLVED_WITH) && argValues.forall(_.resolved) =>
if !child.containsAnyPattern(UNRESOLVED_WITH, UNRESOLVED_IDENTIFIER_WITH_CTE) &&
argValues.forall(_.resolved) =>
if (argNames.length != argValues.length) {
throw SparkException.internalError(s"The number of argument names ${argNames.length} " +
s"must be equal to the number of argument values ${argValues.length}.")
Expand All @@ -199,7 +200,8 @@ object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
bind(child) { case NamedParameter(name) if args.contains(name) => args(name) }

case PosParameterizedQuery(child, args)
if !child.containsPattern(UNRESOLVED_WITH) && args.forall(_.resolved) =>
if !child.containsAnyPattern(UNRESOLVED_WITH, UNRESOLVED_IDENTIFIER_WITH_CTE) &&
args.forall(_.resolved) =>
val indexedArgs = args.zipWithIndex
checkArgs(indexedArgs.map(arg => (s"_${arg._2}", arg._1)))

Expand Down
17 changes: 17 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -741,4 +741,21 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest {
Row("c1"))
}
}

test("SPARK-50322: parameterized identifier in a sub-query") {
withTable("tt1") {
sql("CREATE TABLE tt1 (c1 INT)")
sql("INSERT INTO tt1 VALUES (1)")
def query(p: String): String = {
s"""
|WITH v1 AS (
| SELECT * FROM tt1
| WHERE 1 = (SELECT * FROM IDENTIFIER($p))
|) SELECT * FROM v1""".stripMargin
}

checkAnswer(spark.sql(query(":tab"), args = Map("tab" -> "tt1")), Row(1))
checkAnswer(spark.sql(query("?"), args = Array("tt1")), Row(1))
}
}
}

0 comments on commit d317002

Please sign in to comment.