Added binary read support for Parquet [Databricks] #6161

Merged
revans2 merged 8 commits into NVIDIA:branch-22.08 on Aug 5, 2022

Conversation

razajafri
Collaborator

@razajafri razajafri commented Jul 30, 2022

This PR adds Binary type to ParquetSourceScanExec.

  • Changed GpuOverrides so that BinaryType is supported for FileSourceScanExec
  • Added a test that verifies binary columns are read in as binary
  • Added logic to honor the binaryAsString flag if it is set

depends on rapidsai/cudf#11410
fixes #5416

Signed-off-by: Raza Jafri [email protected]

@sameerz sameerz added the feature request New feature or request label Aug 1, 2022
@razajafri razajafri self-assigned this Aug 2, 2022
@razajafri razajafri marked this pull request as ready for review August 2, 2022 07:45
@razajafri razajafri changed the title Added binary read support for Parquet Added binary read support for Parquet [Databricks] Aug 2, 2022
@sameerz sameerz requested a review from revans2 August 2, 2022 16:31
Collaborator

@revans2 revans2 left a comment

I think this looks okay, but I really would like to hear from others too. @jlowe you have been looking at the schema code for parquet lately. What do you think?

import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.internal.SQLConf;
Collaborator

nit: can we revert these import changes that don't appear to be used?

Comment on lines 761 to 763
val clippedSchemaTmp = sparkToParquetSchema.convert(
parquetToSparkSchema.convert(ParquetSchemaClipShims.clipSchema(fileSchema,
readDataSchema, isCaseSensitive, readUseFieldId, timestampNTZEnabled)))
Contributor

This seems like a roundabout way to solve a very specific problem, which is that if, and only if, spark.sql.parquet.binaryAsString is true, we should treat all occurrences of BinaryType in the file schema as if it were StringType.

Note that the comment above seems incorrect, as the file schema-as-spark-schema being passed into the schema evolution is a subset of the read schema, not the file schema. If the data is truly being loaded as a string by libcudf, there should have been no issue trying to apply the Spark read schema to those columns.
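
For illustration, the targeted rule described in this comment could be sketched roughly as follows. This is a minimal, self-contained model, not the plugin's code: `ColType`, `StructT`, and `applyBinaryAsString` are hypothetical stand-ins for Spark's types, and the sketch only shows the "if, and only if, the flag is set" behavior.

```scala
object BinaryAsStringRule {
  // Simplified stand-ins for Spark SQL data types (hypothetical, not Spark's classes).
  sealed trait ColType
  case object LongT extends ColType
  case object BinaryT extends ColType
  case object StringT extends ColType
  final case class StructT(fields: List[(String, ColType)]) extends ColType

  // Rewrite every BinaryT to StringT, but only when the flag is set.
  // With the flag off, the schema is returned unchanged.
  def applyBinaryAsString(t: ColType, binaryAsString: Boolean): ColType = t match {
    case BinaryT if binaryAsString => StringT
    case StructT(fields) =>
      StructT(fields.map { case (name, ft) =>
        (name, applyBinaryAsString(ft, binaryAsString))
      })
    case other => other
  }
}
```

Because only `BinaryT` is touched, every other type in the schema passes through untouched, which is the property a full conversion through Spark types cannot guarantee.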

Collaborator

@jlowe perhaps I misunderstood this code, but I thought that spark.sql.parquet.binaryAsString primarily impacted the read schema, and only gets applied if the file that is written out does not have the Spark metadata in it for the schema already.

Because of this I thought that we had to go off of the read schema not the file schema and the config. What I think we want to do is to go through the read schema when setting up the columns to read in and see if the file schema says it is binary but the read schema says it is a String. Then we would add that column name to the set of columns that are binary, but should be read as a String.

I agree that this is a bit convoluted (updating the metadata in the modified file to do what we want), but it appears to be working, which is why I wanted your opinion on it.
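
The column selection proposed in this comment (file schema says binary, read schema says String) might look something like the sketch below. It is a simplified model that assumes flat, top-level columns only; `ColType` and `columnsToReadAsString` are hypothetical names, not part of Spark or the plugin.

```scala
object BinaryColumnSelection {
  // Simplified stand-ins for column types (hypothetical, not Spark's classes).
  sealed trait ColType
  case object LongT extends ColType
  case object BinaryT extends ColType
  case object StringT extends ColType

  // Collect the names of columns that the file stores as binary
  // but that the read schema asks for as a string.
  def columnsToReadAsString(
      fileSchema: Map[String, ColType],
      readSchema: Seq[(String, ColType)]): Set[String] =
    readSchema.collect {
      case (name, StringT) if fileSchema.get(name).contains(BinaryT) => name
    }.toSet
}
```

Columns missing from the file, or stored as something other than binary, are simply not selected, so the decision is driven by the read schema rather than by the config alone.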

Contributor

> Because of this I thought that we had to go off of the read schema not the file schema and the config

That's my point. If libcudf was truly loading the binary column as a string, as the comment states it does, then trying to apply the read schema to that column should not be throwing an exception in GpuColumnVector.from as stated.

Maybe I'm missing something, but the comment doesn't seem to correctly capture what's actually going on without this workaround.

I also don't like round-tripping the entire schema through Spark and back, since it seems likely to cause unintended consequences. For example, the Parquet schema may state that there are unsigned types, but Spark doesn't have an unsigned type. Is it OK to silently update the file metadata so that what was a UINT32 is now an INT64?

If we need to bash the Parquet schema to change binaries to strings, then I'd rather see a targeted approach to doing that unless we're convinced doing a roundtrip through Spark types and back won't cause other problems.

Contributor

Thinking about this a bit more, I believe round-tripping through the Spark types will also strip out any field IDs being used in the Parquet schema. The Spark-to-Parquet converter will translate field IDs found in the Spark schema into Parquet field IDs, but I didn't see any evidence that the Parquet-to-Spark converter would convey Parquet field IDs into Spark. Losing field IDs will break some schema evolution cases that require field IDs to line up the Parquet schema with the Spark read schema.
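
The two concerns raised above (unsigned types and field IDs) can be illustrated with a toy model. Everything here is a hypothetical sketch: the `Pq*` and `Sp*` types are stand-ins, and the assumption that the Parquet-to-Spark direction drops field IDs is taken from the comment above rather than verified against Spark's converters.

```scala
object SchemaRoundTrip {
  // Toy Parquet leaf types; PqUtf8 stands for BINARY annotated as a string.
  sealed trait PqType
  case object PqInt32 extends PqType
  case object PqUInt32 extends PqType
  case object PqInt64 extends PqType
  case object PqBinary extends PqType
  case object PqUtf8 extends PqType
  final case class PqField(name: String, tpe: PqType, fieldId: Option[Int])

  // Toy Spark types: there is no unsigned integer type.
  sealed trait SpType
  case object SpInt extends SpType
  case object SpLong extends SpType
  case object SpBinary extends SpType
  case object SpString extends SpType
  final case class SpField(name: String, tpe: SpType, fieldId: Option[Int])

  // ASSUMPTION (from the review comment): this direction drops field IDs.
  def toSpark(f: PqField, binaryAsString: Boolean): SpField = {
    val tpe: SpType = f.tpe match {
      case PqInt32            => SpInt
      case PqUInt32 | PqInt64 => SpLong // unsigned 32-bit is widened
      case PqBinary           => if (binaryAsString) SpString else SpBinary
      case PqUtf8             => SpString
    }
    SpField(f.name, tpe, fieldId = None)
  }

  def toParquet(f: SpField): PqField = {
    val tpe: PqType = f.tpe match {
      case SpInt    => PqInt32
      case SpLong   => PqInt64
      case SpBinary => PqBinary
      case SpString => PqUtf8
    }
    PqField(f.name, tpe, f.fieldId)
  }

  // Full round trip: changes more than the binary columns.
  def roundTrip(f: PqField, binaryAsString: Boolean): PqField =
    toParquet(toSpark(f, binaryAsString))

  // Targeted rewrite: only unannotated binary fields are touched.
  def targeted(f: PqField, binaryAsString: Boolean): PqField =
    if (binaryAsString && f.tpe == PqBinary) f.copy(tpe = PqUtf8) else f
}
```

In this model, `roundTrip` turns a `PqUInt32` field with a field ID into a `PqInt64` field without one, while `targeted` leaves that field exactly as it was and only rewrites binary fields.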

Collaborator Author

Thanks for the feedback Jason.

I misunderstood the problem. I was running into the issue because I was reading the conf directly. Instead, what we should do is let SparkToParquetSchema tell us what the column should be read as by cudf.

To explain a bit further: the file that I was reading from was written by Spark, and Spark stores some metadata in it that was overriding the binaryAsString flag. So while I was trying to read the column as a String, Spark was expecting it to be read in as binary.
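
The precedence described here, where schema metadata stored in the file wins over the flag, can be modeled with a small sketch. It covers a single unannotated binary column; `effectiveType` and the `ColType` objects are hypothetical names used only for illustration, not the plugin's or Spark's API.

```scala
object SchemaPrecedence {
  // Simplified stand-ins for the two candidate types (hypothetical names).
  sealed trait ColType
  case object BinaryT extends ColType
  case object StringT extends ColType

  // sparkMetadataType: the type recorded in Spark's own schema metadata
  // inside the file, if the file was written by Spark; None otherwise.
  // The binaryAsString flag is consulted only when no such metadata exists.
  def effectiveType(
      sparkMetadataType: Option[ColType],
      binaryAsString: Boolean): ColType =
    sparkMetadataType.getOrElse(if (binaryAsString) StringT else BinaryT)
}
```

Under this model, a Spark-written file that recorded the column as binary is still read as binary even with binaryAsString set, which matches the behavior described in the comment above.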

@razajafri
Collaborator Author

build

@jlowe
Contributor

jlowe commented Aug 4, 2022

Is this no longer intended for 22.08? The commit history shows you merged in branch-22.10, but the base branch on this PR is still 22.08 which is why this PR is suddenly much larger, both in terms of commits and changes.

@razajafri razajafri changed the base branch from branch-22.08 to branch-22.10 August 4, 2022 21:09
@razajafri
Collaborator Author

build

@razajafri razajafri changed the base branch from branch-22.10 to branch-22.08 August 4, 2022 21:54
@razajafri
Collaborator Author

build

Comment on lines +1399 to +1401
t._2.dataType == BinaryType &&
sparkToParquetSchema.convertField(t._2).asPrimitiveType().getPrimitiveTypeName
== PrimitiveTypeName.BINARY)
Contributor

Build is failing here:

Error: ] /home/runner/work/spark-rapids/spark-rapids/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala:1399: type mismatch;
 found   : Boolean
 required: String
Error:  one error found

Collaborator Author

Where? I just built it again locally and it's passing.

Do you have the cudf changes to ParquetOptions?

Contributor

It's failing in CI (see failed runs below), although it looks like that was run against 22.10? It was trying to download spark-rapids-jni-22.10-SNAPSHOT.

It looks like the normal CI is running against 22.08 correctly, so I'll try to manually re-kick the ones that failed here.

@sameerz
Collaborator

sameerz commented Aug 5, 2022

build

@revans2
Collaborator

revans2 commented Aug 5, 2022

On a side note, could you test what happens if you pass in a schema that switches StringType to BinaryType or BinaryType to StringType?

val baseWithBothBinary = spark.read.schema(StructType(Seq(StructField("a", LongType), StructField("b", BinaryType), StructField("c", BinaryType)))).parquet("binary_as_string.parquet")
val baseWithBothString = spark.read.schema(StructType(Seq(StructField("a", LongType), StructField("b", StringType), StructField("c", StringType)))).parquet("binary_as_string.parquet")

It appears to work on the CPU, but I want to know if it is going to work on the GPU or if we are going to run into some odd errors that we didn't expect.

@revans2
Collaborator

revans2 commented Aug 5, 2022

I did some manual testing with this patch: the baseWithBothString case works, but the baseWithBothBinary case does not. This is a good enough improvement that I will merge this in as is, and then do follow-on work for the failing use case.

@revans2 revans2 merged commit 8d14f8c into NVIDIA:branch-22.08 Aug 5, 2022
revans2 added a commit to revans2/spark-rapids that referenced this pull request Aug 10, 2022
Development

Successfully merging this pull request may close these issues.

[FEA] Support reading binary data types from Parquet as binary (not strings)
4 participants