Skip to content

Commit

Permalink
chore: [comet-parquet-exec] enable native scan by default (again) (#1302
Browse files Browse the repository at this point in the history
)

* fix regression

* enable native scan by default

* experiment

* Revert "experiment"

This reverts commit e05a625.

* revert change to exception handling
  • Loading branch information
andygrove authored Jan 21, 2025
1 parent 22eb591 commit facda24
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
2 changes: 1 addition & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object CometConf extends ShimCometConf {
"that to enable native vectorized execution, both this config and " +
"'spark.comet.exec.enabled' need to be enabled.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val SCAN_NATIVE_COMET = "native_comet"
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Comet provides the following configuration settings.
| spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
| spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,13 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
partitionSchema,
file.partitionValues,
JavaConverters.mapAsJavaMap(metrics))
batchReader.init()
try {
batchReader.init()
} catch {
case e: Throwable =>
batchReader.close()
throw e
}
batchReader
} else {
val batchReader = new BatchReader(
Expand All @@ -167,7 +173,13 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
partitionSchema,
file.partitionValues,
JavaConverters.mapAsJavaMap(metrics))
batchReader.init()
try {
batchReader.init()
} catch {
case e: Throwable =>
batchReader.close()
throw e
}
batchReader
}
val iter = new RecordReaderIterator(recordBatchReader)
Expand Down

0 comments on commit facda24

Please sign in to comment.