Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support running Pandas UDFs on GPUs in Python processes. #640

Merged
merged 72 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from 69 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
4063857
Support Pandas UDF on GPU
firestarman Jul 21, 2020
94f3b22
Fix an error when running rapids.worker.
firestarman Jul 21, 2020
96e35aa
Pack python files
firestarman Jul 22, 2020
dccf977
Add API to init GPU context in python process
firestarman Jul 27, 2020
ef36d5e
Support limiting the number of python workers
firestarman Jul 31, 2020
93966b4
Support memory limitaion for Python processes
firestarman Aug 4, 2020
3b2e527
Imporve the memory computation for Python workers
firestarman Aug 5, 2020
1e4895a
Support setting max size of RMM pool
firestarman Aug 6, 2020
c877506
Support more types of Pandas UDF
firestarman Aug 11, 2020
d0d15c2
Use maxsize for max pool size when not specified.
firestarman Aug 18, 2020
61b3589
Support two more types of Pandas UDF
firestarman Aug 27, 2020
65e497d
Add tests for udfs and basic support for accelerated arrow exchange with
revans2 Aug 12, 2020
ec2cec6
Support Pandas UDF on GPU
firestarman Jul 21, 2020
d18ff4e
Fix an error when running rapids.worker.
firestarman Jul 21, 2020
616d1da
Pack python files
firestarman Jul 22, 2020
c5557ca
Add API to init GPU context in python process
firestarman Jul 27, 2020
2fcc2aa
Support limiting the number of python workers
firestarman Jul 31, 2020
31a9fb3
Support memory limitaion for Python processes
firestarman Aug 4, 2020
f71f4de
Imporve the memory computation for Python workers
firestarman Aug 5, 2020
d4791af
Support setting max size of RMM pool
firestarman Aug 6, 2020
4b88939
Support more types of Pandas UDF
firestarman Aug 11, 2020
d5d156e
Use maxsize for max pool size when not specified.
firestarman Aug 18, 2020
37dc856
Support two more types of Pandas UDF
firestarman Aug 27, 2020
29e39ea
Use the columnar version rule for Scalar Pandas UDF
firestarman Sep 1, 2020
1033f23
Updates the RapidsMeta of plans for Pandas UDF
firestarman Sep 2, 2020
5e28772
Remove the unnecessary env variable
firestarman Sep 2, 2020
3f94ac8
Correct some doc styles to pass mvn verification
firestarman Sep 3, 2020
6f24ca2
add udf test
shotai Sep 3, 2020
5ea3ab6
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 3, 2020
69d5b54
Support process pool for Python workers
firestarman Sep 3, 2020
9b25c2e
merge udftest
shotai Sep 3, 2020
c8ab681
add cudf test
shotai Sep 3, 2020
b81fe94
Add a config to disable/enable Pandas UDF on GPU.
firestarman Sep 4, 2020
908bb93
add more test case with cudf
shotai Sep 4, 2020
963c821
refactor udf test
shotai Sep 4, 2020
868ca3a
Python: Not init GPU if no cuda device specified
firestarman Sep 4, 2020
92c10a6
resolve conflict
shotai Sep 4, 2020
e05e4b6
resolve conflict
shotai Sep 4, 2020
69b1ec0
Update the config doc
firestarman Sep 7, 2020
1f56cd9
skip udf in premerge
shotai Sep 7, 2020
e400f5a
add pyarrow in docker
shotai Sep 7, 2020
6435eaa
disable udf test in premerge
shotai Sep 7, 2020
c33b41c
Merge pull request #4 from firestarman/pandas-test-mg
firestarman Sep 8, 2020
53959db
Merge branch 'branch-0.2' into pandas-udf-col
firestarman Sep 8, 2020
732505b
Move gpu init to `try...catch`
firestarman Sep 8, 2020
5a074ac
Remove numpy, it will include in pandas installation. Update readme.
shotai Sep 8, 2020
b80a201
update doc with pandas udf support
shotai Sep 8, 2020
7514ac1
update integration dockerfile
shotai Sep 8, 2020
db06504
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 8, 2020
7817e5e
Update getting-started-on-prem.md
shotai Sep 8, 2020
0f72025
Update getting-started-on-prem.md
shotai Sep 8, 2020
35a3008
Update getting-started-on-prem.md
shotai Sep 8, 2020
ec7848e
Update getting-started-on-prem.md
shotai Sep 8, 2020
be65f58
Add warning log when python worker reuse enabled
firestarman Sep 8, 2020
1757afa
Replace GpuSemaphore with PythonWorkerSemaphore
firestarman Sep 8, 2020
537e594
Remove the warning log for python worker reuse enabled
firestarman Sep 9, 2020
60cf951
remove udf marker, add comment, update jenkins script for udf_cudf test
shotai Sep 9, 2020
6c9e86b
update doc in pandas udf section
shotai Sep 9, 2020
58fab52
update dockerfile for integration test
shotai Sep 9, 2020
283b6a2
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 9, 2020
eee4d05
Update the name of conf for python gpu enabled.
firestarman Sep 10, 2020
8924ad8
add marker for cudf udf test
shotai Sep 10, 2020
7eba830
update comment in test start script
shotai Sep 10, 2020
f838ae0
remove old config
shotai Sep 10, 2020
803fcf4
Not init gpu memory when python on gpu is disabled
firestarman Sep 10, 2020
984082b
remove old config
shotai Sep 10, 2020
127ab08
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 10, 2020
6156298
import cudf lib normally
shotai Sep 10, 2020
beabf8b
update import cudf
shotai Sep 10, 2020
47ffc98
Check python module conf only when python gpu enabeld
firestarman Sep 10, 2020
b1c9be5
update dynamic config for udf enable
shotai Sep 10, 2020
9860ee6
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Name | Description | Default Value
<a name="memory.gpu.reserve"></a>spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels, kernel launches or JIT compilation.|1073741824
<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824
<a name="memory.pinnedPool.size"></a>spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0
<a name="python.concurrentPythonWorkers"></a>spark.rapids.python.concurrentPythonWorkers|Set the number of Python worker processes that can execute concurrently per GPU. Python worker processes may temporarily block when the number of concurrent Python worker processes started by the same executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors. >0 means enabled, while <=0 means unlimited|0
<a name="python.memory.gpu.allocFraction"></a>spark.rapids.python.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.allocFraction)), since the executor will share the GPU with its owning Python workers. Half of the rest will be used if not specified|None
<a name="python.memory.gpu.maxAllocFraction"></a>spark.rapids.python.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.maxAllocFraction)), since the executor will share the GPU with its owning Python workers. when setting to 0 it means no limit.|0.0
<a name="python.memory.gpu.pooling.enabled"></a>spark.rapids.python.memory.gpu.pooling.enabled|Should RMM in Python workers act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. When not specified, It will honor the value of config 'spark.rapids.memory.gpu.pooling.enabled'|None
<a name="shuffle.transport.enabled"></a>spark.rapids.shuffle.transport.enabled|When set to true, enable the Rapids Shuffle Transport for accelerated shuffle.|false
<a name="shuffle.transport.maxReceiveInflightBytes"></a>spark.rapids.shuffle.transport.maxReceiveInflightBytes|Maximum aggregate amount of bytes that be fetched at any given time from peers during shuffle|1073741824
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null
Expand Down Expand Up @@ -63,6 +67,7 @@ Name | Description | Default Value
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
<a name="sql.improvedTimeOps.enabled"></a>spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
<a name="sql.replaceSortMergeJoin.enabled"></a>spark.rapids.sql.replaceSortMergeJoin.enabled|Allow replacing sortMergeJoin with HashJoin|true
Expand Down Expand Up @@ -167,6 +172,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Or"></a>spark.rapids.sql.expression.Or|`or`|Logical OR|true|None|
<a name="sql.expression.Pmod"></a>spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None|
<a name="sql.expression.Pow"></a>spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None|
<a name="sql.expression.PythonUDF"></a>spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None|
<a name="sql.expression.Quarter"></a>spark.rapids.sql.expression.Quarter|`quarter`|Returns the quarter of the year for date, in the range 1 to 4|true|None|
<a name="sql.expression.Rand"></a>spark.rapids.sql.expression.Rand|`random`, `rand`|Generate a random column with i.i.d. uniformly distributed values in [0, 1)|true|None|
<a name="sql.expression.RegExpReplace"></a>spark.rapids.sql.expression.RegExpReplace|`regexp_replace`|RegExpReplace support for string literal input patterns|true|None|
Expand Down Expand Up @@ -250,6 +256,12 @@ Name | Description | Default Value | Notes
<a name="sql.exec.CartesianProductExec"></a>spark.rapids.sql.exec.CartesianProductExec|Implementation of join using brute force|false|This is disabled by default because large joins can cause out of memory errors|
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF, also accelerates the data transfer between the Java process and Python process|false|This is disabled by default because Performance is not ideal for UDFs that take a long time|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowInPandasExec"></a>spark.rapids.sql.exec.WindowInPandasExec|The backend for Pandas UDF with window functions, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowExec"></a>spark.rapids.sql.exec.WindowExec|Window-operator backend|true|None|

