
[SPARK-50238][PYTHON] Add Variant Support in PySpark UDFs/UDTFs/UDAFs #48770

Closed

Conversation

harshmotw-db (Contributor) commented Nov 6, 2024

What changes were proposed in this pull request?

This PR adds support for the Variant data type in PySpark UDFs, UDTFs, and UDAFs. Support is added in both execution modes, arrow and pickle, as well as in pandas UDFs.

Why are the changes needed?

After this change, users will be able to use the new Variant data type with UDFs, which is currently prohibited.

Does this PR introduce any user-facing change?

Yes, users should now be able to use Variants with Python UDFs.

How was this patch tested?

Unit tests covering all scenarios: arrow, pickle, and pandas.

Was this patch authored or co-authored using generative AI tooling?

No.

```scala
@@ -169,6 +148,23 @@ case class PythonUDAF(

  override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): PythonUDAF =
    copy(children = newChildren)

  override def checkInputDataTypes(): TypeCheckResult = {
```
harshmotw-db (Contributor, author) commented:

@HyukjinKwon I haven't tested support in UDAFs yet, so I have kept it disabled in this scenario. Can you point me to examples where Python UDAFs are tested?

HyukjinKwon (Member) replied:

```python
import pandas as pd

from pyspark.sql.functions import pandas_udf

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
```

@harshmotw-db harshmotw-db marked this pull request as ready for review November 6, 2024 01:49
harshmotw-db (Contributor, author) commented:
cc @allisonwang-db @ueshin

gene-db (Contributor) reviewed:

@harshmotw-db Thanks for this feature! I left a few questions.


```python
scalar_f = pandas_udf(lambda u: str(u), StringType())
scalar_f = pandas_udf(lambda u: u.apply(str), StringType(), PandasUDFType.SCALAR)
```

gene-db (Contributor) commented:

Does pandas_udf go through the same path as the arrow UDF path?

harshmotw-db (Contributor, author) replied:

Yes, for the most part. I recall that for pandas UDFs to work, I also had to add changes in arrow_to_pandas and _create_batch, because they treat struct types in a special way. Example: https://github.com/apache/spark/pull/48770/files#r1831583273

```scala
    case dataType =>
      val fieldType =
        new FieldType(nullable, toArrowType(dataType, timeZoneId, largeVarTypes), null)
      new Field(name, fieldType, Seq.empty[Field].asJava)
  }
}

def isVariantField(field: Field): Boolean = {
  assert(field.getType.isInstanceOf[ArrowType.Struct])
```
gene-db (Contributor) commented:

Should this assert, or should it just return false when it is not a struct?

harshmotw-db (Contributor, author) replied:

It should be an assert, since the call site already checks for struct.

HyukjinKwon (Member) commented:

Seems mostly fine to me.

gene-db (Contributor) approved:

@harshmotw-db Thanks for this new ability!

LGTM

harshmotw-db (Contributor, author) commented:

@gene-db @HyukjinKwon @ueshin I have added support for Variant in Python UDAFs in my latest commit.

harshmotw-db (Contributor, author) commented:

@HyukjinKwon @ueshin I'm not sure why SparkConnectSessionHolderSuite is failing on this PR. I am not able to reproduce the failure locally. Can you look into this?

harshmotw-db (Contributor, author) commented:

Noting that the tests that were failing earlier passed on the latest commit. It seems the test was broken on older versions.

@harshmotw-db harshmotw-db changed the title [SPARK-50238][PYTHON] Add Variant Support in PySpark UDFs/UDTFs [SPARK-50238][PYTHON] Add Variant Support in PySpark UDFs/UDTFs/UDAFs Nov 13, 2024
HyukjinKwon (Member) commented:

Merged to master.
