Orc merge schema fallback and FileScan format configs #158

Merged · 3 commits · Jun 12, 2020

6 changes: 3 additions & 3 deletions docs/compatibility.md
@@ -61,9 +61,9 @@ Spark is very strict when reading CSV and if the data does not conform with the
exactly it will result in a `null` value. The underlying parser that the SQL plugin uses is much
more lenient. If you have badly formatted CSV data you may get data back instead of nulls.
If this is a problem you can disable the CSV reader by setting the config
-[`spark.rapids.sql.input.CSVScan`](configs.md#sql.input.CSVScan) to `false`. Because the speed up
-is so large and the issues typically only show up in error conditions we felt it was worth having
-the CSV reader enabled by default.
+[`spark.rapids.sql.format.csv.read.enabled`](configs.md#sql.format.csv.read.enabled) to `false`.
+Because the speed up is so large and the issues typically only show up in error conditions we felt
+it was worth having the CSV reader enabled by default.

There are also discrepancies/issues with specific types that are detailed below.

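
As a quick illustration of the note above, here is a minimal sketch of disabling the GPU CSV reader for a session. It assumes an existing `spark` session with the RAPIDS plugin already loaded; the schema and path are made up for illustration.

```scala
// Sketch only: turn off GPU acceleration for CSV reads in this session so that
// badly formatted CSV is handled by Spark's stricter CPU reader instead.
spark.conf.set("spark.rapids.sql.format.csv.read.enabled", "false")

val df = spark.read
  .schema("id INT, name STRING")   // hypothetical schema
  .csv("/data/example.csv")        // hypothetical path
df.show()
```
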
8 changes: 8 additions & 0 deletions docs/configs.md
@@ -44,6 +44,14 @@ Name | Description | Default Value
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
<a name="sql.format.orc.read.enabled"></a>spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
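
Because reads and writes now have separate toggles, a format can be partially accelerated. Below is a sketch (not part of this PR) of building a session that keeps Parquet reads on the GPU while sending Parquet writes back to the CPU; plugin setup is assumed to be handled elsewhere in the deployment.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the config keys come from the table above; the app name and the
// decision to disable only the write path are illustrative.
val spark = SparkSession.builder()
  .appName("rapids-format-config-example")
  .config("spark.rapids.sql.format.parquet.read.enabled", "true")   // default, shown for clarity
  .config("spark.rapids.sql.format.parquet.write.enabled", "false") // Parquet writes fall back to CPU
  .getOrCreate()
```
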
19 changes: 19 additions & 0 deletions integration_tests/src/main/python/csv_test.py
@@ -151,6 +151,25 @@ def test_round_trip(spark_tmp_path, data_gen):
        lambda spark : spark.read.schema(schema).csv(data_path),
        conf=_enable_ts_conf)

@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_csv_df, read_csv_sql])
@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.csv.enabled', 'spark.rapids.sql.format.csv.read.enabled'])
def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
    data_gens = [
        StringGen('(\\w| |\t|\ud720){0,10}', nullable=False),
        byte_gen, short_gen, int_gen, long_gen, boolean_gen, date_gen]

    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
    gen = StructGen(gen_list, nullable=False)
    data_path = spark_tmp_path + '/CSV_DATA'
    schema = gen.data_type
    reader = read_func(data_path, schema, False, ',')
    with_cpu_session(
        lambda spark : gen_df(spark, gen).write.csv(data_path))
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
        conf={disable_conf: 'false'})

csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/orc_test.py
@@ -39,6 +39,23 @@ def test_basic_read(std_input_path, name, read_func):
    pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131')),
    pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/131'))]

@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.orc.enabled', 'spark.rapids.sql.format.orc.read.enabled'])
def test_orc_fallback(spark_tmp_path, read_func, disable_conf):
    data_gens = [string_gen,
                 byte_gen, short_gen, int_gen, long_gen, boolean_gen]

    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
    gen = StructGen(gen_list, nullable=False)
    data_path = spark_tmp_path + '/ORC_DATA'
    reader = read_func(data_path)
    with_cpu_session(
        lambda spark : gen_df(spark, gen).write.orc(data_path))
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
        conf={disable_conf: 'false'})

@pytest.mark.parametrize('orc_gens', orc_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
def test_read_round_trip(spark_tmp_path, orc_gens, read_func):
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/parquet_test.py
@@ -43,6 +43,23 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func):
    assert_gpu_and_cpu_are_equal_collect(
        read_func(data_path))

@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled'])
def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
    data_gens = [string_gen,
                 byte_gen, short_gen, int_gen, long_gen, boolean_gen]

    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)]
    gen = StructGen(gen_list, nullable=False)
    data_path = spark_tmp_path + '/PARQUET_DATA'
    reader = read_func(data_path)
    with_cpu_session(
        lambda spark : gen_df(spark, gen).write.parquet(data_path))
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
        conf={disable_conf: 'false'})

parquet_compress_options = ['none', 'uncompressed', 'snappy', 'gzip']
# The following need extra jars 'lzo', 'lz4', 'brotli', 'zstd'
# https://github.com/NVIDIA/spark-rapids/issues/143
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuBatchScanExec.scala
@@ -135,6 +135,16 @@ object GpuCSVScan {
      sparkSession.sessionState.conf.sessionLocalTimeZone,
      sparkSession.sessionState.conf.columnNameOfCorruptRecord)

    if (!meta.conf.isCsvEnabled) {
      meta.willNotWorkOnGpu("CSV input and output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_CSV} to true")
    }

    if (!meta.conf.isCsvReadEnabled) {
      meta.willNotWorkOnGpu("CSV input has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_CSV_READ} to true")
    }

    if (!parsedOptions.enforceSchema) {
      meta.willNotWorkOnGpu("GpuCSVScan always enforces schemas")
    }
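
The `willNotWorkOnGpu` calls above only record a reason; the query still runs, just on the CPU. Combined with the `spark.rapids.sql.explain` setting documented earlier, that reason can be surfaced at planning time. A minimal sketch, assuming an active session with the plugin loaded (the schema and path are made up):

```scala
// Sketch only: ask the plugin to report the parts of the plan that stayed on the CPU,
// then disable the CSV format so the scan is tagged with the reason added above.
spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")
spark.conf.set("spark.rapids.sql.format.csv.enabled", "false")

spark.read
  .schema("a INT, b STRING")      // hypothetical schema
  .csv("/tmp/fallback-demo.csv")  // hypothetical path
  .count()
```
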
31 changes: 22 additions & 9 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuOrcScan.scala
@@ -35,7 +35,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.io.DiskRangeList
-import org.apache.orc.{CompressionKind, DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
+import org.apache.orc.{DataReader, OrcConf, OrcFile, OrcProto, PhysicalWriter, Reader, StripeInformation, TypeDescription}
import org.apache.orc.impl._
import org.apache.orc.impl.RecordReaderImpl.SargApplier
import org.apache.orc.mapred.OrcInputFormat
@@ -111,13 +111,30 @@ object GpuOrcScan {
  def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = {
    val scan = scanMeta.wrapped
    val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema)
    if (scan.options.getBoolean("mergeSchema", false)) {
      scanMeta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
    }
    tagSupport(scan.sparkSession, schema, scanMeta)
  }

  def tagSupport(
      sparkSession: SparkSession,
      schema: StructType,
      meta: RapidsMeta[_, _, _]): Unit = {
    if (!meta.conf.isOrcEnabled) {
      meta.willNotWorkOnGpu("ORC input and output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_ORC} to true")
    }

    if (!meta.conf.isOrcReadEnabled) {
      meta.willNotWorkOnGpu("ORC input has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_ORC_READ} to true")
    }

    if (sparkSession.conf
        .getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) {
      meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
    }
    schema.foreach { field =>
      if (!GpuColumnVector.isSupportedType(field.dataType)) {
        meta.willNotWorkOnGpu(s"GpuOrcScan does not support fields of type ${field.dataType}")
@@ -434,9 +451,6 @@ class GpuOrcPartitionReader(
OrcOutputStripe(infoBuilder, outputStripeFooter, rangeCreator.get)
}

-private def estimateRowCount(stripes: Seq[OrcOutputStripe]): Long =
-  stripes.map(_.infoBuilder.getNumberOfRows).sum

private def estimateOutputSize(stripes: Seq[OrcOutputStripe]): Long = {
// start with header magic
var size: Long = OrcFile.MAGIC.length
@@ -649,12 +663,12 @@ class GpuOrcPartitionReader(
}
}

-private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long, Long) = {
+private def readPartFile(stripes: Seq[OrcOutputStripe]): (HostMemoryBuffer, Long) = {
val nvtxRange = new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
metrics("bufferTime"))
try {
if (stripes.isEmpty) {
-return (null, 0L, 0)
+return (null, 0L)
}

val hostBufferSize = estimateOutputSize(stripes)
@@ -664,7 +678,7 @@ class GpuOrcPartitionReader(
val out = new HostMemoryOutputStream(hmb)
writeOrcOutputFile(out, stripes)
succeeded = true
-(hmb, out.getPos, estimateRowCount(stripes))
+(hmb, out.getPos)
} finally {
if (!succeeded) {
hmb.close()
@@ -676,15 +690,14 @@ class GpuOrcPartitionReader(
}

private def readToTable(stripes: Seq[OrcOutputStripe]): Option[Table] = {
-val (dataBuffer, dataSize, rowCount) = readPartFile(stripes)
+val (dataBuffer, dataSize) = readPartFile(stripes)
try {
if (dataSize == 0) {
None
} else {
if (debugDumpPrefix != null) {
dumpOrcData(dataBuffer, dataSize)
}
-val cudfSchema = GpuColumnVector.from(readDataSchema)
val includedColumns = ctx.updatedReadSchema.getFieldNames.asScala
val parseOpts = ORCOptions.builder()
.withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
GpuParquetFileFormat.scala
@@ -40,6 +40,16 @@ object GpuParquetFileFormat {
    val sqlConf = spark.sessionState.conf
    val parquetOptions = new ParquetOptions(options, sqlConf)

    if (!meta.conf.isParquetEnabled) {
      meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_PARQUET} to true")
    }

    if (!meta.conf.isParquetWriteEnabled) {
      meta.willNotWorkOnGpu("Parquet output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_PARQUET_WRITE} to true")
    }

    parseCompressionType(parquetOptions.compressionCodecClassName)
      .getOrElse(meta.willNotWorkOnGpu(
        s"compression codec ${parquetOptions.compressionCodecClassName} is not supported"))
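
Because this check sits in the write path, Parquet output can be pushed back to the CPU while reads stay accelerated. A minimal sketch, assuming an existing session with the plugin enabled and a DataFrame `df`; the output path is illustrative.

```scala
// Sketch only: with the write toggle off, tagGpuSupport above marks the Parquet
// write as not runnable on the GPU and Spark's regular CPU writer takes over.
spark.conf.set("spark.rapids.sql.format.parquet.write.enabled", "false")
df.write.mode("overwrite").parquet("/tmp/parquet-write-fallback")

// Reads are governed by the separate read toggle and can remain on the GPU.
val back = spark.read.parquet("/tmp/parquet-write-fallback")
```
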
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
@@ -114,6 +114,16 @@ object GpuParquetScan {
      sparkSession: SparkSession,
      readSchema: StructType,
      meta: RapidsMeta[_, _, _]): Unit = {
    if (!meta.conf.isParquetEnabled) {
      meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_PARQUET} to true")
    }

    if (!meta.conf.isParquetReadEnabled) {
      meta.willNotWorkOnGpu("Parquet input has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_PARQUET_READ} to true")
    }

    for (field <- readSchema) {
      if (!GpuColumnVector.isSupportedType(field.dataType)) {
        meta.willNotWorkOnGpu(s"GpuParquetScan does not support fields of type ${field.dataType}")
GpuReadOrcFileFormat.scala
@@ -67,6 +67,9 @@ class GpuReadOrcFileFormat extends OrcFileFormat {
object GpuReadOrcFileFormat {
  def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
    val fsse = meta.wrapped
    if (fsse.relation.options.getOrElse("mergeSchema", "false").toBoolean) {
      meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet")
    }
    GpuOrcScan.tagSupport(
      fsse.sqlContext.sparkSession,
      fsse.requiredSchema,
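
Both ORC read paths now bail out before claiming a scan that involves schema merging: `GpuOrcScan.tagSupport` checks the `spark.sql.orc.mergeSchema` session setting, while the `FileSourceScanExec` path above checks the per-read `mergeSchema` option. A sketch of reads that should be tagged to stay on the CPU (the path is illustrative):

```scala
// Sketch only: either form should trigger the new
// "mergeSchema and schema evolution is not supported yet" fallback.

// Per-read option, checked through fsse.relation.options above.
val merged = spark.read
  .option("mergeSchema", "true")
  .orc("/data/orc/table")

// Session-wide setting, checked in GpuOrcScan.tagSupport.
spark.conf.set("spark.sql.orc.mergeSchema", "true")
val mergedToo = spark.read.orc("/data/orc/table")
```
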
58 changes: 58 additions & 0 deletions sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
@@ -409,6 +409,48 @@
    .booleanConf
    .createWithDefault(false)

  // FILE FORMATS
  val ENABLE_PARQUET = conf("spark.rapids.sql.format.parquet.enabled")
    .doc("When set to false disables all parquet input and output acceleration")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_PARQUET_READ = conf("spark.rapids.sql.format.parquet.read.enabled")
    .doc("When set to false disables parquet input acceleration")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_PARQUET_WRITE = conf("spark.rapids.sql.format.parquet.write.enabled")
    .doc("When set to false disables parquet output acceleration")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_ORC = conf("spark.rapids.sql.format.orc.enabled")
    .doc("When set to false disables all orc input and output acceleration")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_ORC_READ = conf("spark.rapids.sql.format.orc.read.enabled")
    .doc("When set to false disables orc input acceleration")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_ORC_WRITE = conf("spark.rapids.sql.format.orc.write.enabled")
    .doc("When set to false disables orc output acceleration")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_CSV = conf("spark.rapids.sql.format.csv.enabled")
    .doc("When set to false disables all csv input and output acceleration. " +
      "(only input is currently supported anyways)")
    .booleanConf
    .createWithDefault(true)

  val ENABLE_CSV_READ = conf("spark.rapids.sql.format.csv.read.enabled")
    .doc("When set to false disables csv input acceleration")
    .booleanConf
    .createWithDefault(true)

  // INTERNAL TEST AND DEBUG CONFIGS

  val TEST_CONF = conf("spark.rapids.sql.test.enabled")
@@ -735,6 +777,22 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

  lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS)

  lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET)

  lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ)

  lazy val isParquetWriteEnabled: Boolean = get(ENABLE_PARQUET_WRITE)

  lazy val isOrcEnabled: Boolean = get(ENABLE_ORC)

  lazy val isOrcReadEnabled: Boolean = get(ENABLE_ORC_READ)

  lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE)

  lazy val isCsvEnabled: Boolean = get(ENABLE_CSV)

  lazy val isCsvReadEnabled: Boolean = get(ENABLE_CSV_READ)

  lazy val shuffleTransportEnabled: Boolean = get(SHUFFLE_TRANSPORT_ENABLE)

  lazy val shuffleTransportClassName: String = get(SHUFFLE_TRANSPORT_CLASS_NAME)
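
All of the new toggles follow the same `conf(...).doc(...).booleanConf.createWithDefault(true)` builder plus a `lazy val` accessor on `RapidsConf`. As a purely hypothetical illustration of that pattern (this config does not exist in the PR, and the `json` format name is made up):

```scala
// Hypothetical sketch only, not part of this change.
// Declared inside object RapidsConf, alongside the format configs above:
val ENABLE_JSON_READ = conf("spark.rapids.sql.format.json.read.enabled")
  .doc("When set to false disables json input acceleration")
  .booleanConf
  .createWithDefault(true)

// ...and the matching accessor inside class RapidsConf:
lazy val isJsonReadEnabled: Boolean = get(ENABLE_JSON_READ)
```
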
GpuOrcFileFormat.scala
@@ -34,13 +34,24 @@ object GpuOrcFileFormat extends Logging {
  def tagGpuSupport(meta: RapidsMeta[_, _, _],
      spark: SparkSession,
      options: Map[String, String]): Option[GpuOrcFileFormat] = {

    if (!meta.conf.isOrcEnabled) {
      meta.willNotWorkOnGpu("ORC input and output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_ORC} to true")
    }

    if (!meta.conf.isOrcWriteEnabled) {
      meta.willNotWorkOnGpu("ORC output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_ORC_WRITE} to true")
    }

    val sqlConf = spark.sessionState.conf

    val parameters = CaseInsensitiveMap(options)

    case class ConfDataForTagging(orcConf: OrcConf, defaultValue: Any, message: String)

-    def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging) = {
+    def tagIfOrcOrHiveConfNotSupported(params: ConfDataForTagging): Unit = {
      val conf = params.orcConf
      val defaultValue = params.defaultValue
      val message = params.message