From 05f5ed5a1d4cf38f7974b2a827db40d1e4286943 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 11 Jun 2020 12:41:32 -0500
Subject: [PATCH 1/3] disable merge schema for orc reads

---
 .../scala/ai/rapids/spark/GpuOrcScan.scala    | 21 +++++++++++--------
 .../rapids/spark/GpuReadOrcFileFormat.scala   |  3 +++
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
index b39d209dffe..ddf3711c7c6 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
@@ -35,7 +35,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.io.DiskRangeList
-import org.apache.orc.{CompressionKind, DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
+import org.apache.orc.{DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
 import org.apache.orc.impl._
 import org.apache.orc.impl.RecordReaderImpl.SargApplier
 import org.apache.orc.mapred.OrcInputFormat
@@ -111,6 +111,9 @@ object GpuOrcScan {
   def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = {
     val scan = scanMeta.wrapped
     val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema)
+    if (scan.options.getBoolean("mergeSchema", false)) {
+      scanMeta.willNotWorkOnGpu("mergeSchema and schema evolution are not supported yet")
+    }
     tagSupport(scan.sparkSession, schema, scanMeta)
   }
 
@@ -118,6 +121,10 @@ object GpuOrcScan {
       sparkSession: SparkSession,
       schema: StructType,
       meta: RapidsMeta[_, _, _]): Unit = {
+    if (sparkSession.conf
+        .getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
+      meta.willNotWorkOnGpu("mergeSchema and schema evolution are not supported yet")
+    }
     schema.foreach { field =>
       if (!GpuColumnVector.isSupportedType(field.dataType)) {
         meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}")
@@ -434,9 +441,6 @@ class GpuOrcPartitionReader(
     OrcOutputStripe(infoBuilder, outputStripeFooter, rangeCreator.get)
   }
 
-  private def estimateRowCount(stripes: Seq[OrcOutputStripe]): Long =
-    stripes.map(_.infoBuilder.getNumberOfRows).sum
-
   private def estimateOutputSize(stripes: Seq[OrcOutputStripe]): Long = {
     // start with header magic
     var size: Long = OrcFile.MAGIC.length
@@ -649,12 +653,12 @@ class GpuOrcPartitionReader(
     }
   }
 
-  private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long, Long) = {
+  private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long) = {
     val nvtxRange = new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
       metrics("bufferTime"))
     try {
       if (stripes.isEmpty) {
-        return (null, 0L, 0)
+        return (null, 0L)
       }
 
       val hostBufferSize = estimateOutputSize(stripes)
@@ -664,7 +668,7 @@
         val out = new HostMemoryOutputStream(hmb)
         writeOrcOutputFile(out, stripes)
         succeeded = true
-        (hmb, out.getPos, estimateRowCount(stripes))
+        (hmb, out.getPos)
       } finally {
         if (!succeeded) {
           hmb.close()
@@ -676,7 +680,7 @@
   }
 
   private def readToTable(stripes: Seq[OrcOutputStripe]): Option[Table] = {
-    val (dataBuffer, dataSize, rowCount) = readPartFile(stripes)
+    val (dataBuffer, dataSize) = readPartFile(stripes)
     try {
       if (dataSize == 0) {
         None
       } else {
         if (debugDumpPrefix != null) {
           dumpOrcData(dataBuffer, dataSize)
         }
-        val cudfSchema = GpuColumnVector.from(readDataSchema)
         val includedColumns = ctx.updatedReadSchema.getFieldNames.asScala
         val parseOpts = ORCOptions.builder()
           .withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala
index de8cc926ce8..00808994f49 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuReadOrcFileFormat.scala
@@ -67,6 +67,9 @@ class GpuReadOrcFileFormat extends OrcFileFormat {
 object GpuReadOrcFileFormat {
   def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
     val fsse = meta.wrapped
+    if (fsse.relation.options.getOrElse("mergeSchema", "false").toBoolean) {
+      meta.willNotWorkOnGpu("mergeSchema and schema evolution are not supported yet")
+    }
     GpuOrcScan.tagSupport(
       fsse.sqlContext.sparkSession,
       fsse.requiredSchema,
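The checks in patch 1 fire on either of the two ways schema merging can be requested for an ORC read: the per-read `mergeSchema` option or the session-wide `spark.sql.orc.mergeSchema` conf. A minimal sketch in Scala of a read that these checks keep on the CPU; the path and the `spark` session are illustrative, not part of the patch:

    // Per-read option, caught by the scan.options check in GpuOrcScan.tagSupport
    val merged = spark.read
      .option("mergeSchema", "true")
      .orc("/data/orc/evolving")

    // Session-wide conf, caught by the sparkSession.conf check
    spark.conf.set("spark.sql.orc.mergeSchema", "true")
    val alsoMerged = spark.read.orc("/data/orc/evolving")

Either form tags the scan as unsupported, so Spark plans the stock CPU ORC reader instead of the GPU one.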
From 4638fe087dff9f983d73a26ac9d5b9f6916bb6ba Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 11 Jun 2020 13:45:24 -0500
Subject: [PATCH 2/3] Added in configs to disable various file formats

---
 docs/compatibility.md                         |  6 +-
 docs/configs.md                               |  8 +++
 .../ai/rapids/spark/GpuBatchScanExec.scala    | 10 ++++
 .../scala/ai/rapids/spark/GpuOrcScan.scala    | 10 ++++
 .../rapids/spark/GpuParquetFileFormat.scala   | 10 ++++
 .../ai/rapids/spark/GpuParquetScan.scala      | 10 ++++
 .../scala/ai/rapids/spark/RapidsConf.scala    | 58 +++++++++++++++++++
 .../spark/sql/rapids/GpuOrcFileFormat.scala   | 13 ++++-
 8 files changed, 121 insertions(+), 4 deletions(-)

diff --git a/docs/compatibility.md b/docs/compatibility.md
index b5393a428bc..1200f2cdd8f 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -61,9 +61,9 @@ Spark is very strict when reading CSV and if the data does not conform with the
 exactly it will result in a `null` value. The underlying parser that the SQL plugin uses is much
 more lenient. If you have badly formatted CSV data you may get data back instead of nulls. If
 this is a problem you can disable the CSV reader by setting the config
-[`spark.rapids.sql.input.CSVScan`](configs.md#sql.input.CSVScan) to `false`. Because the speed up
-is so large and the issues typically only show up in error conditions we felt it was worth having
-the CSV reader enabled by default.
+[`spark.rapids.sql.format.csv.read.enabled`](configs.md#sql.format.csv.read.enabled) to `false`.
+Because the speed-up is so large and the issues typically only show up in error conditions, we felt
+it was worth having the CSV reader enabled by default.
 
 There are also discrepancies/issues with specific types that are detailed below.
 
diff --git a/docs/configs.md b/docs/configs.md
index fed4b80e9f0..cecbbb63cb8 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -44,6 +44,14 @@ Name | Description | Default Value
 spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
 spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
 spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
+spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyway)|true
+spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
+spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
+spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
+spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
+spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
+spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
+spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
 spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
 spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
 spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
index d5b6d76041d..37e9478f008 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
@@ -135,6 +135,16 @@ object GpuCSVScan {
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
 
+    if (!meta.conf.isCsvEnabled) {
+      meta.willNotWorkOnGpu("CSV input and output have been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_CSV} to true")
+    }
+
+    if (!meta.conf.isCsvReadEnabled) {
+      meta.willNotWorkOnGpu("CSV input has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_CSV_READ} to true")
+    }
+
     if (!parsedOptions.enforceSchema) {
       meta.willNotWorkOnGpu("GpuCSVScan always enforces schemas")
     }
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
index ddf3711c7c6..37da30576ab 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
@@ -121,6 +121,16 @@ object GpuOrcScan {
       sparkSession: SparkSession,
       schema: StructType,
       meta: RapidsMeta[_, _, _]): Unit = {
+    if (!meta.conf.isOrcEnabled) {
+      meta.willNotWorkOnGpu("ORC input and output have been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_ORC} to true")
+    }
+
+    if (!meta.conf.isOrcReadEnabled) {
+      meta.willNotWorkOnGpu("ORC input has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_ORC_READ} to true")
+    }
+
     if (sparkSession.conf
         .getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
       meta.willNotWorkOnGpu("mergeSchema and schema evolution are not supported yet")
     }
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala
index 7fb2f1fc2cf..71ea8455f02 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetFileFormat.scala
@@ -40,6 +40,16 @@ object GpuParquetFileFormat {
     val sqlConf = spark.sessionState.conf
     val parquetOptions = new ParquetOptions(options, sqlConf)
 
+    if (!meta.conf.isParquetEnabled) {
+      meta.willNotWorkOnGpu("Parquet input and output have been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET} to true")
+    }
+
+    if (!meta.conf.isParquetWriteEnabled) {
+      meta.willNotWorkOnGpu("Parquet output has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET_WRITE} to true")
+    }
+
     parseCompressionType(parquetOptions.compressionCodecClassName)
       .getOrElse(meta.willNotWorkOnGpu(
         s"compression codec ${parquetOptions.compressionCodecClassName} is not supported"))
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
index aca935e3b6c..f42c5e5da71 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
@@ -114,6 +114,16 @@ object GpuParquetScan {
       sparkSession: SparkSession,
       readSchema: StructType,
       meta: RapidsMeta[_, _, _]): Unit = {
+    if (!meta.conf.isParquetEnabled) {
+      meta.willNotWorkOnGpu("Parquet input and output have been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET} to true")
+    }
+
+    if (!meta.conf.isParquetReadEnabled) {
+      meta.willNotWorkOnGpu("Parquet input has been disabled. To enable set " +
+        s"${RapidsConf.ENABLE_PARQUET_READ} to true")
+    }
+
     for (field <- readSchema) {
       if (!GpuColumnVector.isSupportedType(field.dataType)) {
         meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala b/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
index 0f079ac02cc..c11f9ef692a 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
@@ -409,6 +409,48 @@
     .booleanConf
     .createWithDefault(false)
 
+  // FILE FORMATS
+  val ENABLE_PARQUET = conf("spark.rapids.sql.format.parquet.enabled")
+    .doc("When set to false disables all parquet input and output acceleration")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ENABLE_PARQUET_READ = conf("spark.rapids.sql.format.parquet.read.enabled")
+    .doc("When set to false disables parquet input acceleration")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ENABLE_PARQUET_WRITE = conf("spark.rapids.sql.format.parquet.write.enabled")
+    .doc("When set to false disables parquet output acceleration")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ENABLE_ORC = conf("spark.rapids.sql.format.orc.enabled")
+    .doc("When set to false disables all orc input and output acceleration")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ENABLE_ORC_READ = conf("spark.rapids.sql.format.orc.read.enabled")
+    .doc("When set to false disables orc input acceleration")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ENABLE_ORC_WRITE = conf("spark.rapids.sql.format.orc.write.enabled")
+    .doc("When set to false disables orc output acceleration")
+    .booleanConf
+    .createWithDefault(true)
+
+  val ENABLE_CSV = conf("spark.rapids.sql.format.csv.enabled")
+    .doc("When set to false disables all csv input and output acceleration. " +
" + + "(only input is currently supported anyways)") + .booleanConf + .createWithDefault(true) + + val ENABLE_CSV_READ = conf("spark.rapids.sql.format.csv.read.enabled") + .doc("When set to false disables csv input acceleration") + .booleanConf + .createWithDefault(true) + // INTERNAL TEST AND DEBUG CONFIGS val TEST_CONF = conf("spark.rapids.sql.test.enabled") @@ -735,6 +777,22 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS) + lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET) + + lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ) + + lazy val isParquetWriteEnabled: Boolean = get(ENABLE_PARQUET_WRITE) + + lazy val isOrcEnabled: Boolean = get(ENABLE_ORC) + + lazy val isOrcReadEnabled: Boolean = get(ENABLE_ORC_READ) + + lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE) + + lazy val isCsvEnabled: Boolean = get(ENABLE_CSV) + + lazy val isCsvReadEnabled: Boolean = get(ENABLE_CSV_READ) + lazy val shuffleTransportEnabled: Boolean = get(SHUFFLE_TRANSPORT_ENABLE) lazy val shuffleTransportClassName: String = get(SHUFFLE_TRANSPORT_CLASS_NAME) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala index ad3a69f80ea..f8fbdc8bc31 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala @@ -34,13 +34,24 @@ object GpuOrcFileFormat extends Logging { def tagGpuSupport(meta: RapidsMeta[_, _, _], spark: SparkSession, options: Map[String, String]): Option[GpuOrcFileFormat] = { + + if (!meta.conf.isOrcEnabled) { + meta.willNotWorkOnGpu("ORC input and output has been disabled. To enable set" + + s"${RapidsConf.ENABLE_ORC} to true") + } + + if (!meta.conf.isOrcWriteEnabled) { + meta.willNotWorkOnGpu("ORC output has been disabled. 
+        s"${RapidsConf.ENABLE_ORC_WRITE} to true")
+    }
+
     val sqlConf = spark.sessionState.conf
     val parameters = CaseInsensitiveMap(options)
 
     case class ConfDataForTagging(orcConf: OrcConf, defaultValue: Any, message: String)
 
-    def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging) = {
+    def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging): Unit = {
       val conf = params.orcConf
       val defaultValue = params.defaultValue
       val message = params.message

From c19f778ddfef32ba7cb19d248f608e78b6f43d87 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Thu, 11 Jun 2020 17:32:53 -0500
Subject: [PATCH 3/3] Added in tests

---
 integration_tests/src/main/python/csv_test.py | 19 +++++++++++++++++++
 integration_tests/src/main/python/orc_test.py | 17 +++++++++++++++++
 .../src/main/python/parquet_test.py           | 17 +++++++++++++++++
 3 files changed, 53 insertions(+)

diff --git a/integration_tests/src/main/python/csv_test.py b/integration_tests/src/main/python/csv_test.py
index ddd1d35526a..5bfb4cd6f43 100644
--- a/integration_tests/src/main/python/csv_test.py
+++ b/integration_tests/src/main/python/csv_test.py
@@ -151,6 +151,25 @@ def test_round_trip(spark_tmp_path, data_gen):
         lambda spark : spark.read.schema(schema).csv(data_path),
         conf=_enable_ts_conf)
 
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
+@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.csv.enabled', 'spark.rapids.sql.format.csv.read.enabled'])
+def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
+    data_gens = [
+        StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
+        byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen]
+
+    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
+    gen = StructGen(gen_list, nullable=False)
+    data_path = spark_tmp_path + '/CSV_DATA'
+    schema = gen.data_type
+    reader = read_func(data_path, schema, False, ',')
+    with_cpu_session(
+            lambda spark : gen_df(spark, gen).write.csv(data_path))
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
+            conf={disable_conf: 'false'})
+
 csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM', 'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
 @pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py
index 35df73c57a2..4d31b747f78 100644
--- a/integration_tests/src/main/python/orc_test.py
+++ b/integration_tests/src/main/python/orc_test.py
@@ -39,6 +39,23 @@ def test_basic_read(std_input_path, name, read_func):
     pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131')),
     pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131'))]
 
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
+@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.orc.enabled', 'spark.rapids.sql.format.orc.read.enabled'])
+def test_orc_fallback(spark_tmp_path, read_func, disable_conf):
+    data_gens = [string_gen,
+        byte_gen, short_gen, int_gen, long_gen, boolean_gen]
+
+    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
+    gen = StructGen(gen_list, nullable=False)
+    data_path = spark_tmp_path + '/ORC_DATA'
+    reader = read_func(data_path)
+    with_cpu_session(
+            lambda spark : gen_df(spark, gen).write.orc(data_path))
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
+            conf={disable_conf: 'false'})
+
 @pytest.mark.parametrize('orc_gens', orc_gens_list, ids=idfn)
 @pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
 def test_read_round_trip(spark_tmp_path, orc_gens, read_func):
diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 6db33d7e677..3583c6f16ee 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -43,6 +43,23 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func):
     assert_gpu_and_cpu_are_equal_collect(
             read_func(data_path))
 
+@allow_non_gpu('FileSourceScanExec')
+@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
+@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled'])
+def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
+    data_gens = [string_gen,
+        byte_gen, short_gen, int_gen, long_gen, boolean_gen]
+
+    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
+    gen = StructGen(gen_list, nullable=False)
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    reader = read_func(data_path)
+    with_cpu_session(
+            lambda spark : gen_df(spark, gen).write.parquet(data_path))
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
+            conf={disable_conf: 'false'})
+
 parquet_compress_options = ['none', 'uncompressed', 'snappy', 'gzip']
 # The following need extra jars 'lzo', 'lz4', 'brotli', 'zstd'
 # https://github.com/NVIDIA/spark-rapids/issues/143
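Each new test disables one of the patch 2 configs, allows the CPU `FileSourceScanExec` through `allow_non_gpu`, and then checks that CPU and GPU runs of the same query agree. Outside the test harness, one way to confirm the same fallback by hand is to inspect the plan. A sketch in Scala, assuming a session with the plugin loaded; the path is illustrative:

    spark.conf.set("spark.rapids.sql.format.orc.read.enabled", "false")
    spark.read.orc("/data/orc/table").explain()
    // With the reader disabled, the scan should print as the stock CPU
    // FileSourceScanExec ("FileScan orc") rather than a GPU scan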