Parquet small file reading optimization #595

Merged: 88 commits, Aug 26, 2020

Commits (88, showing changes from all commits)
484c781
Initial prototype small files parquet
tgravescs Jul 30, 2020
1d3dd3f
Change datasource v1 to use small files
tgravescs Jul 30, 2020
c168214
Working but has 72 bytes off in size
tgravescs Jul 30, 2020
b2c2959
Merge remote-tracking branch 'origin/branch-0.2' into smallfiles
tgravescs Jul 30, 2020
ccdf32d
Copy filesourcescan to databricks and fix merge error
tgravescs Jul 30, 2020
40c41e2
Fix databricks package name
tgravescs Jul 31, 2020
5afddf0
Try to debug size calculation - adds lots of warnings
tgravescs Aug 3, 2020
566520e
Merge remote-tracking branch 'origin/branch-0.2' into smallfiles
tgravescs Aug 3, 2020
5054c8a
Cleanup and have file source scan small files only work for parquet
tgravescs Aug 3, 2020
5c0cee4
Switch to use ArrayBuffer so order correct
tgravescs Aug 3, 2020
048b4ff
debug
tgravescs Aug 5, 2020
3117550
Fix order issue
tgravescs Aug 5, 2020
4857510
add more to calculated size
tgravescs Aug 5, 2020
64981ce
cleanup
tgravescs Aug 5, 2020
560cc81
Try to handle partition values
tgravescs Aug 5, 2020
84dc48e
fix passing partitionValues
tgravescs Aug 5, 2020
bbade25
refactor
tgravescs Aug 5, 2020
13fbd4d
disable mergeschema
tgravescs Aug 6, 2020
73a212a
add check for mergeSchema
tgravescs Aug 6, 2020
a87af11
Merge remote-tracking branch 'origin/branch-0.2' into smallfiles
tgravescs Aug 6, 2020
c5b8a6e
Add tests for both small file optimization on and off
tgravescs Aug 6, 2020
d2ac90a
handle input file - but doesn't totally work
tgravescs Aug 6, 2020
3e014a8
remove extra values reader
tgravescs Aug 6, 2020
c13d8f3
Fixes
tgravescs Aug 6, 2020
6c53c45
Debug
tgravescs Aug 10, 2020
8965b81
Merge remote-tracking branch 'origin/branch-0.2' into smallfiles
tgravescs Aug 10, 2020
1c05dc4
Check to see if Inputfile execs used
tgravescs Aug 10, 2020
ed7d7fb
Merge branch 'smallfiles' of https://github.com/tgravescs/spark-rapid…
tgravescs Aug 10, 2020
95abe40
Finding InputFileName works
tgravescs Aug 10, 2020
c75df6c
finding input file working
tgravescs Aug 10, 2020
2efbfe8
cleanup and add tests for V2 datasource
tgravescs Aug 11, 2020
b14cbe8
Add check for input file to GpuParquetScan
tgravescs Aug 11, 2020
4ae5fff
Add more tests
tgravescs Aug 11, 2020
a457dd8
Add GPU metrics to GpuFileSourceScanExec
jlowe Aug 10, 2020
4767453
remove log messages
tgravescs Aug 11, 2020
8b942a8
Docs
tgravescs Aug 11, 2020
c4ed0bc
cleanup
tgravescs Aug 11, 2020
8f5480c
Update 300db and 310 FileSourceScanExecs passing unit tests
tgravescs Aug 11, 2020
9875969
Add test for bucketing
tgravescs Aug 12, 2020
a95c964
Merge remote-tracking branch 'origin/branch-0.2' into smallfiles
tgravescs Aug 12, 2020
4f7d77a
Add in logic for datetime corrected rebase mode
tgravescs Aug 12, 2020
dbd62ef
Merge remote-tracking branch 'origin/branch-0.2' into smallfiles
tgravescs Aug 12, 2020
3acf602
Merge branch 'smallfiles' of https://github.com/tgravescs/spark-rapid…
tgravescs Aug 12, 2020
8dfb0e4
Commonize some code
tgravescs Aug 12, 2020
e7309e9
Cleanup
tgravescs Aug 12, 2020
8f6f8df
fixes
tgravescs Aug 12, 2020
341c400
Extract GpuFileSourceScanExec from shims
jlowe Aug 12, 2020
7f78c7f
Add more tests
tgravescs Aug 12, 2020
9f74fc4
comments
tgravescs Aug 12, 2020
3a5e08f
update test
tgravescs Aug 13, 2020
b0fd541
Pass metrics via GPU file format rather than custom options map
jlowe Aug 13, 2020
8172861
working
tgravescs Aug 13, 2020
dba69fb
pass schema around properly
tgravescs Aug 13, 2020
e21e68b
fix value from tuple
tgravescs Aug 13, 2020
b2aa3bd
Rename case class
tgravescs Aug 13, 2020
cf05611
Update tests
tgravescs Aug 13, 2020
ee8c0b5
Update code checking for DataSourceScanExec
jlowe Aug 13, 2020
c98f818
Merge branch 'branch-0.2' into scan-metrics
jlowe Aug 13, 2020
3474fcf
Fix scaladoc warning and unused imports
jlowe Aug 13, 2020
03ca8e4
Add realloc if over memory size
tgravescs Aug 13, 2020
9a88e7a
refactor memory checks
tgravescs Aug 13, 2020
e2fef62
Fix copyright
jlowe Aug 13, 2020
5cb7ac2
Merge branch 'jasonFileSourceScanPR' into smallfilesRebase
tgravescs Aug 13, 2020
a9439fb
Upmerge to latest FileSourceScanExec changes for metrics
tgravescs Aug 13, 2020
b7d42ef
Add missing check Filesource scan mergeSchema and cleanup
tgravescs Aug 13, 2020
ee09ba5
Cleanup
tgravescs Aug 13, 2020
b85400c
remove bucket test for now
tgravescs Aug 14, 2020
a676ada
Merge remote-tracking branch 'origin/branch-0.2' into smallfilesRebase
tgravescs Aug 14, 2020
c97ab17
formatting
tgravescs Aug 14, 2020
0200dd9
Fixes
tgravescs Aug 14, 2020
f4d155d
Add more tests
tgravescs Aug 14, 2020
fd0545f
Merge remote-tracking branch 'origin/branch-0.2' into smallfilesRebase
tgravescs Aug 19, 2020
63312fb
Merge conflict
tgravescs Aug 19, 2020
24f43c7
Merge remote-tracking branch 'origin/branch-0.2' into smallfilesRebase
tgravescs Aug 19, 2020
5157cce
Merge remote-tracking branch 'origin/branch-0.2' into smallfilesRebase
tgravescs Aug 19, 2020
ddc9e54
Fix merge conflict
tgravescs Aug 20, 2020
6670cfe
enable parquet bucket tests and change warning
tgravescs Aug 20, 2020
513c8ed
cleanup
tgravescs Aug 20, 2020
b1b658b
remove debug logs
tgravescs Aug 20, 2020
a4f9571
Move FilePartition creation to shim
tgravescs Aug 20, 2020
0a3a586
Add better message for mergeSchema
tgravescs Aug 20, 2020
7b19a0c
Address review comments. Add in withResources and closeOnExcept and m…
tgravescs Aug 21, 2020
97913e2
Merge remote-tracking branch 'origin/branch-0.2' into smallfilesRebas…
tgravescs Aug 24, 2020
8370651
Fix spacing
tgravescs Aug 24, 2020
ef4aa7e
Fix databricks support and passing arguments
tgravescs Aug 24, 2020
7557d71
fix typo in db
tgravescs Aug 25, 2020
fd942a3
Update config description
tgravescs Aug 25, 2020
8a29098
Rework
tgravescs Aug 26, 2020
1 change: 1 addition & 0 deletions docs/configs.md
@@ -56,6 +56,7 @@ Name | Description | Default Value
<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
<a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.smallFiles.enabled"></a>spark.rapids.sql.format.parquet.smallFiles.enabled|When set to true, handles reading multiple small files within a partition more efficiently by combining multiple files on the CPU side before sending to the GPU. Recommended unless user needs mergeSchema option or schema evolution.|true
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
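The new entry in configs.md describes the optimization: multiple small files within a partition are combined on the CPU before being sent to the GPU, and it should stay enabled unless the mergeSchema option or schema evolution is needed. A minimal sketch of toggling it through the standard SparkSession builder; the plugin class name is taken from the integration-test setup later in this PR, and the input path is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

object SmallFilesConfigExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parquet-small-files-example")
      // Plugin class name as used in the integration test setup below.
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      // New config from this PR: combine many small Parquet files on the CPU
      // before sending them to the GPU. Set to false if the mergeSchema option
      // or schema evolution is needed.
      .config("spark.rapids.sql.format.parquet.smallFiles.enabled", "true")
      .getOrCreate()

    // Hypothetical path: a directory containing many small Parquet files.
    spark.read.parquet("/data/many-small-parquet-files").count()
  }
}
```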
171 changes: 152 additions & 19 deletions integration_tests/src/main/python/parquet_test.py

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_init_internal.py
@@ -19,9 +19,11 @@ def _spark__init():
# DO NOT SET ANY OTHER CONFIGS HERE!!!
# due to bugs in pyspark/pytest it looks like any configs set here
# can be reset in the middle of a test if specific operations are done (some types of cast etc)
# enableHiveSupport() is needed for parquet bucket tests
_s = SparkSession.builder \
.config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \
.config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')\
.enableHiveSupport() \
.appName('rapids spark plugin integration tests (python)').getOrCreate()
#TODO catch the ClassNotFound error that happens if the classpath is not set up properly and
# make it a better error message
@@ -41,9 +41,11 @@ case class GpuParquetScan(
options: CaseInsensitiveStringMap,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
rapidsConf: RapidsConf)
rapidsConf: RapidsConf,
supportsSmallFileOpt: Boolean = true)
extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema,
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf) with FileScan {
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf,
supportsSmallFileOpt) with FileScan {

override def isSplitable(path: Path): Boolean = super.isSplitableBase(path)

@@ -52,7 +54,8 @@
override def equals(obj: Any): Boolean = obj match {
case p: GpuParquetScan =>
super.equals(p) && dataSchema == p.dataSchema && options == p.options &&
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf &&
supportsSmallFileOpt == p.supportsSmallFileOpt
case _ => false
}

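The new supportsSmallFileOpt field participates in equals so that two GpuParquetScan instances differing only in this flag are not treated as the same scan when Spark compares plan fragments. A stripped-down, hypothetical analogue of that pattern (the names here are illustrative, not plugin code):

```scala
// Hypothetical analogue of the equals change above: the optimization flag is
// part of equality, so scans that differ only in this setting never compare equal.
final class ExampleScan(val pushedFilters: Seq[String], val supportsSmallFileOpt: Boolean) {
  override def equals(obj: Any): Boolean = obj match {
    case p: ExampleScan =>
      pushedFilters == p.pushedFilters && supportsSmallFileOpt == p.supportsSmallFileOpt
    case _ => false
  }
  override def hashCode(): Int = (pushedFilters, supportsSmallFileOpt).hashCode()
}
```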
@@ -21,4 +21,5 @@ import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase

case class GpuShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan) extends GpuShuffleExchangeExecBase(outputPartitioning, child)
child: SparkPlan) extends GpuShuffleExchangeExecBase(outputPartitioning, child)

@@ -18,6 +18,8 @@ package com.nvidia.spark.rapids.shims.spark300

import java.time.ZoneId

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.spark300.RapidsShuffleManager

@@ -35,7 +37,7 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -141,13 +143,20 @@ class Spark300Shims extends SparkShims {
override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
val sparkSession = wrapped.relation.sparkSession
val options = wrapped.relation.options
val newRelation = HadoopFsRelation(
wrapped.relation.location,
wrapped.relation.partitionSchema,
wrapped.relation.dataSchema,
wrapped.relation.bucketSpec,
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
wrapped.relation.options)(wrapped.relation.sparkSession)
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _ => false
}
GpuFileSourceScanExec(
newRelation,
wrapped.output,
Expand All @@ -156,7 +165,8 @@ class Spark300Shims extends SparkShims {
wrapped.optionalBucketSet,
None,
wrapped.dataFilters,
wrapped.tableIdentifier)
wrapped.tableIdentifier,
canUseSmallFileOpt)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
@@ -226,7 +236,10 @@
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan =
override def convertToGpu(): Scan = {
val canUseSmallFileOpt =
GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
Expand All @@ -237,7 +250,9 @@ class Spark300Shims extends SparkShims {
a.options,
a.partitionFilters,
a.dataFilters,
conf)
conf,
canUseSmallFileOpt)
}
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -330,4 +345,20 @@
filePartitions: Seq[FilePartition]): RDD[InternalRow] = {
new FileScanRDD(sparkSession, readFunction, filePartitions)
}

override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = {
FilePartition(index, files)
}

override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec,
supportsSmallFileOpt: Boolean): GpuBatchScanExec = {
val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan]
val scanCopy = scan.copy(supportsSmallFileOpt=supportsSmallFileOpt)
batchScanExec.copy(scan=scanCopy)
}

override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec,
supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = {
scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt)
}
}
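Both the FileSourceScanExec and ParquetScan overrides above gate the optimization through GpuParquetScanBase.canUseSmallFileParquetOpt, whose body is not part of this diff. A hedged sketch of the kind of decision it plausibly makes, based on the config description and the mergeSchema handling visible in the commit history; the standalone helper and its exact checks are assumptions, not plugin code:

```scala
import org.apache.spark.sql.SparkSession

object SmallFileOptGateSketch {
  // Assumed decision logic: the plugin config must enable the feature, and
  // Parquet schema merging must not be requested, since combining files on the
  // CPU assumes they all share one schema.
  def canUseSmallFileParquetOpt(
      smallFilesEnabled: Boolean, // spark.rapids.sql.format.parquet.smallFiles.enabled
      options: Map[String, String],
      spark: SparkSession): Boolean = {
    val mergeSchemaRequested =
      options.get("mergeSchema").exists(_.trim.equalsIgnoreCase("true")) ||
        spark.conf.get("spark.sql.parquet.mergeSchema", "false").toBoolean
    smallFilesEnabled && !mergeSchemaRequested
  }
}
```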
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
@@ -93,13 +94,20 @@ class Spark300dbShims extends Spark300Shims {
override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
val sparkSession = wrapped.relation.sparkSession
val options = wrapped.relation.options
val newRelation = HadoopFsRelation(
wrapped.relation.location,
wrapped.relation.partitionSchema,
wrapped.relation.dataSchema,
wrapped.relation.bucketSpec,
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
wrapped.relation.options)(wrapped.relation.sparkSession)
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _ => false
}
GpuFileSourceScanExec(
newRelation,
wrapped.output,
Expand All @@ -109,7 +117,8 @@ class Spark300dbShims extends Spark300Shims {
// TODO: Does Databricks have coalesced bucketing implemented?
None,
wrapped.dataFilters,
wrapped.tableIdentifier)
wrapped.tableIdentifier,
canUseSmallFileOpt)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
@@ -180,4 +189,13 @@
filePartitions: Seq[FilePartition]): RDD[InternalRow] = {
new GpuFileScanRDD(sparkSession, readFunction, filePartitions)
}

override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = {
FilePartition(index, files)
}

override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec,
supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = {
scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt)
}
}
@@ -41,9 +41,11 @@ case class GpuParquetScan(
options: CaseInsensitiveStringMap,
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression],
rapidsConf: RapidsConf)
rapidsConf: RapidsConf,
supportsSmallFileOpt: Boolean = true)
extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema,
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf) with FileScan {
readDataSchema, readPartitionSchema, pushedFilters, rapidsConf,
supportsSmallFileOpt) with FileScan {

override def isSplitable(path: Path): Boolean = super.isSplitableBase(path)

@@ -52,7 +54,8 @@
override def equals(obj: Any): Boolean = obj match {
case p: GpuParquetScan =>
super.equals(p) && dataSchema == p.dataSchema && options == p.options &&
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf
equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf &&
supportsSmallFileOpt == p.supportsSmallFileOpt
case _ => false
}

@@ -18,6 +18,8 @@ package com.nvidia.spark.rapids.shims.spark310

import java.time.ZoneId

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark301.Spark301Shims
import com.nvidia.spark.rapids.spark310.RapidsShuffleManager
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
@@ -137,13 +139,20 @@ class Spark310Shims extends Spark301Shims {
override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
val sparkSession = wrapped.relation.sparkSession
val options = wrapped.relation.options
val newRelation = HadoopFsRelation(
wrapped.relation.location,
wrapped.relation.partitionSchema,
wrapped.relation.dataSchema,
wrapped.relation.bucketSpec,
GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat),
wrapped.relation.options)(wrapped.relation.sparkSession)
options)(sparkSession)
val canUseSmallFileOpt = newRelation.fileFormat match {
case _: ParquetFileFormat =>
GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession)
case _ => false
}
GpuFileSourceScanExec(
newRelation,
wrapped.output,
Expand All @@ -152,7 +161,8 @@ class Spark310Shims extends Spark301Shims {
wrapped.optionalBucketSet,
wrapped.optionalNumCoalescedBuckets,
wrapped.dataFilters,
wrapped.tableIdentifier)
wrapped.tableIdentifier,
canUseSmallFileOpt)
}
}),
GpuOverrides.exec[SortMergeJoinExec](
@@ -173,7 +183,9 @@
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan =
override def convertToGpu(): Scan = {
val canUseSmallFileOpt = GpuParquetScanBase.canUseSmallFileParquetOpt(conf,
a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession)
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
Expand All @@ -184,7 +196,9 @@ class Spark310Shims extends Spark301Shims {
a.options,
a.partitionFilters,
a.dataFilters,
conf)
conf,
canUseSmallFileOpt)
}
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
@@ -223,4 +237,16 @@
override def getShuffleManagerShims(): ShuffleManagerShimBase = {
new ShuffleManagerShim
}

override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec,
supportsSmallFileOpt: Boolean): GpuBatchScanExec = {
val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan]
val scanCopy = scan.copy(supportsSmallFileOpt = supportsSmallFileOpt)
batchScanExec.copy(scan = scanCopy)
}

override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec,
supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = {
scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt)
}
}
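The copyParquetBatchScanExec and copyFileSourceScanExec shims let the planner rebuild an already-converted scan with the optimization switched off, which the commit history ties to plans that use input-file expressions such as input_file_name(). A hedged sketch of how a caller might use them; the hasInputFileExpression flag and the surrounding helper are assumptions, and only the copy method signatures come from the diff:

```scala
// Sketch only: assumes the plugin's SparkShims and GpuFileSourceScanExec types
// are in scope. The decision input (hasInputFileExpression) is hypothetical.
object DisableSmallFilesSketch {
  def disableIfInputFileUsed(
      shims: SparkShims,
      scanExec: GpuFileSourceScanExec,
      hasInputFileExpression: Boolean): GpuFileSourceScanExec = {
    if (hasInputFileExpression) {
      // Combining files would break per-row file attribution, so rebuild the
      // scan with the small-file optimization disabled.
      shims.copyFileSourceScanExec(scanExec, supportsSmallFileOpt = false)
    } else {
      scanExec
    }
  }
}
```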
9 changes: 9 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -40,6 +40,15 @@ trait Arm {
}
}

/** Executes the provided code block and then closes the array of resources */
def withResource[T <: AutoCloseable, V](r: Array[T])(block: Array[T] => V): V = {
try {
block(r)
} finally {
r.safeClose()
}
}

/** Executes the provided code block, closing the resource only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: T)(block: T => V): V = {
try {
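The new Array overload of withResource runs a block over a whole array of resources and then closes every element via the plugin's safeClose helper. A standalone usage sketch of the same pattern with plain AutoCloseable streams, independent of the Arm trait (the simplified close loop here does not reproduce safeClose's exception suppression):

```scala
import java.io.{ByteArrayInputStream, InputStream}

object WithResourceArrayExample {
  // Run a block over an array of resources, then close each one even if the
  // block throws. Mirrors the Array overload added to Arm above.
  def withResource[T <: AutoCloseable, V](resources: Array[T])(block: Array[T] => V): V = {
    try {
      block(resources)
    } finally {
      resources.foreach(_.close())
    }
  }

  def main(args: Array[String]): Unit = {
    val streams: Array[InputStream] = Array(
      new ByteArrayInputStream("small".getBytes("UTF-8")),
      new ByteArrayInputStream("files".getBytes("UTF-8")))
    val totalBytes = withResource(streams)(_.map(_.available()).sum)
    println(s"total bytes readable: $totalBytes")
  }
}
```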