
Support nested types in ORC writer #3696

Merged: 13 commits, Oct 13, 2021
docs/supported_ops.md (4 changes: 2 additions & 2 deletions)
@@ -16370,9 +16370,9 @@ dates or timestamps, or for a lack of type coercion support.
<td> </td>
<td><b>NS</b></td>
<td> </td>
-<td><b>NS</b></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types DECIMAL, BINARY, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
integration_tests/src/main/python/orc_write_test.py (23 changes: 19 additions & 4 deletions)
@@ -20,10 +20,25 @@
from marks import *
from pyspark.sql.types import *

-orc_write_gens_list = [
-    [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
-     string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
-     TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))],
+orc_write_basic_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
+                        string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
+                        TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc))]
+
+orc_write_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(orc_write_basic_gens)])
+
+# Some array/struct gens, but not all because of nesting
Review comment (Collaborator): mind expanding on this comment a bit, why "not all because of nesting"?

Reply (Collaborator, author): Good suggestion, I will update it in a following PR since this PR should be merged as soon as possible.

Reply (Collaborator, author): I removed this comment.

+orc_write_struct_gens_sample = [orc_write_basic_struct_gen,
+                                StructGen([['child0', byte_gen], ['child1', orc_write_basic_struct_gen]]),
+                                StructGen([['child0', ArrayGen(short_gen)], ['child1', double_gen]])]
+
+orc_write_array_gens_sample = [ArrayGen(sub_gen) for sub_gen in orc_write_basic_gens] + [
+    ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
+    ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10),
+    ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))]
+
+orc_write_gens_list = [orc_write_basic_gens,
+                       orc_write_struct_gens_sample,
+                       orc_write_array_gens_sample,
+                       pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/139')),
+                       pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/140'))]

@@ -850,7 +850,7 @@ object GpuOverrides extends Logging {
(OrcFormatType, FileFormatChecks(
cudfRead = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.DECIMAL_64 +
TypeSig.STRUCT + TypeSig.MAP).nested(),
-      cudfWrite = TypeSig.commonCudfTypes,
+      cudfWrite = (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())))
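
As context for the widened cudfWrite signature, here is a minimal sketch of the kind of write it unblocks. This is illustrative only: it assumes a SparkSession `spark` with the RAPIDS plugin enabled, and the column names and output path are made up.

```scala
// With ARRAY and STRUCT now in cudfWrite, a write like this can be planned on
// the GPU ORC writer instead of falling back to the CPU.
val df = spark.range(100).selectExpr(
  "id",
  "named_struct('low', id, 'high', id + 1) AS bounds", // STRUCT column
  "array(id, id * 2, id * 3) AS multiples")            // ARRAY column
df.write.mode("overwrite").orc("/tmp/nested_orc_demo")
```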

@@ -17,7 +17,7 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf._
-import ai.rapids.cudf.ParquetColumnWriterOptions._
+import ai.rapids.cudf.ColumnWriterOptions._
import com.nvidia.spark.RebaseHelper
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
@@ -117,8 +117,8 @@ object GpuParquetFileFormat {
}
}

-  def parquetWriterOptionsFromField[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+  def parquetWriterOptionsFromField[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
dataType: DataType,
name: String,
writeInt96: Boolean,
@@ -167,8 +167,8 @@
builder.asInstanceOf[T]
}

-  def parquetWriterOptionsFromSchema[T <: NestedBuilder[_, _], V <: ParquetColumnWriterOptions](
-      builder: ParquetColumnWriterOptions.NestedBuilder[T, V],
+  def parquetWriterOptionsFromSchema[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
schema: StructType,
writeInt96: Boolean): T = {
// TODO once https://github.com/rapidsai/cudf/issues/7654 is fixed go back to actually
@@ -17,6 +17,7 @@
package org.apache.spark.sql.rapids

import ai.rapids.cudf._
+import ai.rapids.cudf.ColumnWriterOptions._
import com.nvidia.spark.rapids._
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -29,7 +30,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcUtils}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._

object GpuOrcFileFormat extends Logging {
// The classname used when Spark is configured to use the Hive implementation for ORC.
@@ -112,6 +113,59 @@ object GpuOrcFileFormat extends Logging {
None
}
}

+  def orcWriterOptionsFromField[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
+      dataType: DataType,
+      name: String,
+      nullable: Boolean): T = {
+    dataType match {
+      case dt: DecimalType =>
+        builder.withDecimalColumn(name, dt.precision, nullable)
+      case TimestampType =>
+        builder.withTimestampColumn(name, false, nullable)
+      case s: StructType =>
+        builder.withStructColumn(
+          orcWriterOptionsFromSchema(structBuilder(name, nullable), s).build()
+        )
+      case a: ArrayType =>
+        builder.withListColumn(
+          orcWriterOptionsFromField(
+            listBuilder(name, nullable),
+            a.elementType,
+            name,
+            nullable).build())
+      case m: MapType =>
+        builder.withMapColumn(
+          mapColumn(name,
+            orcWriterOptionsFromField(
+              ORCWriterOptions.builder(),
+              m.keyType,
+              "key",
+              nullable = false).build().getChildColumnOptions()(0),
+            orcWriterOptionsFromField(
+              ORCWriterOptions.builder(),
+              m.valueType,
+              "value",
+              nullable).build().getChildColumnOptions()(0)))
+      case _ =>
+        builder.withColumns(nullable, name)
+    }
+    builder.asInstanceOf[T]
+  }

+  /**
+   * (We could try to merge this with `parquetWriterOptionsFromSchema` after fixing the issue
+   * https://github.com/rapidsai/cudf/issues/7654)
+   */

Review comment (Collaborator): This issue is circumvented by pruning masks ourselves and we are already calling the remove_validity_if_needed from the writeORCChunk in TableJni.cpp. So I don't see why this should be preventing us from merging the two methods?

Reply (@firestarman, Collaborator, author, Oct 8, 2021): Thanks for the info. But I tried locally and was still getting the exception below when running the Parquet test test_write_map_nullable after changing the nullable setting back to the actual one.

E                   Caused by: ai.rapids.cudf.CudfException: cuDF failure at: /home/liangcail/work/projects/on_github/cudf/cpp/src/io/parquet/writer_impl.cu:377: Mismatch in metadata prescribed nullability and input column nullability. Metadata for nullable input column cannot prescribe nullability = false
E                   	at ai.rapids.cudf.Table.writeParquetChunk(Native Method)
E                   	at ai.rapids.cudf.Table.access$300(Table.java:39)

Reply (@firestarman): This is probably a bug in the plugin. Working on it.

Reply (@firestarman): Fixed it by using m.valueContainsNull instead of the parameter nullable when building options from MapType.
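
A hedged sketch of the fix described in the last reply, written against the orcWriterOptionsFromField shown above; only the MapType case changes, and this may not match the committed code exactly.

```scala
// Sketch: take the value child's nullability from the Spark MapType itself
// (m.valueContainsNull) rather than the parent field's `nullable` parameter.
case m: MapType =>
  builder.withMapColumn(
    mapColumn(name,
      orcWriterOptionsFromField(ORCWriterOptions.builder(), m.keyType,
        "key", nullable = false).build().getChildColumnOptions()(0),
      orcWriterOptionsFromField(ORCWriterOptions.builder(), m.valueType,
        "value", m.valueContainsNull).build().getChildColumnOptions()(0)))
```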
+  def orcWriterOptionsFromSchema[T <: NestedBuilder[_, _], V <: ColumnWriterOptions](
+      builder: ColumnWriterOptions.NestedBuilder[T, V],
+      schema: StructType): T = {
+    schema.foreach(field =>
+      orcWriterOptionsFromField(builder, field.dataType, field.name, field.nullable)
+    )
+    builder.asInstanceOf[T]
+  }
}
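
To show how the two new helpers compose, a small usage sketch follows. The schema is invented for illustration and mirrors the nested shapes exercised by the new tests.

```scala
import ai.rapids.cudf.ORCWriterOptions
import org.apache.spark.sql.types._

// Invented example schema: an atomic column, an array, and a nested struct.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("tags", ArrayType(StringType), nullable = true),
  StructField("location", StructType(Seq(
    StructField("lat", DoubleType),
    StructField("lon", DoubleType))), nullable = true)))

// Each top-level field is folded into the builder, recursing into nested types.
val options = GpuOrcFileFormat
  .orcWriterOptionsFromSchema(ORCWriterOptions.builder(), schema)
  .build()
```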

class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
@@ -161,18 +215,9 @@ class GpuOrcWriter(path: String,
extends ColumnarOutputWriter(path, context, dataSchema, "ORC") {

override val tableWriter: TableWriter = {
-    val builder= ORCWriterOptions.builder()
+    val builder = GpuOrcFileFormat
+      .orcWriterOptionsFromSchema(ORCWriterOptions.builder(), dataSchema)
       .withCompressionType(CompressionType.valueOf(OrcConf.COMPRESS.getString(conf)))

-    dataSchema.foreach(entry => {
-      if (entry.nullable) {
-        builder.withColumnNames(entry.name)
-      } else {
-        builder.withNotNullableColumnNames(entry.name)
-      }
-    })

-    val options = builder.build()
-    Table.writeORCChunked(options, this)
+    Table.writeORCChunked(builder.build(), this)
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.rapids
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf
-import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetColumnWriterOptions, ParquetWriterOptions, Scalar}
+import ai.rapids.cudf.{ColumnView, CudfException, GroupByAggregation, GroupByOptions, ParquetWriterOptions, Scalar}
import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuListUtils, GpuLiteral, GpuMapUtils, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.GpuExpressionsUtils.columnarEvalToColumn
import com.nvidia.spark.rapids.RapidsPluginImplicits._