From 91f60a347a3341b4c992084ce95f17db59faa8a8 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 25 May 2023 15:09:23 -0500
Subject: [PATCH 1/2] Add in support for FIXED_LEN_BYTE_ARRAY as binary

Signed-off-by: Robert (Bobby) Evans
---
 .../nvidia/spark/rapids/GpuParquetScan.scala  |   8 +-
 .../rapids/shims/ParquetSchemaClipShims.scala |   5 +-
 .../sql/rapids/ParquetFormatScanSuite.scala   | 113 +++++++++++++++++-
 3 files changed, 121 insertions(+), 5 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 858f318e40d..e4c1d3f7866 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -1013,8 +1013,12 @@ private case class GpuParquetFileFilterHandler(
       case PrimitiveTypeName.INT96 if dt == DataTypes.TimestampType =>
         return
 
-      case PrimitiveTypeName.BINARY if dt == DataTypes.StringType ||
-          dt == DataTypes.BinaryType || canReadAsBinaryDecimal(pt, dt) =>
+      case PrimitiveTypeName.BINARY if dt == DataTypes.StringType =>
+        // PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY for StringType is not supported by parquet
+        return
+
+      case PrimitiveTypeName.BINARY | PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+          if dt == DataTypes.BinaryType =>
         return
 
       case PrimitiveTypeName.BINARY | PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
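The new cases above boil down to a small compatibility rule: BINARY may be read as either a string or binary, while FIXED_LEN_BYTE_ARRAY may only be read as binary, because Parquet cannot tag a fixed-length array as utf8. A minimal standalone sketch of that rule (the object and method names here are illustrative, not the plugin's; the decimal cases are omitted):

    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
    import org.apache.spark.sql.types.{BinaryType, DataType, StringType}

    object FlbaRuleSketch {
      // Mirrors the match arms in the hunk above, minus the decimal handling.
      def canReadAsBinary(pt: PrimitiveTypeName, dt: DataType): Boolean =
        (pt, dt) match {
          case (PrimitiveTypeName.BINARY, StringType) => true
          case (PrimitiveTypeName.BINARY | PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
              BinaryType) => true
          case _ => false
        }
    }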
" + - s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") } TimestampType @@ -201,6 +201,7 @@ object ParquetSchemaClipShims { typeAnnotation match { case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength)) + case null => BinaryType case _: IntervalLogicalTypeAnnotation => typeNotImplemented() case _ => illegalType() } diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala index f6f0f7d4329..9db3bef8cbf 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala @@ -35,8 +35,9 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.scalatest.concurrent.Eventually -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils /** @@ -390,6 +391,116 @@ class ParquetFormatScanSuite extends SparkQueryCompareTestSuite with Eventually }, conf = conf) } + test(s"FIXED_LEN_BYTE_ARRAY(16) BINARY $parserType") { + assume(isSpark340OrLater) + withCpuSparkSession(spark => { + val schema = + """message spark { + | required fixed_len_byte_array(16) test; + |} + """.stripMargin + + withTempDir(spark) { dir => + val testPath = dir + "/FIXED_BIN16_TEST.parquet" + writeDirect(testPath, schema, { rc => + rc.message { + rc.field("test", 0) { + rc.addBinary(Binary.fromString("1234567890123456")) + } + } + }, { rc => + rc.message { + rc.field("test", 0) { + rc.addBinary(Binary.fromString("ABCDEFGHIJKLMNOP")) + } + } + }) + + val data = spark.read.parquet(testPath).collect() + sameRows(Seq(Row("1234567890123456".getBytes), + Row("ABCDEFGHIJKLMNOP".getBytes)), data) + } + }, conf = conf) + } + + test(s"FIXED_LEN_BYTE_ARRAY(16) binaryAsString $parserType") { + assume(isSpark340OrLater) + // Parquet does not let us tag a FIXED_LEN_BYTE_ARRAY with utf8 to make it a string + // Spark ignores binaryAsString for it, so we need to test that we do the same + val conf = new SparkConf() + .set("spark.rapids.sql.format.parquet.reader.footer.type", parserType) + .set("spark.sql.parquet.binaryAsString", "true") + withCpuSparkSession(spark => { + val schema = + """message spark { + | required fixed_len_byte_array(16) test; + |} + """.stripMargin + + withTempDir(spark) { dir => + val testPath = dir + "/FIXED_BIN16_TEST.parquet" + writeDirect(testPath, schema, { rc => + rc.message { + rc.field("test", 0) { + rc.addBinary(Binary.fromString("1234567890123456")) + } + } + }, { rc => + rc.message { + rc.field("test", 0) { + rc.addBinary(Binary.fromString("ABCDEFGHIJKLMNOP")) + } + } + }) + + val data = spark.read.parquet(testPath).collect() + sameRows(Seq(Row("1234567890123456".getBytes), + Row("ABCDEFGHIJKLMNOP".getBytes)), data) + } + }, conf = conf) + } + + test(s"FIXED_LEN_BYTE_ARRAY(16) String in Schema $parserType") { + assume(isSpark340OrLater) + // Parquet does not let us tag a FIXED_LEN_BYTE_ARRAY with utf8 to make it a string + // Spark also fails the task if we try to read it as a String so we should verify that + // We also throw an exception. 
From 64987947d62040e71852251685777ca938940ae9 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Wed, 23 Aug 2023 11:06:20 -0500
Subject: [PATCH 2/2] Update tests to run on GPU

---
 .../apache/spark/sql/rapids/ParquetFormatScanSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala
index 6ded8dba440..7df894d003b 100644
--- a/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala
+++ b/tests/src/test/scala/org/apache/spark/sql/rapids/ParquetFormatScanSuite.scala
@@ -391,7 +391,7 @@ class ParquetFormatScanSuite extends SparkQueryCompareTestSuite with Eventually
 
   test(s"FIXED_LEN_BYTE_ARRAY(16) BINARY $parserType") {
     assume(isSpark340OrLater)
-    withCpuSparkSession(spark => {
+    withGpuSparkSession(spark => {
       val schema =
         """message spark {
           | required fixed_len_byte_array(16) test;
@@ -428,7 +428,7 @@
     val conf = new SparkConf()
       .set("spark.rapids.sql.format.parquet.reader.footer.type", parserType)
      .set("spark.sql.parquet.binaryAsString", "true")
-    withCpuSparkSession(spark => {
+    withGpuSparkSession(spark => {
       val schema =
         """message spark {
           | required fixed_len_byte_array(16) test;
@@ -463,7 +463,7 @@
     // Parquet does not let us tag a FIXED_LEN_BYTE_ARRAY with utf8 to make it a string
     // Spark also fails the task if we try to read it as a String, so we should verify that
     // we also throw an exception.
-    withCpuSparkSession(spark => {
+    withGpuSparkSession(spark => {
       val schema =
         """message spark {
           | required fixed_len_byte_array(16) test;
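With this second patch applied, all three tests exercise the GPU reader directly instead of Spark's CPU one. The contract they pin down from the user's side is that forcing StringType onto a fixed-length binary column still fails the read with a SparkException; a compact sketch of that check (the path and session settings are again illustrative):

    import scala.util.{Failure, Try}

    import org.apache.spark.SparkException
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object FlbaAsStringSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // Forcing StringType onto a fixed_len_byte_array column must fail the read,
        // mirroring the "String in Schema" test above.
        val attempt = Try {
          spark.read
            .schema(StructType(Seq(StructField("test", StringType))))
            .parquet("/tmp/FIXED_BIN16_TEST.parquet")
            .collect()
        }
        attempt match {
          case Failure(_: SparkException) => println("failed as expected")
          case other => sys.error(s"expected a SparkException, got $other")
        }
        spark.stop()
      }
    }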