Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce hybrid (CPU) scan for Parquet read [databricks] #11720

Merged
merged 34 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
65de010
Merge C2C code to main
Nov 12, 2024
e6cede2
Update the dependencies in pom.xml
Nov 13, 2024
1e4fc13
revert BD velox hdfs code
Nov 15, 2024
46e19df
fit codes into the new HybridScan hierarchy
sperlingxx Nov 18, 2024
4f2a4d6
refine QueryPlan, RapidsMeta and test suites for HybridScan
sperlingxx Nov 20, 2024
4d52f90
Integrate Hybrid plugin; update IT
Nov 25, 2024
c82eb29
Make Hybrid jar provoided scope; Update shim to only applicable for S…
Dec 4, 2024
65b585a
Fix comments
Dec 4, 2024
d214739
Code comment update, a minor change
Dec 5, 2024
e0f1e3b
Fix shim logic
Dec 5, 2024
6331ab8
Fix shims: build for all shims, but report error when Spark is CDH or…
Dec 6, 2024
b1b8481
Remove useless shim code
Dec 9, 2024
dbae63f
IT: add tests for decimal types
Dec 9, 2024
5e972d6
Add checks for Java/Scala version, only supports Java 1.8 and Scala 2.12
Dec 9, 2024
c6fa249
Check datasource is v1
Dec 10, 2024
e95e7cc
Update test case: skip if runtime Spark is Databricks
Dec 11, 2024
092dab8
Update Hybrid Config doc: not all Spark versions are fully tested, on…
Dec 11, 2024
519f33c
Merge branch 'branch-25.02' into merge-c2c
Dec 11, 2024
b3b6f80
some refinement
sperlingxx Dec 13, 2024
f0921a4
fix tests && unsupported types
sperlingxx Dec 17, 2024
dd5d8f9
Add doc for Hybrid execution feature
Dec 23, 2024
6149589
Update doc
Dec 24, 2024
114b93a
Check Hybrid jar in executor
Dec 24, 2024
36d3cdf
Update hybrid-execution.md
winningsix Dec 25, 2024
5bfd763
Remove check for Java versions
Dec 25, 2024
275fa3d
Fix scala 2.13 check failure
Dec 25, 2024
cbb5609
Fix for Scala 2.13 building
Dec 30, 2024
c1df7c4
Fix: specify default value for loadBackend to avoid exception
Jan 13, 2025
99602c5
Update Copyright to add new year 2025
Jan 13, 2025
3d3b172
Fix Databricks building
Jan 14, 2025
d718d8c
Minor format change
Jan 14, 2025
4e79c2b
Merge branch 'branch-25.02' into merge-c2c
Jan 14, 2025
aae45b9
Add shim 354 for Hybrid feature
Jan 14, 2025
4fd5fdb
Fix Databricks building
Jan 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions docs/dev/hybrid-execution.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
---
layout: page
title: The Hybrid(on CPU) execution
nav_order: 14
parent: Developer Overview
---

# The Hybrid(CPU/GPU) execution
Note: this is an experimental feature currently.

## Overview
The Hybrid execution provides a way to offload Parquet scan onto CPU by leveraging Gluten/Velox.

## Configuration
To enable Hybrid Execution, please set the following configurations:
```
"spark.sql.sources.useV1SourceList": "parquet"
"spark.rapids.sql.parquet.useHybridReader": "true"
"spark.rapids.sql.hybrid.loadBackend": "true"
```