### Scans
Expand Down
65 changes: 65 additions & 0 deletions docs/get-started/getting-started-on-prem.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,71 @@ This setting controls the amount of host memory (RAM) that can be utilized to sp
the GPU is out of memory, before going to disk. Please verify the [defaults](../configs.md).
- `spark.rapids.memory.host.spillStorageSize`

## GPU Scheduling For Pandas UDF
---
**NOTE**

The _GPU Scheduling for Pandas UDF_ is an experimental feature, and may change at any point it time.

---

_GPU Scheduling for Pandas UDF_ is built on Apache Spark's [Pandas UDF(user defined function)](https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs), and has two components:

- **Share GPU with JVM**: Let the Python process share JVM GPU. The Python process could run on the same GPU with JVM.

- **Increase Speed**: Make the data transport faster between JVM process and Python process.



To enable _GPU Scheduling for Pandas UDF_, you need to configure your spark job with extra settings.

1. Make sure GPU exclusive mode is disabled. Note that this will not work if you are using exclusive mode to assign GPUs under spark.
2. Currently the python files are packed into the spark rapids plugin jar.

On Yarn, you need to add
```shell
...
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
```


On Standalone, you need to add
```shell
...
--conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar \
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
```

3. Enable GPU Scheduling for Pandas UDF.

