Skip to content

Commit

Permalink
Merge pull request #2164 from NVIDIA/branch-0.5
Browse files: browse the repository at this point in the history
[auto-merge] branch-0.5 to branch-0.6 [skip ci] [bot]
  • Loading branch information
nvauto authored Apr 16, 2021
2 parents 116668d + 87fc897 commit 8849be5
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 186 deletions.
87 changes: 57 additions & 30 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -20026,12 +20026,11 @@ dates or timestamps, or for a lack of type coercion support.
<th>ARRAY</th>
<th>MAP</th>
<th>STRUCT</th>
<th>UDT</th>
</tr>
<tr>
<th rowSpan="2">Parquet</th>
<th>Input</th>
<td>S</td>
<td>S</td>
<th rowSpan="2">CSV</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20040,18 +20039,41 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td></td>
<td><b>NS</b></td>
<td></td>
<td><em>PS (missing nested BINARY)</em></td>
<td><em>PS (missing nested BINARY)</em></td>
<td><em>PS (missing nested BINARY)</em></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<th>Output</th>
<td>S</td>
<td>S</td>
<th>Write</th>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<th rowSpan="2">ORC</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20060,18 +20082,19 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td></td>
<td><b>NS</b></td>
<td></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th rowSpan="2">ORC</th>
<th>Input</th>
<td>S</td>
<th>Write</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20080,18 +20103,20 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>Output</th>
<td>S</td>
<th rowSpan="2">Parquet</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20100,19 +20125,19 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested BINARY, UDT)</em></td>
<td><em>PS* (missing nested BINARY, UDT)</em></td>
<td><em>PS* (missing nested BINARY, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>CSV</th>
<th>Input</th>
<td>S</td>
<th>Write</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20121,10 +20146,12 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,7 @@ object GpuCSVScan {
}
// TODO parsedOptions.emptyValueInRead

if (readSchema.exists(_.dataType.isInstanceOf[DecimalType])) {
meta.willNotWorkOnGpu("DecimalType is not supported")
}
FileFormatChecks.tag(meta, readSchema, CsvFormatType, ReadFileOp)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,12 @@ object GpuOrcScanBase {
s"${RapidsConf.ENABLE_ORC_READ} to true")
}

FileFormatChecks.tag(meta, schema, OrcFormatType, ReadFileOp)

if (sparkSession.conf
.getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}
schema.foreach { field =>
if (!GpuOverrides.isSupportedType(field.dataType)) {
meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,25 @@ trait GpuOverridesListener {
costOptimizations: Seq[Optimization])
}

/**
 * A file format that can be used as a key of `GpuOverrides.fileFormats`.
 *
 * The trait is sealed, so every direct subtype has to be declared in this source file.
 */
sealed trait FileFormatType

/** The CSV member of [[FileFormatType]]; its string form is `"CSV"`. */
object CsvFormatType extends FileFormatType {
  override def toString: String = "CSV"
}

/** The Parquet member of [[FileFormatType]]; its string form is `"Parquet"`. */
object ParquetFormatType extends FileFormatType {
  override def toString: String = "Parquet"
}

/** The ORC member of [[FileFormatType]]; its string form is `"ORC"`. */
object OrcFormatType extends FileFormatType {
  override def toString: String = "ORC"
}

/**
 * An operation performed against a file format; used as the inner key of
 * `GpuOverrides.fileFormats`.
 *
 * The trait is sealed, so every direct subtype has to be declared in this source file.
 */
sealed trait FileFormatOp

/** The read member of [[FileFormatOp]]; its string form is `"read"`. */
object ReadFileOp extends FileFormatOp {
  override def toString: String = "read"
}

/** The write member of [[FileFormatOp]]; its string form is `"write"`. */
object WriteFileOp extends FileFormatOp {
  override def toString: String = "write"
}

object GpuOverrides {
val FLOAT_DIFFERS_GROUP_INCOMPAT =
"when enabling these, there may be extra groups produced for floating point grouping " +
Expand Down Expand Up @@ -735,6 +754,22 @@ object GpuOverrides {
.map(r => r.wrap(expr, conf, parent, r).asInstanceOf[BaseExprMeta[INPUT]])
.getOrElse(new RuleNotFoundExprMeta(expr, conf, parent))

/**
 * Type signatures for each file format, keyed first by [[FileFormatType]] and then by
 * [[FileFormatOp]].
 *
 * Every entry is built with `FileFormatChecks(...)`, which receives either separate
 * `cudfRead`/`cudfWrite` signatures or a single `cudfReadWrite` signature, plus a `sparkSig`.
 * NOTE(review): the parameter names suggest "what cuDF handles" versus "what Spark handles" --
 * confirm against the `FileFormatChecks` definition.
 */
val fileFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = Map(
  // CSV: read signature is the common cuDF types, write signature is `TypeSig.none`.
  CsvFormatType ->
    FileFormatChecks(
      cudfRead = TypeSig.commonCudfTypes,
      cudfWrite = TypeSig.none,
      sparkSig = TypeSig.atomics),
  // Parquet: the read signature adds DECIMAL, STRUCT, ARRAY and MAP and is marked nested();
  // the write signature adds only DECIMAL and is not marked nested().
  ParquetFormatType ->
    FileFormatChecks(
      cudfRead = (TypeSig.commonCudfTypes +
        TypeSig.DECIMAL +
        TypeSig.STRUCT +
        TypeSig.ARRAY +
        TypeSig.MAP).nested(),
      cudfWrite = TypeSig.commonCudfTypes + TypeSig.DECIMAL,
      sparkSig = (TypeSig.atomics +
        TypeSig.STRUCT +
        TypeSig.ARRAY +
        TypeSig.MAP +
        TypeSig.UDT).nested()),
  // ORC: a single cuDF signature is shared by read and write.
  OrcFormatType ->
    FileFormatChecks(
      cudfReadWrite = TypeSig.commonCudfTypes,
      sparkSig = (TypeSig.atomics +
        TypeSig.STRUCT +
        TypeSig.ARRAY +
        TypeSig.MAP +
        TypeSig.UDT).nested()))

val commonExpressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
expr[Literal](
"Holds a static value from the query",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ object GpuParquetFileFormat {
options: Map[String, String],
schema: StructType): Option[GpuParquetFileFormat] = {

val unSupportedTypes =
schema.filterNot(field => GpuOverrides.isSupportedType(field.dataType, allowDecimal = true))
if (unSupportedTypes.nonEmpty) {
meta.willNotWorkOnGpu(s"These types aren't supported for parquet $unSupportedTypes")
}

val sqlConf = spark.sessionState.conf
val parquetOptions = new ParquetOptions(options, sqlConf)

Expand All @@ -61,6 +55,8 @@ object GpuParquetFileFormat {
s"${RapidsConf.ENABLE_PARQUET_WRITE} to true")
}

FileFormatChecks.tag(meta, schema, ParquetFormatType, WriteFileOp)

parseCompressionType(parquetOptions.compressionCodecClassName)
.getOrElse(meta.willNotWorkOnGpu(
s"compression codec ${parquetOptions.compressionCodecClassName} is not supported"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,7 @@ object GpuParquetScanBase {
s"${RapidsConf.ENABLE_PARQUET_READ} to true")
}

for (field <- readSchema) {
if (!GpuOverrides.isSupportedType(
field.dataType,
allowMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = meta.conf.decimalTypeEnabled)) {
meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
}
}
FileFormatChecks.tag(meta, readSchema, ParquetFormatType, ReadFileOp)

val schemaHasStrings = readSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[StringType])
Expand Down
Loading

0 comments on commit 8849be5

Please sign in to comment.