Merge pull request NVIDIA#2148 from NVIDIA/branch-0.5
[auto-merge] branch-0.5 to branch-0.6 [skip ci] [bot]
nvauto authored Apr 15, 2021
2 parents dee1b64 + 357759b commit 98091fc
Showing 4 changed files with 33 additions and 12 deletions.
6 changes: 3 additions & 3 deletions docs/supported_ops.md
@@ -584,9 +584,9 @@ Accelerator supports are described below.
 <td>S</td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
-<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
+<td><em>PS* (Round-robin partitioning is not supported for arrays if spark.sql.execution.sortBeforeRepartition is true; missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
+<td><em>PS* (Round-robin partitioning is not supported for maps if spark.sql.execution.sortBeforeRepartition is true; missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
+<td><em>PS* (Round-robin partitioning is not supported for nested structs if spark.sql.execution.sortBeforeRepartition is true; missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
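All three new notes describe the same constraint. df.repartition(n) uses round-robin partitioning, and when spark.sql.execution.sortBeforeRepartition is true (the default) Spark sorts rows before redistributing them so that retried tasks rebuild the same partitions; the plugin cannot perform that sort for arrays, maps, or nested structs. A minimal spark-shell sketch of the affected shape (an illustration, not part of this commit; assumes the RAPIDS Accelerator is enabled and the column name is made up):

    // The case the new PS notes call out: round-robin repartition of an array
    // column while sortBeforeRepartition is true; the exchange is expected to
    // fall back to the CPU because the plugin cannot sort array columns.
    spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
    val df = spark.range(100).selectExpr("array(cast(id AS string)) AS arr")
    df.repartition(8).explain()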
15 changes: 10 additions & 5 deletions integration_tests/src/main/python/repart_test.py
@@ -63,14 +63,19 @@ def test_coalesce_df(num_parts, length):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : gen_df(spark, gen_list, length=length).coalesce(num_parts))
 
-@pytest.mark.parametrize('num_parts', [1, 10, 100, 1000, 2000], ids=idfn)
+@pytest.mark.parametrize('data_gen', [
+    pytest.param([('_c' + str(i), gen) for i, gen in enumerate(all_basic_gens + decimal_gens)]),
+    pytest.param([('s', StructGen([['child0', all_basic_struct_gen]]))]),
+    pytest.param([('a', ArrayGen(string_gen))]),
+], ids=idfn)
+@pytest.mark.parametrize('num_parts', [1, 10, 2345], ids=idfn)
 @pytest.mark.parametrize('length', [0, 2048, 4096], ids=idfn)
 @ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
-def test_repartion_df(num_parts, length):
-    #This should change eventually to be more than just the basic gens
-    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(all_basic_gens + decimal_gens)]
+def test_repartition_df(data_gen, num_parts, length):
+    from pyspark.sql.functions import lit
     assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : gen_df(spark, gen_list, length=length).repartition(num_parts),
+            # Add a computed column to avoid shuffle being optimized back to a CPU shuffle
+            lambda spark : gen_df(spark, data_gen, length=length).withColumn('x', lit(1)).repartition(num_parts),
             conf = allow_negative_scale_of_decimal_conf)
 
 @ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
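The computed column in the rewritten test is the subtle part: with a bare repartition there is no GPU work feeding the exchange, so the shuffle can be optimized back to a CPU shuffle and the GPU path would go untested. The same trick in the Scala API, as a sketch (df stands for any DataFrame; not part of this commit):

    import org.apache.spark.sql.functions.lit

    // A cheap computed column forces GPU processing ahead of the exchange, so
    // the shuffle itself is planned on the GPU instead of back on the CPU.
    val repartitioned = df.withColumn("x", lit(1)).repartition(16)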
@@ -427,8 +427,7 @@ object GpuOverrides {
 
   private[this] val _commonTypes = TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL
 
-  private[this] val pluginSupportedOrderableSig = _commonTypes +
-    TypeSig.STRUCT.nested(_commonTypes)
+  val pluginSupportedOrderableSig: TypeSig = _commonTypes + TypeSig.STRUCT.nested(_commonTypes)
 
   private[this] def isStructType(dataType: DataType) = dataType match {
     case StructType(_) => true
@@ -2745,7 +2744,14 @@ object GpuOverrides {
     exec[ShuffleExchangeExec](
       "The backend for most data being exchanged between processes",
       ExecChecks((TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.ARRAY +
-          TypeSig.STRUCT).nested(), TypeSig.all),
+          TypeSig.STRUCT).nested()
+          .withPsNote(TypeEnum.STRUCT, "Round-robin partitioning is not supported for nested " +
+            s"structs if ${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
+          .withPsNote(TypeEnum.ARRAY, "Round-robin partitioning is not supported for arrays if " +
+            s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true")
+          .withPsNote(TypeEnum.MAP, "Round-robin partitioning is not supported for maps if " +
+            s"${SQLConf.SORT_BEFORE_REPARTITION.key} is true"),
+        TypeSig.all),
       (shuffle, conf, p, r) => new GpuShuffleMeta(shuffle, conf, p, r)),
     exec[UnionExec](
       "The backend for the union operator",
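Dropping private[this] from pluginSupportedOrderableSig is what lets the shuffle meta in the next file ask whether every shuffled column is orderable on the GPU. A sketch of how such a check reads, using only calls that appear in this diff (someType and decimalEnabled are placeholder names):

    // A TypeSig describes the set of Spark types an operation supports; the
    // boolean argument gates decimal support separately (conf.decimalTypeEnabled).
    val orderable = GpuOverrides.pluginSupportedOrderableSig
    val canSortOnGpu: Boolean = orderable.isSupportedByPlugin(someType, decimalEnabled)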
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.metric._
@@ -53,6 +53,16 @@ class GpuShuffleMeta(
     // when AQE is enabled and we are planning a new query stage, we need to look at meta-data
     // previously stored on the spark plan to determine whether this exchange can run on GPU
     wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
+
+    if (shuffle.outputPartitioning.isInstanceOf[RoundRobinPartitioning] &&
+        shuffle.sqlContext.conf.sortBeforeRepartition) {
+      val orderableTypes = GpuOverrides.pluginSupportedOrderableSig
+      shuffle.output.map(_.dataType)
+        .filterNot(orderableTypes.isSupportedByPlugin(_, conf.decimalTypeEnabled))
+        .foreach { dataType =>
+          willNotWorkOnGpu(s"round-robin partitioning cannot sort $dataType")
+        }
+    }
   }
 
   override def convertToGpu(): GpuExec =
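The new block tags the exchange as unsupported instead of silently producing a plan the GPU cannot execute: when spark.sql.execution.sortBeforeRepartition is true, Spark sorts rows before a round-robin shuffle so that task retries redistribute rows deterministically, and that sort cannot run on the GPU for types outside pluginSupportedOrderableSig. If giving up deterministic redistribution on retry is acceptable, disabling the pre-shuffle sort is a plausible workaround (an assumption, not something this commit states):

    // Workaround sketch: skip the pre-shuffle sort so round-robin repartition of
    // nested types can stay on the GPU; trades away deterministic output on retries.
    spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")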