```shell
...
--conf spark.rapids.python.gpu.enabled=true \
--conf spark.rapids.python.memory.gpu.pooling.enabled=false \
--conf spark.rapids.sql.exec.ArrowEvalPythonExec=true \
--conf spark.rapids.sql.exec.MapInPandasExec=true \
--conf spark.rapids.sql.exec.FlatMapGroupsInPandasExec=true \
--conf spark.rapids.sql.exec.AggregateInPandasExec=true \
--conf spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec=true \
--conf spark.rapids.sql.exec.WindowInPandasExec=true
```

Please note the data transfer acceleration only supports scalar UDF and Scalar iterator UDF currently.
You could choose the exec you need to enable.

### Other Configuration

Following configuration settings are also for _GPU Scheduling for Pandas UDF_
```
spark.rapids.python.concurrentPythonWorkers
spark.rapids.python.memory.gpu.allocFraction
spark.rapids.python.memory.gpu.maxAllocFraction
```

To find details on the above Python configuration settings, please see the [RAPIDS Accelerator for Apache Spark Configuration Guide](../configs.md).



## Advanced Configuration

See the [RAPIDS Accelerator for Apache Spark Configuration Guide](../configs.md) for details on all
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ Should be enough to get the basics started.

`sre_yield` provides a set of APIs to generate string data from a regular expression.

### pandas
`pip install pandas`

`pandas` is a fast, powerful, flexible and easy to use open source data analysis and manipulation tool.

### pyarrow
`pip install pyarrow`

`pyarrow` provides a Python API for functionality provided by the Arrow C++ libraries, along with tools for Arrow integration and interoperability with pandas, NumPy, and other software in the Python ecosystem.

## Running

Running the tests follows the pytest conventions, the main difference is using
Expand Down
1 change: 1 addition & 0 deletions integration_tests/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ markers =
incompat: Enable incompat operators
limit(num_rows): Limit the number of rows that will be check in a result
qarun: Mark qa test
cudf_udf: Mark udf cudf test

1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
incompat = pytest.mark.incompat
limit = pytest.mark.limit
qarun = pytest.mark.qarun
cudf_udf = pytest.mark.cudf_udf
Loading