Skip to content

Commit

Permalink
Support running Pandas UDFs on GPUs in Python processes. (#640)
Browse files Browse the repository at this point in the history
Add support to run Pandas UDFs on GPUs, mainly consisting of two things:

Overriding all the 6 related plans to build GPU context of device and memory for Python processes.
Introducing 2 new python modules rapids.worker and rapids.daemon to execute the GPU memory initialization by leveraging RMM Python APIs.

Signed-off-by: Firestarman <[email protected]>

Co-authored-by: Liangcai Li <[email protected]>
Co-authored-by: Robert (Bobby) Evans <[email protected]>
Co-authored-by: shotai <[email protected]>
  • Loading branch information
4 people authored Sep 11, 2020
1 parent 9582bc8 commit ade7a5f
Show file tree
Hide file tree
Showing 31 changed files with 3,143 additions and 14 deletions.
12 changes: 12 additions & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Name | Description | Default Value
<a name="memory.gpu.reserve"></a>spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels, kernel launches or JIT compilation.|1073741824
<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824
<a name="memory.pinnedPool.size"></a>spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0
<a name="python.concurrentPythonWorkers"></a>spark.rapids.python.concurrentPythonWorkers|Set the number of Python worker processes that can execute concurrently per GPU. Python worker processes may temporarily block when the number of concurrent Python worker processes started by the same executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors. >0 means enabled, while <=0 means unlimited|0
<a name="python.memory.gpu.allocFraction"></a>spark.rapids.python.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.allocFraction)), since the executor will share the GPU with its owning Python workers. Half of the rest will be used if not specified|None
<a name="python.memory.gpu.maxAllocFraction"></a>spark.rapids.python.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.maxAllocFraction)), since the executor will share the GPU with its owning Python workers. when setting to 0 it means no limit.|0.0
<a name="python.memory.gpu.pooling.enabled"></a>spark.rapids.python.memory.gpu.pooling.enabled|Should RMM in Python workers act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. When not specified, It will honor the value of config 'spark.rapids.memory.gpu.pooling.enabled'|None
<a name="shuffle.transport.enabled"></a>spark.rapids.shuffle.transport.enabled|When set to true, enable the Rapids Shuffle Transport for accelerated shuffle.|false
<a name="shuffle.transport.maxReceiveInflightBytes"></a>spark.rapids.shuffle.transport.maxReceiveInflightBytes|Maximum aggregate amount of bytes that be fetched at any given time from peers during shuffle|1073741824
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null
Expand Down Expand Up @@ -65,6 +69,7 @@ Name | Description | Default Value
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false
<a name="sql.improvedTimeOps.enabled"></a>spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false
<a name="sql.reader.batchSizeBytes"></a>spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647
<a name="sql.reader.batchSizeRows"></a>spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647
<a name="sql.replaceSortMergeJoin.enabled"></a>spark.rapids.sql.replaceSortMergeJoin.enabled|Allow replacing sortMergeJoin with HashJoin|true
Expand Down Expand Up @@ -169,6 +174,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Or"></a>spark.rapids.sql.expression.Or|`or`|Logical OR|true|None|
<a name="sql.expression.Pmod"></a>spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None|
<a name="sql.expression.Pow"></a>spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None|
<a name="sql.expression.PythonUDF"></a>spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None|
<a name="sql.expression.Quarter"></a>spark.rapids.sql.expression.Quarter|`quarter`|Returns the quarter of the year for date, in the range 1 to 4|true|None|
<a name="sql.expression.Rand"></a>spark.rapids.sql.expression.Rand|`random`, `rand`|Generate a random column with i.i.d. uniformly distributed values in [0, 1)|true|None|
<a name="sql.expression.RegExpReplace"></a>spark.rapids.sql.expression.RegExpReplace|`regexp_replace`|RegExpReplace support for string literal input patterns|true|None|
Expand Down Expand Up @@ -253,6 +259,12 @@ Name | Description | Default Value | Notes
<a name="sql.exec.CartesianProductExec"></a>spark.rapids.sql.exec.CartesianProductExec|Implementation of join using brute force|false|This is disabled by default because large joins can cause out of memory errors|
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF, also accelerates the data transfer between the Java process and Python process|false|This is disabled by default because Performance is not ideal for UDFs that take a long time|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowInPandasExec"></a>spark.rapids.sql.exec.WindowInPandasExec|The backend for Pandas UDF with window functions, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowExec"></a>spark.rapids.sql.exec.WindowExec|Window-operator backend|true|None|

