Skip to content

Commit

Permalink
Orc merge schema fallback and FileScan format configs (NVIDIA#158)
Browse files Browse the repository at this point in the history
* disable merge schema for orc reads

* Added in configs to disable various file formats

* Added in tests
  • Loading branch information
revans2 authored Jun 12, 2020
1 parent b3e274a commit 6665902
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 13 deletions.
6 changes: 3 additions & 3 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ Spark is very strict when reading CSV and if the data does not conform with the
exactly it will result in a `null` value. The underlying parser that the SQL plugin uses is much
more lenient. If you have badly formatted CSV data you may get data back instead of nulls.
If this is a problem you can disable the CSV reader by setting the config
[`spark.rapids.sql.input.CSVScan`](configs.md#sql.input.CSVScan) to `false`. Because the speed up
is so large and the issues typically only show up in error conditions we felt it was worth having
the CSV reader enabled by default.
[`spark.rapids.sql.format.csv.read.enabled`](configs.md#sql.format.csv.read.enabled) to `false`.
Because the speed up is so large and the issues typically only show up in error conditions we felt
it was worth having the CSV reader enabled by default.

There are also discrepancies/issues with specific types that are detailed below.

Expand Down
8 changes: 8 additions & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ Name | Description | Default Value
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
<a name="sql.format.orc.read.enabled"></a>spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
Expand Down
19 changes: 19 additions & 0 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,25 @@ def test_round_trip(spark_tmp_path, data_gen):
lambda spark : spark.read.schema(schema).csv(data_path),
conf=_enable_ts_conf)

@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.csv.enabled', 'spark.rapids.sql.format.csv.read.enabled'])
def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
data_gens =[
StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen]

gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
gen = StructGen(gen_list, nullable=False)
data_path = spark_tmp_path + '/CSV_DATA'
schema = gen.data_type
reader = read_func(data_path, schema, False, ',')
with_cpu_session(
lambda spark : gen_df(spark, gen).write.csv(data_path))
assert_gpu_and_cpu_are_equal_collect(
lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
conf={disable_conf: 'false'})

csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
Expand Down
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/orc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ def test_basic_read(std_input_path, name, read_func):
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131')),
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131'))]

@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.orc.enabled', 'spark.rapids.sql.format.orc.read.enabled'])
def test_orc_fallback(spark_tmp_path, read_func, disable_conf):
data_gens =[string_gen,
byte_gen, short_gen, int_gen, long_gen, boolean_gen]

gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
gen = StructGen(gen_list, nullable=False)
data_path = spark_tmp_path + '/PARQUET_DATA'
reader = read_func(data_path)
with_cpu_session(
lambda spark : gen_df(spark, gen).write.orc(data_path))
assert_gpu_and_cpu_are_equal_collect(
lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
conf={disable_conf: 'false'})

@pytest.mark.parametrize('orc_gens', orc_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
def test_read_round_trip(spark_tmp_path, orc_gens, read_func):
Expand Down
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/parquet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func):
assert_gpu_and_cpu_are_equal_collect(
read_func(data_path))

@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled'])
def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
data_gens =[string_gen,
byte_gen, short_gen, int_gen, long_gen, boolean_gen]

gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
gen = StructGen(gen_list, nullable=False)
data_path = spark_tmp_path + '/PARQUET_DATA'
reader = read_func(data_path)
with_cpu_session(
lambda spark : gen_df(spark, gen).write.parquet(data_path))
assert_gpu_and_cpu_are_equal_collect(
lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
conf={disable_conf: 'false'})

parquet_compress_options = ['none', 'uncompressed', 'snappy', 'gzip']
# The following need extra jars 'lzo', 'lz4', 'brotli', 'zstd'
# https://github.com/NVIDIA/spark-rapids/issues/143
Expand Down
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ object GpuCSVScan {
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)

if (!meta.conf.isCsvEnabled) {
meta.willNotWorkOnGpu("CSV input and output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_CSV} to true")
}

if (!meta.conf.isCsvReadEnabled) {
meta.willNotWorkOnGpu("CSV input has been disabled. To enable set" +
s"${RapidsConf.ENABLE_CSV_READ} to true")
}

if (!parsedOptions.enforceSchema) {
meta.willNotWorkOnGpu("GpuCSVScan always enforces schemas")
}
Expand Down
31 changes: 22 additions & 9 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.io.DiskRangeList
import org.apache.orc.{CompressionKind, DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
import org.apache.orc.{DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
import org.apache.orc.impl._
import org.apache.orc.impl.RecordReaderImpl.SargApplier
import org.apache.orc.mapred.OrcInputFormat
Expand Down Expand Up @@ -111,13 +111,30 @@ object GpuOrcScan {
def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = {
val scan = scanMeta.wrapped
val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema)
if (scan.options.getBoolean("mergeSchema", false)) {
scanMeta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}
tagSupport(scan.sparkSession, schema, scanMeta)
}

def tagSupport(
sparkSession: SparkSession,
schema: StructType,
meta: RapidsMeta[_, _, _]): Unit = {
if (!meta.conf.isOrcEnabled) {
meta.willNotWorkOnGpu("ORC input and output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_ORC} to true")
}

if (!meta.conf.isOrcReadEnabled) {
meta.willNotWorkOnGpu("ORC input has been disabled. To enable set" +
s"${RapidsConf.ENABLE_ORC_READ} to true")
}

if (sparkSession.conf
.getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}
schema.foreach { field =>
if (!GpuColumnVector.isSupportedType(field.dataType)) {
meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}")
Expand Down Expand Up @@ -434,9 +451,6 @@ class GpuOrcPartitionReader(
OrcOutputStripe(infoBuilder, outputStripeFooter, rangeCreator.get)
}

private def estimateRowCount(stripes: Seq[OrcOutputStripe]): Long =
stripes.map(_.infoBuilder.getNumberOfRows).sum

private def estimateOutputSize(stripes: Seq[OrcOutputStripe]): Long = {
// start with header magic
var size: Long = OrcFile.MAGIC.length
Expand Down Expand Up @@ -649,12 +663,12 @@ class GpuOrcPartitionReader(
}
}

private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long, Long) = {
private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long) = {
val nvtxRange = new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
metrics("bufferTime"))
try {
if (stripes.isEmpty) {
return (null, 0L, 0)
return (null, 0L)
}

val hostBufferSize = estimateOutputSize(stripes)
Expand All @@ -664,7 +678,7 @@ class GpuOrcPartitionReader(
val out = new HostMemoryOutputStream(hmb)
writeOrcOutputFile(out, stripes)
succeeded = true
(hmb, out.getPos, estimateRowCount(stripes))
(hmb, out.getPos)
} finally {
if (!succeeded) {
hmb.close()
Expand All @@ -676,15 +690,14 @@ class GpuOrcPartitionReader(
}

private def readToTable(stripes: Seq[OrcOutputStripe]): Option[Table] = {
val (dataBuffer, dataSize, rowCount) = readPartFile(stripes)
val (dataBuffer, dataSize) = readPartFile(stripes)
try {
if (dataSize == 0) {
None
} else {
if (debugDumpPrefix != null) {
dumpOrcData(dataBuffer, dataSize)
}
val cudfSchema = GpuColumnVector.from(readDataSchema)
val includedColumns = ctx.updatedReadSchema.getFieldNames.asScala
val parseOpts = ORCOptions.builder()
.withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ object GpuParquetFileFormat {
val sqlConf = spark.sessionState.conf
val parquetOptions = new ParquetOptions(options, sqlConf)

if (!meta.conf.isParquetEnabled) {
meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_PARQUET} to true")
}

if (!meta.conf.isParquetWriteEnabled) {
meta.willNotWorkOnGpu("Parquet output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_PARQUET_WRITE} to true")
}

parseCompressionType(parquetOptions.compressionCodecClassName)
.getOrElse(meta.willNotWorkOnGpu(
s"compression codec ${parquetOptions.compressionCodecClassName} is not supported"))
Expand Down
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ object GpuParquetScan {
sparkSession: SparkSession,
readSchema: StructType,
meta: RapidsMeta[_, _, _]): Unit = {
if (!meta.conf.isParquetEnabled) {
meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_PARQUET} to true")
}

if (!meta.conf.isParquetReadEnabled) {
meta.willNotWorkOnGpu("Parquet input has been disabled. To enable set" +
s"${RapidsConf.ENABLE_PARQUET_READ} to true")
}

for (field <- readSchema) {
if (!GpuColumnVector.isSupportedType(field.dataType)) {
meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class GpuReadOrcFileFormat extends OrcFileFormat {
object GpuReadOrcFileFormat {
def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
val fsse = meta.wrapped
if (fsse.relation.options.getOrElse("mergeSchema", "false").toBoolean) {
meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
}
GpuOrcScan.tagSupport(
fsse.sqlContext.sparkSession,
fsse.requiredSchema,
Expand Down
58 changes: 58 additions & 0 deletions sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,48 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

// FILE FORMATS
val ENABLE_PARQUET = conf("spark.rapids.sql.format.parquet.enabled")
.doc("When set to false disables all parquet input and output acceleration")
.booleanConf
.createWithDefault(true)

val ENABLE_PARQUET_READ = conf("spark.rapids.sql.format.parquet.read.enabled")
.doc("When set to false disables parquet input acceleration")
.booleanConf
.createWithDefault(true)

val ENABLE_PARQUET_WRITE = conf("spark.rapids.sql.format.parquet.write.enabled")
.doc("When set to false disables parquet output acceleration")
.booleanConf
.createWithDefault(true)

val ENABLE_ORC = conf("spark.rapids.sql.format.orc.enabled")
.doc("When set to false disables all orc input and output acceleration")
.booleanConf
.createWithDefault(true)

val ENABLE_ORC_READ = conf("spark.rapids.sql.format.orc.read.enabled")
.doc("When set to false disables orc input acceleration")
.booleanConf
.createWithDefault(true)

val ENABLE_ORC_WRITE = conf("spark.rapids.sql.format.orc.write.enabled")
.doc("When set to false disables orc output acceleration")
.booleanConf
.createWithDefault(true)

val ENABLE_CSV = conf("spark.rapids.sql.format.csv.enabled")
.doc("When set to false disables all csv input and output acceleration. " +
"(only input is currently supported anyways)")
.booleanConf
.createWithDefault(true)

val ENABLE_CSV_READ = conf("spark.rapids.sql.format.csv.read.enabled")
.doc("When set to false disables csv input acceleration")
.booleanConf
.createWithDefault(true)

// INTERNAL TEST AND DEBUG CONFIGS

val TEST_CONF = conf("spark.rapids.sql.test.enabled")
Expand Down Expand Up @@ -735,6 +777,22 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS)

lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET)

lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ)

lazy val isParquetWriteEnabled: Boolean = get(ENABLE_PARQUET_WRITE)

lazy val isOrcEnabled: Boolean = get(ENABLE_ORC)

lazy val isOrcReadEnabled: Boolean = get(ENABLE_ORC_READ)

lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE)

lazy val isCsvEnabled: Boolean = get(ENABLE_CSV)

lazy val isCsvReadEnabled: Boolean = get(ENABLE_CSV_READ)

lazy val shuffleTransportEnabled: Boolean = get(SHUFFLE_TRANSPORT_ENABLE)

lazy val shuffleTransportClassName: String = get(SHUFFLE_TRANSPORT_CLASS_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,24 @@ object GpuOrcFileFormat extends Logging {
def tagGpuSupport(meta: RapidsMeta[_, _, _],
spark: SparkSession,
options: Map[String, String]): Option[GpuOrcFileFormat] = {

if (!meta.conf.isOrcEnabled) {
meta.willNotWorkOnGpu("ORC input and output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_ORC} to true")
}

if (!meta.conf.isOrcWriteEnabled) {
meta.willNotWorkOnGpu("ORC output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_ORC_WRITE} to true")
}

val sqlConf = spark.sessionState.conf

val parameters = CaseInsensitiveMap(options)

case class ConfDataForTagging(orcConf: OrcConf, defaultValue: Any, message: String)

def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging) = {
def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging): Unit = {
val conf = params.orcConf
val defaultValue = params.defaultValue
val message = params.message
Expand Down

0 comments on commit 6665902

Please sign in to comment.