From 65d90f977030047eddc312bbc4da69cf3c74761b Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Thu, 31 Aug 2023 09:48:25 +0800 Subject: [PATCH 1/2] Make map column non-nullable Signed-off-by: Chong Gao --- .../com/nvidia/spark/rapids/SchemaUtils.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala index fc792a9bf24..6cccfe246f1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.types._ object SchemaUtils { // Parquet field ID metadata key - val FIELD_ID_METADATA_KEY = "parquet.field.id" + private val FIELD_ID_METADATA_KEY = "parquet.field.id" /** * Convert a TypeDescription to a Catalyst StructType. @@ -236,7 +236,8 @@ object SchemaUtils { nullable: Boolean, writeInt96: Boolean, fieldMeta: Metadata, - parquetFieldIdWriteEnabled: Boolean): T = { + parquetFieldIdWriteEnabled: Boolean, + isMapKey: Boolean): T = { // Parquet specific field id val parquetFieldId: Option[Int] = if (fieldMeta.contains(FIELD_ID_METADATA_KEY)) { @@ -275,19 +276,25 @@ object SchemaUtils { a.elementType, name, a.containsNull, - writeInt96, fieldMeta, parquetFieldIdWriteEnabled).build()) + writeInt96, fieldMeta, parquetFieldIdWriteEnabled, isMapKey = false).build()) case m: MapType => + // if this is a key of another map. e.g.: map1(map2(int,int), int), the map2 is the map + // key of map1, map2 should be non-nullable + val isMapNullable = if (isMapKey) false else nullable + // It is ok to use `StructBuilder` here for key and value, since either // `OrcWriterOptions.Builder` or `ParquetWriterOptions.Builder` is actually an // `AbstractStructBuilder`, and here only handles the common column metadata things. 
builder.withMapColumn( mapColumn(name, writerOptionsFromField( + // `nullable` here is ignored: only the struct's child column options are used structBuilder(name, nullable), m.keyType, "key", nullable = false, - writeInt96, fieldMeta, parquetFieldIdWriteEnabled).build().getChildColumnOptions()(0), + writeInt96, fieldMeta, parquetFieldIdWriteEnabled, isMapKey = true).build() + .getChildColumnOptions()(0), writerOptionsFromField( structBuilder(name, nullable), m.valueType, @@ -295,7 +302,9 @@ object SchemaUtils { m.valueContainsNull, writeInt96, fieldMeta, - parquetFieldIdWriteEnabled).build().getChildColumnOptions()(0))) + parquetFieldIdWriteEnabled, isMapKey = false).build().getChildColumnOptions()(0), + // set the nullable for this map + isMapNullable)) case BinaryType => if (parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) { builder.withBinaryColumn(name, nullable, parquetFieldId.get) @@ -326,7 +335,7 @@ object SchemaUtils { parquetFieldIdEnabled: Boolean = false): T = { schema.foreach(field => writerOptionsFromField(builder, field.dataType, field.name, field.nullable, writeInt96, - field.metadata, parquetFieldIdEnabled) + field.metadata, parquetFieldIdEnabled, isMapKey = false) ) builder.asInstanceOf[T] } From aba56d3ffdbaac1803194979795464d9152f86e0 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Mon, 4 Sep 2023 18:30:00 +0800 Subject: [PATCH 2/2] Update --- .../com/nvidia/spark/rapids/SchemaUtils.scala | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala index 6cccfe246f1..22047f22e68 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala @@ -236,8 +236,7 @@ object SchemaUtils { nullable: Boolean, writeInt96: Boolean, fieldMeta: Metadata, - parquetFieldIdWriteEnabled: Boolean, - isMapKey: 
Boolean): T = { + parquetFieldIdWriteEnabled: Boolean): T = { // Parquet specific field id val parquetFieldId: Option[Int] = if (fieldMeta.contains(FIELD_ID_METADATA_KEY)) { @@ -276,12 +275,8 @@ object SchemaUtils { a.elementType, name, a.containsNull, - writeInt96, fieldMeta, parquetFieldIdWriteEnabled, isMapKey = false).build()) + writeInt96, fieldMeta, parquetFieldIdWriteEnabled).build()) case m: MapType => - // if this is a key of another map. e.g.: map1(map2(int,int), int), the map2 is the map - // key of map1, map2 should be non-nullable - val isMapNullable = if (isMapKey) false else nullable - // It is ok to use `StructBuilder` here for key and value, since either // `OrcWriterOptions.Builder` or `ParquetWriterOptions.Builder` is actually an // `AbstractStructBuilder`, and here only handles the common column metadata things. @@ -293,8 +288,7 @@ object SchemaUtils { m.keyType, "key", nullable = false, - writeInt96, fieldMeta, parquetFieldIdWriteEnabled, isMapKey = true).build() - .getChildColumnOptions()(0), + writeInt96, fieldMeta, parquetFieldIdWriteEnabled).build().getChildColumnOptions()(0), writerOptionsFromField( structBuilder(name, nullable), m.valueType, @@ -302,9 +296,12 @@ object SchemaUtils { m.valueContainsNull, writeInt96, fieldMeta, - parquetFieldIdWriteEnabled, isMapKey = false).build().getChildColumnOptions()(0), + parquetFieldIdWriteEnabled).build().getChildColumnOptions()(0), // set the nullable for this map - isMapNullable)) + // When `m` is itself the key of another map, the `nullable` passed in is false. + // E.g. in map1(map2(int,int), int), map2 is the key of map1, so map2 + // must be non-nullable. + nullable)) case BinaryType => if (parquetFieldIdWriteEnabled && parquetFieldId.nonEmpty) { builder.withBinaryColumn(name, nullable, parquetFieldId.get) @@ -335,7 +332,7 @@ object SchemaUtils { parquetFieldIdEnabled: Boolean = false): T = { schema.foreach(field => writerOptionsFromField(builder, field.dataType, field.name, field.nullable, writeInt96, 
- field.metadata, parquetFieldIdEnabled, isMapKey = false) + field.metadata, parquetFieldIdEnabled) ) builder.asInstanceOf[T] }