## Build
### Build Gluten bundle and third party jars.
Hybrid execution targets Gluten v1.2.0 code tag.
For the Gluten building, please refer to [link](https://github.com/apache/incubator-gluten).
Start the docker Gluten project provided, then execute the following
```bash
git clone https://github.com/apache/incubator-gluten.git
git checkout v1.2.0
# Cherry pick a fix from main branch: Fix ObjectStore::stores initialized twice issue
git cherry-pick 2a6a974d6fbaa38869eb9a0b91b2e796a578884c
./dev/package.sh
```
Note: Should cherry-pick a fix as shown in the above steps.
In the $Gluten_ROOT/package/target, you can get the bundle third_party jars.

### Download Rapids Hybrid jar from Maven repo
```xml
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
```

## How to use
Decide the Spark version. Set the configurations as described in the above section.
Prepare the Gluten bundle and third party jars for the Spark version as described
in the above section. Get the Rapids Hybrid jar. Put the jars(Gluten two jars and
the Rapids hybrid jar) in the classpath by specifying:
`--jars=<gluten-bundle-jar>,<gluten-thirdparty-jar>,<rapids-hybrid-jar>`

## Limitations
- Only supports V1 Parquet data source.
- Only supports Scala 2.12, do not support Scala 2.13.
- Support Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1 like [Gluten supports](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0),
other Spark versions 32x, 33x, 34x, 35x also work, but are not fully tested.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we add a few comments about what cases this appears to be better than the current parquet scan so that customers can know if it is worth the effort to try this out?

Do we need/want to mention some of the limitations with different data types? And are there any gluten specific configs that they need to set to make this work for them?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

18 changes: 17 additions & 1 deletion integration_tests/run_pyspark_from_build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
# Copyright (c) 2020-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -364,6 +364,22 @@ EOF
fi
export PYSP_TEST_spark_rapids_memory_gpu_allocSize=${PYSP_TEST_spark_rapids_memory_gpu_allocSize:-'1536m'}

# Turns on $LOAD_HYBRID_BACKEND and setup the filepath of hybrid backend jars, to activate the
# hybrid backend while running subsequent integration tests.
if [[ "$LOAD_HYBRID_BACKEND" -eq 1 ]]; then
if [ -z "${HYBRID_BACKEND_JARS}" ]; then
echo "Error: Environment HYBRID_BACKEND_JARS is not set."
exit 1
fi
export PYSP_TEST_spark_jars="${PYSP_TEST_spark_jars},${HYBRID_BACKEND_JARS//:/,}"
export PYSP_TEST_spark_rapids_sql_parquet_useHybridReader=true
export PYSP_TEST_spark_rapids_sql_hybrid_loadBackend=true
export PYSP_TEST_spark_memory_offHeap_enabled=true
export PYSP_TEST_spark_memory_offHeap_size=512M
export PYSP_TEST_spark_rapids_sql_hybrid_load=true
export PYSP_TEST_spark_gluten_loadLibFromJar=true
fi

SPARK_SHELL_SMOKE_TEST="${SPARK_SHELL_SMOKE_TEST:-0}"
if [[ "${SPARK_SHELL_SMOKE_TEST}" != "0" ]]; then
echo "Running spark-shell smoke test..."
Expand Down
147 changes: 147 additions & 0 deletions integration_tests/src/main/python/hybrid_parquet_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

from asserts import *
from data_gen import *
from marks import *
from parquet_test import rebase_write_corrected_conf
from spark_session import *

"""
Hybrid Scan unsupported types:
1. Decimal with negative scale is NOT supported
2. Decimal128 inside nested types is NOT supported
3. BinaryType is NOT supported
4. MapType wrapped by NestedType (Struct of Map/Array of Map/Map of Map) is NOT fully supported
"""
parquet_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen,
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), ArrayGen(byte_gen),
ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
ArrayGen(decimal_gen_64bit),
ArrayGen(ArrayGen(byte_gen)),
StructGen([['child0', ArrayGen(byte_gen)],
['child1', byte_gen],
['child2', float_gen],
['child3', decimal_gen_64bit]]),
ArrayGen(StructGen([['child0', string_gen],
['child1', double_gen],
['child2', int_gen]]))
],
[MapGen(f(nullable=False), f()) for f in [
BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen,
lambda nullable=True: TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc), nullable=nullable)]
],
[simple_string_to_string_map_gen,
MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10),
],
decimal_gens,
]

parquet_gens_fallback_lists = [
# Decimal128 inside nested types is NOT supported
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), decimal_gen_128bit)],
# BinaryType is NOT supported
[BinaryGen()],
# MapType wrapped by NestedType is NOT fully supported
[MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)],
[ArrayGen(simple_string_to_string_map_gen)],
[ArrayGen(ArrayGen(simple_string_to_string_map_gen))],
[ArrayGen(StructGen([["c0", simple_string_to_string_map_gen]]))],
[StructGen([["c0", simple_string_to_string_map_gen]])],
[StructGen([["c0", ArrayGen(simple_string_to_string_map_gen)]])],
[StructGen([["c0", StructGen([["cc0", simple_string_to_string_map_gen]])]])],
]


@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('gen_rows', [20, 100, 512, 1024, 4096], ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_round_trip(spark_tmp_path, parquet_gens, gen_rows):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
conf=rebase_write_corrected_conf)

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path),
conf={
'spark.sql.sources.useV1SourceList': 'parquet',
'spark.rapids.sql.parquet.useHybridReader': 'true',
})


# Creating scenarios in which CoalesceConverter will coalesce several input batches by adjusting
# reader_batch_size and coalesced_batch_size, tests if the CoalesceConverter functions correctly
# when coalescing is needed.
@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('reader_batch_size', [512, 1024, 2048], ids=idfn)
@pytest.mark.parametrize('coalesced_batch_size', [1 << 25, 1 << 27], ids=idfn)
@pytest.mark.parametrize('gen_rows', [8192, 10000], ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_round_trip_multiple_batches(spark_tmp_path,
reader_batch_size,
coalesced_batch_size,
gen_rows):
gens = []
for g in parquet_gens_list:
gens.extend(g)

gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
conf=rebase_write_corrected_conf)

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path),
conf={
'spark.sql.sources.useV1SourceList': 'parquet',
'spark.rapids.sql.parquet.useHybridReader': 'true',
'spark.gluten.sql.columnar.maxBatchSize': reader_batch_size,
'spark.rapids.sql.batchSizeBytes': coalesced_batch_size,
})


