Async write support for ORC (#11865)
* Async write support for ORC writer

Signed-off-by: Jihoon Son <[email protected]>

* doc change

* remove unnecessary coalesce in the tests

* revert unrelated change

* sort results

---------

Signed-off-by: Jihoon Son <[email protected]>
jihoonson authored Dec 24, 2024
1 parent f0c35ff commit 0f702cd
Showing 4 changed files with 38 additions and 10 deletions.
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/orc_write_test.py
@@ -360,6 +360,23 @@ def create_empty_df(spark, path):
         conf={'spark.rapids.sql.format.orc.write.enabled': True})


+hold_gpu_configs = [True, False]
+@pytest.mark.parametrize('hold_gpu', hold_gpu_configs, ids=idfn)
+def test_async_writer(spark_tmp_path, hold_gpu):
+    data_path = spark_tmp_path + '/ORC_DATA'
+    num_rows = 2048
+    num_cols = 10
+    orc_gen = [int_gen for _ in range(num_cols)]
+    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gen)]
+    assert_gpu_and_cpu_writes_are_equal_collect(
+        lambda spark, path: gen_df(spark, gen_list, length=num_rows).write.orc(path),
+        lambda spark, path: spark.read.orc(path).orderBy([('_c' + str(i)) for i in range(num_cols)]),
+        data_path,
+        conf={"spark.rapids.sql.asyncWrite.queryOutput.enabled": "true",
+              "spark.rapids.sql.batchSizeBytes": 4 * num_cols * 100, # 100 rows per batch
+              "spark.rapids.sql.queryOutput.holdGpuInTask": hold_gpu})
+
+
 @ignore_order
 @pytest.mark.skipif(is_before_spark_320(), reason="is only supported in Spark 320+")
 def test_concurrent_writer(spark_tmp_path):
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/parquet_write_test.py
@@ -705,8 +705,8 @@ def test_async_writer(spark_tmp_path, hold_gpu):
     parquet_gen = [int_gen for _ in range(num_cols)]
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gen)]
     assert_gpu_and_cpu_writes_are_equal_collect(
-        lambda spark, path: gen_df(spark, gen_list, length=num_rows).coalesce(1).write.parquet(path),
-        lambda spark, path: spark.read.parquet(path),
+        lambda spark, path: gen_df(spark, gen_list, length=num_rows).write.parquet(path),
+        lambda spark, path: spark.read.parquet(path).orderBy([('_c' + str(i)) for i in range(num_cols)]),
         data_path,
         copy_and_update(
             writer_confs,
@@ -2462,7 +2462,7 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
    .doc("Option to turn on the async query output write. During the final output write, the " +
      "task first copies the output to the host memory, and then writes it into the storage. " +
      "When this option is enabled, the task will asynchronously write the output in the host " +
-     "memory to the storage. Only the Parquet format is supported currently.")
+     "memory to the storage. Only the Parquet and ORC formats are supported currently.")
    .internal()
    .booleanConf
    .createWithDefault(false)
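
The flag documented above is the same internal option exercised by the new ORC test. A minimal PySpark sketch of turning it on for a session is shown below; the app name and output path are illustrative assumptions, while the conf keys are the ones this change touches and com.nvidia.spark.SQLPlugin is the standard spark-rapids plugin class:

from pyspark.sql import SparkSession

# Assumes a cluster that already has the spark-rapids jar on the classpath.
spark = (SparkSession.builder
         .appName("async-orc-write-demo")
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .config("spark.rapids.sql.format.orc.write.enabled", "true")
         # Internal flag documented above: enable async query output write.
         .config("spark.rapids.sql.asyncWrite.queryOutput.enabled", "true")
         # Optional: whether the task keeps the GPU while the background write runs;
         # when unset it defaults to the async flag (see GpuOrcFileFormat below).
         .config("spark.rapids.sql.queryOutput.holdGpuInTask", "false")
         .getOrCreate())

# Any ORC write in this session now goes through the async output path.
spark.range(1000000).selectExpr("id", "id % 10 AS bucket").write.orc("/tmp/orc_async_demo")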
@@ -169,7 +169,8 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
       options: Map[String, String],
       dataSchema: StructType): ColumnarOutputWriterFactory = {

-    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+    val sqlConf = sparkSession.sessionState.conf
+    val orcOptions = new OrcOptions(options, sqlConf)

     val conf = job.getConfiguration

@@ -180,12 +181,18 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
     conf.asInstanceOf[JobConf]
       .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

+    val asyncOutputWriteEnabled = RapidsConf.ENABLE_ASYNC_OUTPUT_WRITE.get(sqlConf)
+    // holdGpuBetweenBatches is on by default if asyncOutputWriteEnabled is on
+    val holdGpuBetweenBatches = RapidsConf.ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK.get(sqlConf)
+      .getOrElse(asyncOutputWriteEnabled)
+
     new ColumnarOutputWriterFactory {
       override def newInstance(path: String,
                                dataSchema: StructType,
                                context: TaskAttemptContext,
                                debugOutputPath: Option[String]): ColumnarOutputWriter = {
-        new GpuOrcWriter(path, dataSchema, context, debugOutputPath)
+        new GpuOrcWriter(path, dataSchema, context, debugOutputPath, holdGpuBetweenBatches,
+          asyncOutputWriteEnabled)
       }

       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -204,11 +211,15 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
     }
   }

-class GpuOrcWriter(override val path: String,
-                   dataSchema: StructType,
-                   context: TaskAttemptContext,
-                   debugOutputPath: Option[String])
-  extends ColumnarOutputWriter(context, dataSchema, "ORC", true, debugOutputPath) {
+class GpuOrcWriter(
+    override val path: String,
+    dataSchema: StructType,
+    context: TaskAttemptContext,
+    debugOutputPath: Option[String],
+    holdGpuBetweenBatches: Boolean,
+    useAsyncWrite: Boolean)
+  extends ColumnarOutputWriter(context, dataSchema, "ORC", true, debugOutputPath,
+    holdGpuBetweenBatches, useAsyncWrite) {

   override val tableWriter: TableWriter = {
     val builder = SchemaUtils
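
The new constructor parameters simply forward the two settings to ColumnarOutputWriter, which owns the actual write path. As a purely conceptual sketch of the hand-off the config doc above describes — the task thread copies a batch to host memory, and a background thread performs the storage write — the following self-contained Python model is illustrative only; the class and function names are made up, and it is not the plugin's implementation:

import queue
import threading

class AsyncBatchWriter:
    """Toy model: the task thread enqueues host-memory buffers; a single
    background thread drains the queue and does the slow storage write."""

    def __init__(self, write_to_storage, max_pending=8):
        self._write = write_to_storage                     # e.g. appends bytes to a file
        self._pending = queue.Queue(maxsize=max_pending)   # bounds buffered host memory
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self):
        while True:
            buf = self._pending.get()
            if buf is None:          # sentinel -> stop the worker
                return
            self._write(buf)

    def write(self, host_buffer):
        # Returns as soon as the buffer is queued, so the task can produce the
        # next batch (or release the GPU, per the holdGpuInTask setting).
        self._pending.put(host_buffer)

    def close(self):
        self._pending.put(None)
        self._worker.join()

When the queue is full, write() blocks, which is the back-pressure that keeps host memory bounded; a synchronous write path corresponds to calling write_to_storage directly on the task thread instead.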
