[SPARK-41666][PYTHON] Support parameterized SQL by sql() #39183

6 changes: 3 additions & 3 deletions core/src/main/resources/error/error-classes.json
@@ -813,7 +813,7 @@
},
"INVALID_SQL_ARG" : {
"message" : [
"The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal statement."
"The argument <name> of `sql()` is invalid. Consider to replace it by a SQL literal."
Member Author commented: I have changed this to address the review comment at #38864 (comment)

]
},
"INVALID_SQL_SYNTAX" : {
@@ -1164,7 +1164,7 @@
},
"UNBOUND_SQL_PARAMETER" : {
"message" : [
"Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal statement."
"Found the unbound parameter: <name>. Please, fix `args` and provide a mapping of the parameter to a SQL literal."
]
},
"UNCLOSED_BRACKETED_COMMENT" : {
@@ -5219,4 +5219,4 @@
"grouping() can only be used with GroupingSets/Cube/Rollup"
]
}
}
}
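
For context, a minimal hypothetical sketch of how the second error class above surfaces from PySpark once this change is in; the query and the parameter name `bound` are illustrative and not taken from the patch.

from pyspark.sql import SparkSession

# Sketch only: assumes a Spark build that includes this patch.
spark = SparkSession.builder.getOrCreate()

# ":bound" is mapped to the SQL literal 7, so the query analyzes and runs.
spark.sql("SELECT * FROM range(10) WHERE id > :bound", {"bound": "7"}).show()

# Leaving ":bound" out of `args` should fail with UNBOUND_SQL_PARAMETER,
# which asks for a mapping of the parameter to a SQL literal.
spark.sql("SELECT * FROM range(10) WHERE id > :bound", {}).show()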
19 changes: 17 additions & 2 deletions python/pyspark/pandas/sql_formatter.py
@@ -17,7 +17,7 @@

import os
import string
from typing import Any, Optional, Union, List, Sequence, Mapping, Tuple
from typing import Any, Dict, Optional, Union, List, Sequence, Mapping, Tuple
import uuid
import warnings

@@ -43,6 +43,7 @@
def sql(
query: str,
index_col: Optional[Union[str, List[str]]] = None,
args: Dict[str, str] = {},
**kwargs: Any,
) -> DataFrame:
"""
@@ -57,6 +58,8 @@ def sql(
* pandas Series
* string

Also the method can bind named parameters to SQL literals from `args`.

Parameters
----------
query : str
@@ -99,6 +102,11 @@
e f 3 6

Also note that the index name(s) should be matched to the existing name.
args : dict
A dictionary of named parameters that begin with the `:` marker and
their SQL literals for substitution.

.. versionadded:: 3.4.0
kwargs
other variables that the user want to set that can be referenced in the query

@@ -152,6 +160,13 @@ def sql(
0 1
1 2
2 3

And substitute named parameters with the `:` prefix by SQL literals.

>>> ps.sql("SELECT * FROM range(10) WHERE id > :bound1", args={"bound1":"7"})
id
0 8
1 9
"""
if os.environ.get("PYSPARK_PANDAS_SQL_LEGACY") == "1":
from pyspark.pandas import sql_processor
@@ -166,7 +181,7 @@
session = default_session()
formatter = PandasSQLStringFormatter(session)
try:
sdf = session.sql(formatter.format(query, **kwargs))
sdf = session.sql(formatter.format(query, **kwargs), args)
finally:
formatter.clear()

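A short usage sketch for the pandas-on-Spark entry point with more than one named parameter; the names `lower` and `upper` are hypothetical, and each value is a string that is parsed as a SQL literal, as described in the docstring above.

import pyspark.pandas as ps

# Hypothetical parameter names; both ":lower" and ":upper" are bound
# from `args` before the query is executed.
psdf = ps.sql(
    "SELECT * FROM range(10) WHERE id > :lower AND id < :upper",
    args={"lower": "2", "upper": "7"},
)
print(psdf)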
22 changes: 18 additions & 4 deletions python/pyspark/sql/session.py
@@ -1293,20 +1293,25 @@ def prepare(obj: Any) -> Any:
df._schema = struct
return df

def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
def sql(self, sqlQuery: str, args: Dict[str, str] = {}, **kwargs: Any) -> DataFrame:
"""Returns a :class:`DataFrame` representing the result of the given query.
When ``kwargs`` is specified, this method formats the given string by using the Python
standard formatter.
standard formatter. The method binds named parameters to SQL literals from `args`.

.. versionadded:: 2.0.0

.. versionchanged:: 3.4.0
Support Spark Connect.
Support Spark Connect and parameterized SQL.

Parameters
----------
sqlQuery : str
SQL query string.
args : dict
A dictionary of named parameters that begin with the `:` marker and
their SQL literals for substitution.

.. versionadded:: 3.4.0
kwargs : dict
Other variables that the user wants to set that can be referenced in the query

@@ -1380,13 +1385,22 @@ def sql(self, sqlQuery: str, **kwargs: Any) -> DataFrame:
| 2| 4|
| 3| 6|
+---+---+

And substitute named parameters with the `:` prefix by SQL literals.

>>> spark.sql("SELECT * FROM {df} WHERE {df[B]} > :minB", {"minB" : "5"}, df=mydf).show()
+---+---+
| A| B|
+---+---+
| 3| 6|
+---+---+
"""

formatter = SQLStringFormatter(self)
if len(kwargs) > 0:
sqlQuery = formatter.format(sqlQuery, **kwargs)
try:
return DataFrame(self._jsparkSession.sql(sqlQuery), self)
return DataFrame(self._jsparkSession.sql(sqlQuery, args), self)
finally:
if len(kwargs) > 0:
formatter.clear()
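To make the division of labor in this method concrete: `kwargs` such as `df=mydf` are interpolated by the Python-side SQLStringFormatter first, and `args` is then passed to the JVM `sql(sqlQuery, args)` call, where each named `:` parameter is bound to its SQL literal during parsing. Below is a rough standalone version of the new doctest; the DataFrame setup is assumed here to mirror the docstring's `mydf`.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Assumed setup matching the expected output in the doctest above.
mydf = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"])

# "{df}" and "{df[B]}" are replaced by the Python string formatter;
# ":minB" is then bound to the SQL literal 5 on the JVM side.
spark.sql(
    "SELECT * FROM {df} WHERE {df[B]} > :minB", {"minB": "5"}, df=mydf
).show()
# Prints only the row where B > 5, i.e. A=3, B=6.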