# HybridScan shall NOT be enabled over unsupported data types. Instead, fallbacks to GpuScan.
@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', parquet_gens_fallback_lists, ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_fallback_to_gpu(spark_tmp_path, parquet_gens):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark: gen_df(spark, gen_list, length=512).write.parquet(data_path),
conf=rebase_write_corrected_conf)

assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: spark.read.parquet(data_path),
exist_classes='GpuFileSourceScanExec',
non_exist_classes='HybridFileSourceScanExec',
conf={
'spark.sql.sources.useV1SourceList': 'parquet',
'spark.rapids.sql.parquet.useHybridReader': 'true',
})
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/marks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
# Copyright (c) 2020-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,3 +35,4 @@
pyarrow_test = pytest.mark.pyarrow_test
datagen_overrides = pytest.mark.datagen_overrides
tz_sensitive_test = pytest.mark.tz_sensitive_test
hybrid_test = pytest.mark.hybrid_test
5 changes: 4 additions & 1 deletion integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
# Copyright (c) 2020-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -328,3 +328,6 @@ def is_hive_available():
if is_at_least_precommit_run():
return True
return _spark.conf.get("spark.sql.catalogImplementation") == "hive"

def is_hybrid_backend_loaded():
return _spark.conf.get("spark.rapids.sql.hybrid.loadBackend", "false") == "true"
7 changes: 7 additions & 0 deletions scala2.13/sql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<!-- #if scala-2.13 -->
<profiles>
Expand Down
7 changes: 7 additions & 0 deletions sql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
revans2 marked this conversation as resolved.
Show resolved Hide resolved
<version>${project.version}</version>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using the rapids plugin version, rapids-4-spark-hybrid dependency should have a separate version definition

I'll follow it up.

<scope>provided</scope>
</dependency>
</dependencies>
<!-- #if scala-2.13 --><!--
<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

object HybridExecutionUtils {

private val HYBRID_JAR_PLUGIN_CLASS_NAME = "com.nvidia.spark.rapids.hybrid.HybridPluginWrapper"

/**
* Check if the Hybrid jar is in the classpath,
* report error if not
*/
def checkHybridJarInClassPath(): Unit = {
try {
Class.forName(HYBRID_JAR_PLUGIN_CLASS_NAME)
} catch {
case e: ClassNotFoundException => throw new RuntimeException(
"Hybrid jar is not in the classpath, Please add Hybrid jar into the class path, or " +
"Please disable Hybrid feature by setting " +
"spark.rapids.sql.parquet.useHybridReader=false", e)
}
}
}
26 changes: 22 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -343,16 +343,29 @@ object RapidsPluginUtils extends Logging {
}
}

/**
* Find spark-rapids-extra-plugins files, and create plugin instances by reflection.
* Note: If Hybrid jar is not in the classpath, then will not create Hybrid plugin.
* @return plugin instances defined in spark-rapids-extra-plugins files.
*/
private def getExtraPlugins: Seq[SparkPlugin] = {
val resourceName = "spark-rapids-extra-plugins"
val classLoader = RapidsPluginUtils.getClass.getClassLoader
val resource = classLoader.getResourceAsStream(resourceName)
if (resource == null) {
val resourceUrls = classLoader.getResources(resourceName)
val resourceUrlArray = resourceUrls.asScala.toArray

if (resourceUrlArray.isEmpty) {
logDebug(s"Could not find file $resourceName in the classpath, not loading extra plugins")
Seq.empty
} else {
val pluginClasses = scala.io.Source.fromInputStream(resource).getLines().toSeq
loadExtensions(classOf[SparkPlugin], pluginClasses)
val plugins = scala.collection.mutable.ListBuffer[SparkPlugin]()
for (resourceUrl <- resourceUrlArray) {
val source = scala.io.Source.fromURL(resourceUrl)
val pluginClasses = source.getLines().toList
source.close()
plugins ++= loadExtensions(classOf[SparkPlugin], pluginClasses)
}
plugins.toSeq
}
}

Expand Down Expand Up @@ -514,6 +527,11 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
// Fail if there are multiple plugin jars in the classpath.
RapidsPluginUtils.detectMultipleJars(conf)

// Check Hybrid jar if needed.
if (conf.useHybridParquetReader) {
HybridExecutionUtils.checkHybridJarInClassPath()
}

// Compare if the cudf version mentioned in the classpath is equal to the version which
// plugin expects. If there is a version mismatch, throw error. This check can be disabled
// by setting this config spark.rapids.cudfVersionOverride=true
Expand Down
Loading
Loading