diff --git a/docs/compatibility.md b/docs/compatibility.md
index b5393a428bc..1200f2cdd8f 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -61,9 +61,9 @@ Spark is very strict when reading CSV and if the data does not conform with the
exactly it will result in a `null` value. The underlying parser that the SQL plugin uses is much
more lenient. If you have badly formatted CSV data you may get data back instead of nulls.
If this is a problem you can disable the CSV reader by setting the config
-[`spark.rapids.sql.input.CSVScan`](configs.md#sql.input.CSVScan) to `false`. Because the speed up
-is so large and the issues typically only show up in error conditions we felt it was worth having
-the CSV reader enabled by default.
+[`spark.rapids.sql.format.csv.read.enabled`](configs.md#sql.format.csv.read.enabled) to `false`.
+Because the speedup is so large and the issues typically only show up in error conditions, we felt
+it was worth having the CSV reader enabled by default.
There are also discrepancies/issues with specific types that are detailed below.
diff --git a/docs/configs.md b/docs/configs.md
index fed4b80e9f0..cecbbb63cb8 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -44,6 +44,14 @@ Name | Description | Default Value
spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
+spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
+spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
+spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
+spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
+spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
+spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
+spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
+spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index ddd1d35526a..5bfb4cd6f43 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -151,6 +151,25 @@ def test_round_trip(spark_tmp_path, data_gen):
lambda spark : spark.read.schema(schema).csv(data_path),
conf=_enable_ts_conf)
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
+@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.csv.enabled', 'spark.rapids.sql.format.csv.read.enabled'])
+def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
+ data_gens =[
+ StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
+ byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen]
+
+ gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
+ gen = StructGen(gen_list, nullable=False)
+ data_path = spark_tmp_path + '/CSV_DATA'
+ schema = gen.data_type
+ reader = read_func(data_path, schema, False, ',')
+ with_cpu_session(
+ lambda spark : gen_df(spark, gen).write.csv(data_path))
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
+ conf={disable_conf: 'false'})
+
csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py
index 35df73c57a2..4d31b747f78 100644
--- a/integration_tests/src/main/python/orc_test.py
+++ b/integration_tests/src/main/python/orc_test.py
@@ -39,6 +39,23 @@ def test_basic_read(std_input_path, name, read_func):
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131')),
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131'))]
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
+@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.orc.enabled', 'spark.rapids.sql.format.orc.read.enabled'])
+def test_orc_fallback(spark_tmp_path, read_func, disable_conf):
+ data_gens =[string_gen,
+ byte_gen, short_gen, int_gen, long_gen, boolean_gen]
+
+ gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
+ gen = StructGen(gen_list, nullable=False)
+ data_path = spark_tmp_path + '/PARQUET_DATA'
+ reader = read_func(data_path)
+ with_cpu_session(
+ lambda spark : gen_df(spark, gen).write.orc(data_path))
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
+ conf={disable_conf: 'false'})
+
@pytest.mark.parametrize('orc_gens', orc_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
def test_read_round_trip(spark_tmp_path, orc_gens, read_func):
diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 6db33d7e677..3583c6f16ee 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -43,6 +43,23 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func):
assert_gpu_and_cpu_are_equal_collect(
read_func(data_path))
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
+@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled'])
+def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
+ data_gens =[string_gen,
+ byte_gen, short_gen, int_gen, long_gen, boolean_gen]
+
+ gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
+ gen = StructGen(gen_list, nullable=False)
+ data_path = spark_tmp_path + '/PARQUET_DATA'
+ reader = read_func(data_path)
+ with_cpu_session(
+ lambda spark : gen_df(spark, gen).write.parquet(data_path))
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
+ conf={disable_conf: 'false'})
+
parquet_compress_options = ['none', 'uncompressed', 'snappy', 'gzip']
# The following need extra jars 'lzo', 'lz4', 'brotli', 'zstd'
# https://github.com/NVIDIA/spark-rapids/issues/143
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
index d5b6d76041d..37e9478f008 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
@@ -135,6 +135,16 @@ object GpuCSVScan {
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    // Tag the scan as not GPU-capable when CSV acceleration is switched off, either for the
+    // whole format or for reads only. The trailing space after "set" keeps the conf name from
+    // running into the preceding word in the reported reason.
+    if (!meta.conf.isCsvEnabled) {
+      meta.willNotWorkOnGpu("CSV input and output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_CSV} to true")
+    }
+
+    if (!meta.conf.isCsvReadEnabled) {
+      meta.willNotWorkOnGpu("CSV input has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_CSV_READ} to true")
+    }
+
if (!parsedOptions.enforceSchema) {
meta.willNotWorkOnGpu("GpuCSVScan always enforces schemas")
}
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
index b39d209dffe..37da30576ab 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
@@ -35,7 +35,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.io.DiskRangeList
-import org.apache.orc.{CompressionKind, DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
+import org.apache.orc.{DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
import org.apache.orc.impl._
import org.apache.orc.impl.RecordReaderImpl.SargApplier
import org.apache.orc.mapred.OrcInputFormat
@@ -111,6 +111,9 @@ object GpuOrcScan {
def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = {
val scan = scanMeta.wrapped
val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema)
+ // A per-scan "mergeSchema" option (default false) tags the scan as unsupported on the GPU.
+ if (scan.options.getBoolean("mergeSchema", false)) {
+ scanMeta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
+ }
tagSupport(scan.sparkSession, schema, scanMeta)
}
@@ -118,6 +121,20 @@ object GpuOrcScan {
sparkSession: SparkSession,
schema: StructType,
meta: RapidsMeta[_, _, _]): Unit = {
+    // Tag the scan as not GPU-capable when ORC acceleration is switched off, either for the
+    // whole format or for reads only. The trailing space after "set" keeps the conf name from
+    // running into the preceding word in the reported reason.
+    if (!meta.conf.isOrcEnabled) {
+      meta.willNotWorkOnGpu("ORC input and output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_ORC} to true")
+    }
+
+    if (!meta.conf.isOrcReadEnabled) {
+      meta.willNotWorkOnGpu("ORC input has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_ORC_READ} to true")
+    }
+
+    // The session-wide ORC mergeSchema setting is not supported either; unset counts as false.
+    if (sparkSession.conf.getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
+      meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
+    }
schema.foreach { field =>
if (!GpuColumnVector.isSupportedType(field.dataType)) {
meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}")
@@ -434,9 +451,6 @@ class GpuOrcPartitionReader(
OrcOutputStripe(infoBuilder, outputStripeFooter, rangeCreator.get)
}
- private def estimateRowCount(stripes: Seq[OrcOutputStripe]): Long =
- stripes.map(_.infoBuilder.getNumberOfRows).sum
-
private def estimateOutputSize(stripes: Seq[OrcOutputStripe]): Long = {
// start with header magic
var size: Long = OrcFile.MAGIC.length
@@ -649,12 +663,12 @@ class GpuOrcPartitionReader(
}
}
- private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long, Long) = {
+ private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long) = {
val nvtxRange = new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
metrics("bufferTime"))
try {
if (stripes.isEmpty) {
- return (null, 0L, 0)
+ return (null, 0L)
}
val hostBufferSize = estimateOutputSize(stripes)
@@ -664,7 +678,7 @@ class GpuOrcPartitionReader(
val out = new HostMemoryOutputStream(hmb)
writeOrcOutputFile(out, stripes)
succeeded = true
- (hmb, out.getPos, estimateRowCount(stripes))
+ (hmb, out.getPos)
} finally {
if (!succeeded) {
hmb.close()
@@ -676,7 +690,7 @@ class GpuOrcPartitionReader(
}
private def readToTable(stripes: Seq[OrcOutputStripe]): Option[Table] = {
- val (dataBuffer, dataSize, rowCount) = readPartFile(stripes)
+ val (dataBuffer, dataSize) = readPartFile(stripes)
try {
if (dataSize == 0) {
None
@@ -684,7 +698,6 @@ class GpuOrcPartitionReader(
if (debugDumpPrefix != null) {
dumpOrcData(dataBuffer, dataSize)
}
- val cudfSchema = GpuColumnVector.from(readDataSchema)
val includedColumns = ctx.updatedReadSchema.getFieldNames.asScala
val parseOpts = ORCOptions.builder()
.withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala
index 7fb2f1fc2cf..71ea8455f02 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala
@@ -40,6 +40,16 @@ object GpuParquetFileFormat {
val sqlConf = spark.sessionState.conf
val parquetOptions = new ParquetOptions(options, sqlConf)
+    // Tag the write as not GPU-capable when parquet acceleration is switched off, either for
+    // the whole format or for writes only. The trailing space after "set" keeps the conf name
+    // from running into the preceding word in the reported reason.
+    if (!meta.conf.isParquetEnabled) {
+      meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET} to true")
+    }
+
+    if (!meta.conf.isParquetWriteEnabled) {
+      meta.willNotWorkOnGpu("Parquet output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET_WRITE} to true")
+    }
+
parseCompressionType(parquetOptions.compressionCodecClassName)
.getOrElse(meta.willNotWorkOnGpu(
s"compression codec ${parquetOptions.compressionCodecClassName} is not supported"))
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
index aca935e3b6c..f42c5e5da71 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
@@ -114,6 +114,16 @@ object GpuParquetScan {
sparkSession: SparkSession,
readSchema: StructType,
meta: RapidsMeta[_, _, _]): Unit = {
+    // Tag the scan as not GPU-capable when parquet acceleration is switched off, either for
+    // the whole format or for reads only. The trailing space after "set" keeps the conf name
+    // from running into the preceding word in the reported reason.
+    if (!meta.conf.isParquetEnabled) {
+      meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET} to true")
+    }
+
+    if (!meta.conf.isParquetReadEnabled) {
+      meta.willNotWorkOnGpu("Parquet input has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET_READ} to true")
+    }
+
for (field <- readSchema) {
if (!GpuColumnVector.isSupportedType(field.dataType)) {
meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala
index de8cc926ce8..00808994f49 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala
@@ -67,6 +67,9 @@ class GpuReadOrcFileFormat extends OrcFileFormat {
object GpuReadOrcFileFormat {
def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
val fsse = meta.wrapped
+ // A "mergeSchema" relation option (default "false") tags the scan as unsupported on the GPU.
+ // NOTE(review): toBoolean throws on a value that is not "true"/"false", and this lookup is
+ // case-sensitive if `options` is a plain Map - confirm against how Spark populates it.
+ if (fsse.relation.options.getOrElse("mergeSchema", "false").toBoolean) {
+ meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
+ }
GpuOrcScan.tagSupport(
fsse.sqlContext.sparkSession,
fsse.requiredSchema,
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala b/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
index 0f079ac02cc..c11f9ef692a 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
@@ -409,6 +409,48 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)
+ // FILE FORMATS
+ // Each format gets a format-wide switch (spark.rapids.sql.format.<format>.enabled) plus
+ // separate read and write switches. CSV has no write switch. Every entry defaults to true.
+
+ // Parquet: format-wide, read-only and write-only switches.
+ val ENABLE_PARQUET = conf("spark.rapids.sql.format.parquet.enabled")
+ .doc("When set to false disables all parquet input and output acceleration")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ENABLE_PARQUET_READ = conf("spark.rapids.sql.format.parquet.read.enabled")
+ .doc("When set to false disables parquet input acceleration")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ENABLE_PARQUET_WRITE = conf("spark.rapids.sql.format.parquet.write.enabled")
+ .doc("When set to false disables parquet output acceleration")
+ .booleanConf
+ .createWithDefault(true)
+
+ // ORC: format-wide, read-only and write-only switches.
+ val ENABLE_ORC = conf("spark.rapids.sql.format.orc.enabled")
+ .doc("When set to false disables all orc input and output acceleration")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ENABLE_ORC_READ = conf("spark.rapids.sql.format.orc.read.enabled")
+ .doc("When set to false disables orc input acceleration")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ENABLE_ORC_WRITE = conf("spark.rapids.sql.format.orc.write.enabled")
+ .doc("When set to false disables orc output acceleration")
+ .booleanConf
+ .createWithDefault(true)
+
+ // CSV: format-wide and read-only switches.
+ val ENABLE_CSV = conf("spark.rapids.sql.format.csv.enabled")
+ .doc("When set to false disables all csv input and output acceleration. " +
+ "(only input is currently supported anyways)")
+ .booleanConf
+ .createWithDefault(true)
+
+ val ENABLE_CSV_READ = conf("spark.rapids.sql.format.csv.read.enabled")
+ .doc("When set to false disables csv input acceleration")
+ .booleanConf
+ .createWithDefault(true)
+
// INTERNAL TEST AND DEBUG CONFIGS
val TEST_CONF = conf("spark.rapids.sql.test.enabled")
@@ -735,6 +777,22 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS)
+ // Lazily evaluated accessors for the per-format enable switches (ENABLE_PARQUET*,
+ // ENABLE_ORC*, ENABLE_CSV*); each one simply reads its conf entry via get(...).
+ lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET)
+
+ lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ)
+
+ lazy val isParquetWriteEnabled: Boolean = get(ENABLE_PARQUET_WRITE)
+
+ lazy val isOrcEnabled: Boolean = get(ENABLE_ORC)
+
+ lazy val isOrcReadEnabled: Boolean = get(ENABLE_ORC_READ)
+
+ lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE)
+
+ lazy val isCsvEnabled: Boolean = get(ENABLE_CSV)
+
+ lazy val isCsvReadEnabled: Boolean = get(ENABLE_CSV_READ)
+
lazy val shuffleTransportEnabled: Boolean = get(SHUFFLE_TRANSPORT_ENABLE)
lazy val shuffleTransportClassName: String = get(SHUFFLE_TRANSPORT_CLASS_NAME)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
index ad3a69f80ea..f8fbdc8bc31 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala
@@ -34,13 +34,24 @@ object GpuOrcFileFormat extends Logging {
def tagGpuSupport(meta: RapidsMeta[_, _, _],
spark: SparkSession,
options: Map[String, String]): Option[GpuOrcFileFormat] = {
+
+    // Tag the write as not GPU-capable when ORC acceleration is switched off, either for the
+    // whole format or for writes only. The trailing space after "set" keeps the conf name from
+    // running into the preceding word in the reported reason.
+    if (!meta.conf.isOrcEnabled) {
+      meta.willNotWorkOnGpu("ORC input and output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_ORC} to true")
+    }
+
+    if (!meta.conf.isOrcWriteEnabled) {
+      meta.willNotWorkOnGpu("ORC output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_ORC_WRITE} to true")
+    }
+
val sqlConf = spark.sessionState.conf
val parameters = CaseInsensitiveMap(options)
case class ConfDataForTagging(orcConf: OrcConf, defaultValue: Any, message: String)
- def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging) = {
+ def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging): Unit = {
val conf = params.orcConf
val defaultValue = params.defaultValue
val message = params.message