### Scans
Expand Down
65 changes: 65 additions & 0 deletions docs/get-started/getting-started-on-prem.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,71 @@ This setting controls the amount of host memory (RAM) that can be utilized to sp
the GPU is out of memory, before going to disk. Please verify the [defaults](../configs.md).
- `spark.rapids.memory.host.spillStorageSize`

## GPU Scheduling For Pandas UDF
---
**NOTE**

The _GPU Scheduling for Pandas UDF_ is an experimental feature, and may change at any point it time.

---

_GPU Scheduling for Pandas UDF_ is built on Apache Spark's [Pandas UDF(user defined function)](https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs), and has two components:

- **Share GPU with JVM**: Let the Python process share JVM GPU. The Python process could run on the same GPU with JVM.

- **Increase Speed**: Make the data transport faster between JVM process and Python process.



To enable _GPU Scheduling for Pandas UDF_, you need to configure your spark job with extra settings.

1. Make sure GPU exclusive mode is disabled. Note that this will not work if you are using exclusive mode to assign GPUs under spark.
2. Currently the python files are packed into the spark rapids plugin jar.

On Yarn, you need to add
```shell
...
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
```


On Standalone, you need to add
```shell
...
--conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar \
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
```

3. Enable GPU Scheduling for Pandas UDF.

```shell
...
--conf spark.rapids.python.gpu.enabled=true \
--conf spark.rapids.python.memory.gpu.pooling.enabled=false \
--conf spark.rapids.sql.exec.ArrowEvalPythonExec=true \
--conf spark.rapids.sql.exec.MapInPandasExec=true \
--conf spark.rapids.sql.exec.FlatMapGroupsInPandasExec=true \
--conf spark.rapids.sql.exec.AggregateInPandasExec=true \
--conf spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec=true \
--conf spark.rapids.sql.exec.WindowInPandasExec=true
```

Please note the data transfer acceleration only supports scalar UDF and Scalar iterator UDF currently.
You could choose the exec you need to enable.

### Other Configuration

Following configuration settings are also for _GPU Scheduling for Pandas UDF_
```
spark.rapids.python.concurrentPythonWorkers
spark.rapids.python.memory.gpu.allocFraction
spark.rapids.python.memory.gpu.maxAllocFraction
```
To find details on the above Python configuration settings, please see the [RAPIDS Accelerator for Apache Spark Configuration Guide](../configs.md).
## Advanced Configuration
See the [RAPIDS Accelerator for Apache Spark Configuration Guide](../configs.md) for details on all
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ Should be enough to get the basics started.

`sre_yield` provides a set of APIs to generate string data from a regular expression.

### pandas
`pip install pandas`

`pandas` is a fast, powerful, flexible and easy to use open source data analysis and manipulation tool.

### pyarrow
`pip install pyarrow`

`pyarrow` provides a Python API for functionality provided by the Arrow C++ libraries, along with tools for Arrow integration and interoperability with pandas, NumPy, and other software in the Python ecosystem.

## Running

Running the tests follows the pytest conventions, the main difference is using
Expand Down
1 change: 1 addition & 0 deletions integration_tests/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ markers =
incompat: Enable incompat operators
limit(num_rows): Limit the number of rows that will be check in a result
qarun: Mark qa test
cudf_udf: Mark udf cudf test

1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
incompat = pytest.mark.incompat
limit = pytest.mark.limit
qarun = pytest.mark.qarun
cudf_udf = pytest.mark.cudf_udf
Loading

0 comments on commit ade7a5f

Please sign in to comment.