Skip to content

Commit

Permalink
240701 repartition agg (NVIDIA#41)
Browse files Browse the repository at this point in the history
* workable version without tests

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* doc

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix scala 2.13

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix compile

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* fix it

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* enable it

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* metric name

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* minor

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* change seed

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

---------

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
Co-authored-by: Hongbin Ma (Mahone) <[email protected]>
  • Loading branch information
wjxiz1992 and binmahone authored Jul 3, 2024
1 parent a39502f commit d347cbb
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 128 deletions.
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.fallbackAlgorithm"></a>spark.rapids.sql.agg.fallbackAlgorithm|When agg cannot be done in a single pass, use sort-based fallback or repartition-based fallback.|sort|Runtime
<a name="sql.agg.skipAggPassReductionRatio"></a>spark.rapids.sql.agg.skipAggPassReductionRatio|In non-final aggregation stages, if the previous pass has a row reduction ratio greater than this value, the next aggregation pass will be skipped.Setting this to 1 essentially disables this feature.|1.0|Runtime
<a name="sql.allowMultipleJars"></a>spark.rapids.sql.allowMultipleJars|Allow multiple rapids-4-spark, spark-rapids-jni, and cudf jars on the classpath. Spark will take the first one it finds, so the version may not be expected. Possisble values are ALWAYS: allow all jars, SAME_REVISION: only allow jars with the same revision, NEVER: do not allow multiple jars at all.|SAME_REVISION|Startup
<a name="sql.castDecimalToFloat.enabled"></a>spark.rapids.sql.castDecimalToFloat.enabled|Casting from decimal to floating point types on the GPU returns results that have tiny difference compared to results returned from CPU.|true|Runtime
Expand Down
7 changes: 6 additions & 1 deletion integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
_float_conf_skipagg = copy_and_update(_float_smallbatch_conf,
{'spark.rapids.sql.agg.skipAggPassReductionRatio': '0'})

_float_conf_repartition_fallback = copy_and_update(_float_smallbatch_conf,
{'spark.rapids.sql.agg.fallbackAlgorithm': 'repartition'})

_float_conf_partial = copy_and_update(_float_conf,
{'spark.rapids.sql.hashAgg.replaceMode': 'partial'})

Expand Down Expand Up @@ -225,7 +228,9 @@ def get_params(init_list, marked_params=[]):


# Run these tests with in 5 modes, all on the GPU
_confs = [_float_conf, _float_smallbatch_conf, _float_conf_skipagg, _float_conf_final, _float_conf_partial]
_confs = [_float_conf, _float_smallbatch_conf,
_float_conf_skipagg, _float_conf_repartition_fallback,
_float_conf_final, _float_conf_partial]

# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
Expand Down
16 changes: 15 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.nvidia.spark.rapids

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.ControlThrowable

import com.nvidia.spark.rapids.RapidsPluginImplicits._
Expand Down Expand Up @@ -134,6 +134,20 @@ object Arm extends ArmScalaSpecificImpl {
}
}

/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: ListBuffer[T])(block: ListBuffer[T] => V): V = {
try {
block(r)
} catch {
case t: ControlThrowable =>
// Don't close for these cases..
throw t
case t: Throwable =>
r.safeClose(t)
throw t
}
}


/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: mutable.Queue[T])(block: mutable.Queue[T] => V): V = {
Expand Down
Loading

0 comments on commit d347cbb

Please sign in to comment.