Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auto-merge] branch-0.5 to branch-0.6 [skip ci] [bot] #2164

Merged
merged 1 commit into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 57 additions & 30 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -20026,12 +20026,11 @@ dates or timestamps, or for a lack of type coercion support.
<th>ARRAY</th>
<th>MAP</th>
<th>STRUCT</th>
<th>UDT</th>
</tr>
<tr>
<th rowSpan="2">Parquet</th>
<th>Input</th>
<td>S</td>
<td>S</td>
<th rowSpan="2">CSV</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20040,18 +20039,41 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td></td>
<td><b>NS</b></td>
<td></td>
<td><em>PS (missing nested BINARY)</em></td>
<td><em>PS (missing nested BINARY)</em></td>
<td><em>PS (missing nested BINARY)</em></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<th>Output</th>
<td>S</td>
<td>S</td>
<th>Write</th>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<th rowSpan="2">ORC</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20060,18 +20082,19 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td></td>
<td><b>NS</b></td>
<td></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th rowSpan="2">ORC</th>
<th>Input</th>
<td>S</td>
<th>Write</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20080,18 +20103,20 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>Output</th>
<td>S</td>
<th rowSpan="2">Parquet</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20100,19 +20125,19 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><em>PS* (missing nested BINARY, UDT)</em></td>
<td><em>PS* (missing nested BINARY, UDT)</em></td>
<td><em>PS* (missing nested BINARY, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>CSV</th>
<th>Input</th>
<td>S</td>
<th>Write</th>
<td>S</td>
<td>S</td>
<td>S</td>
Expand All @@ -20121,10 +20146,12 @@ dates or timestamps, or for a lack of type coercion support.
<td>S</td>
<td>S</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,7 @@ object GpuCSVScan {
}
// TODO parsedOptions.emptyValueInRead

if (readSchema.exists(_.dataType.isInstanceOf[DecimalType])) {
meta.willNotWorkOnGpu("DecimalType is not supported")
}
FileFormatChecks.tag(meta, readSchema, CsvFormatType, ReadFileOp)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,12 @@ object GpuOrcScanBase {
s"${RapidsConf.ENABLE_ORC_READ} to true")
}

FileFormatChecks.tag(meta, schema, OrcFormatType, ReadFileOp)

if (sparkSession.conf
.getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}
schema.foreach { field =>
if (!GpuOverrides.isSupportedType(field.dataType)) {
meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,25 @@ trait GpuOverridesListener {
costOptimizations: Seq[Optimization])
}

/** Identifies a file format whose GPU read/write support can be checked. */
sealed trait FileFormatType

/** The CSV text file format. */
object CsvFormatType extends FileFormatType {
  override def toString: String = "CSV"
}

/** The Apache Parquet columnar file format. */
object ParquetFormatType extends FileFormatType {
  override def toString: String = "Parquet"
}

/** The Apache ORC columnar file format. */
object OrcFormatType extends FileFormatType {
  override def toString: String = "ORC"
}

/** Identifies the direction of a file operation being checked for GPU support. */
sealed trait FileFormatOp

/** Reading (scanning) files of a given format. */
object ReadFileOp extends FileFormatOp {
  override def toString: String = "read"
}

/** Writing files of a given format. */
object WriteFileOp extends FileFormatOp {
  override def toString: String = "write"
}

object GpuOverrides {
val FLOAT_DIFFERS_GROUP_INCOMPAT =
"when enabling these, there may be extra groups produced for floating point grouping " +
Expand Down Expand Up @@ -735,6 +754,22 @@ object GpuOverrides {
.map(r => r.wrap(expr, conf, parent, r).asInstanceOf[BaseExprMeta[INPUT]])
.getOrElse(new RuleNotFoundExprMeta(expr, conf, parent))

val fileFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = Map(
(CsvFormatType, FileFormatChecks(
cudfRead = TypeSig.commonCudfTypes,
cudfWrite = TypeSig.none,
sparkSig = TypeSig.atomics)),
(ParquetFormatType, FileFormatChecks(
cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.STRUCT + TypeSig.ARRAY +
TypeSig.MAP).nested(),
cudfWrite = TypeSig.commonCudfTypes + TypeSig.DECIMAL,
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())),
(OrcFormatType, FileFormatChecks(
cudfReadWrite = TypeSig.commonCudfTypes,
sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())))

val commonExpressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
expr[Literal](
"Holds a static value from the query",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ object GpuParquetFileFormat {
options: Map[String, String],
schema: StructType): Option[GpuParquetFileFormat] = {

val unSupportedTypes =
schema.filterNot(field => GpuOverrides.isSupportedType(field.dataType, allowDecimal = true))
if (unSupportedTypes.nonEmpty) {
meta.willNotWorkOnGpu(s"These types aren't supported for parquet $unSupportedTypes")
}

val sqlConf = spark.sessionState.conf
val parquetOptions = new ParquetOptions(options, sqlConf)

Expand All @@ -61,6 +55,8 @@ object GpuParquetFileFormat {
s"${RapidsConf.ENABLE_PARQUET_WRITE} to true")
}

FileFormatChecks.tag(meta, schema, ParquetFormatType, WriteFileOp)

parseCompressionType(parquetOptions.compressionCodecClassName)
.getOrElse(meta.willNotWorkOnGpu(
s"compression codec ${parquetOptions.compressionCodecClassName} is not supported"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,7 @@ object GpuParquetScanBase {
s"${RapidsConf.ENABLE_PARQUET_READ} to true")
}

for (field <- readSchema) {
if (!GpuOverrides.isSupportedType(
field.dataType,
allowMaps = true,
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = meta.conf.decimalTypeEnabled)) {
meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
}
}
FileFormatChecks.tag(meta, readSchema, ParquetFormatType, ReadFileOp)

val schemaHasStrings = readSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[StringType])
Expand Down
Loading