Added in basic support for broadcast nested loop join #296

Merged: merged 3 commits on Jun 29, 2020
9 changes: 5 additions & 4 deletions docs/configs.md
@@ -84,7 +84,7 @@ incompatibilities.

### Expressions

Name | Description | Default Value | Incompatibilities
Name | Description | Default Value | Notes
-----|-------------|---------------|------------------
<a name="sql.expression.Abs"></a>spark.rapids.sql.expression.Abs|absolute value|true|None|
<a name="sql.expression.Acos"></a>spark.rapids.sql.expression.Acos|inverse cosine|true|None|
@@ -218,7 +218,7 @@ Name | Description | Default Value | Incompatibilities

### Execution

Name | Description | Default Value | Incompatibilities
Name | Description | Default Value | Notes
-----|-------------|---------------|------------------
<a name="sql.exec.CoalesceExec"></a>spark.rapids.sql.exec.CoalesceExec|The backend for the dataframe coalesce method|true|None|
<a name="sql.exec.CollectLimitExec"></a>spark.rapids.sql.exec.CollectLimitExec|Reduce to single partition and apply limit|true|None|
@@ -238,21 +238,22 @@ Name | Description | Default Value | Incompatibilities
<a name="sql.exec.BroadcastExchangeExec"></a>spark.rapids.sql.exec.BroadcastExchangeExec|The backend for broadcast exchange of data|true|None|
<a name="sql.exec.ShuffleExchangeExec"></a>spark.rapids.sql.exec.ShuffleExchangeExec|The backend for most data being exchanged between processes|true|None|
<a name="sql.exec.BroadcastHashJoinExec"></a>spark.rapids.sql.exec.BroadcastHashJoinExec|Implementation of join using broadcast data|true|None|
<a name="sql.exec.BroadcastNestedLoopJoinExec"></a>spark.rapids.sql.exec.BroadcastNestedLoopJoinExec|Implementation of join using brute force|false|This is disabled by default because large joins can cause out of memory errors|
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.WindowExec"></a>spark.rapids.sql.exec.WindowExec|Window-operator backend|true|None|

### Scans

Name | Description | Default Value | Incompatibilities
Name | Description | Default Value | Notes
-----|-------------|---------------|------------------
<a name="sql.input.CSVScan"></a>spark.rapids.sql.input.CSVScan|CSV parsing|true|None|
<a name="sql.input.OrcScan"></a>spark.rapids.sql.input.OrcScan|ORC parsing|true|None|
<a name="sql.input.ParquetScan"></a>spark.rapids.sql.input.ParquetScan|Parquet parsing|true|None|

### Partitioning

Name | Description | Default Value | Incompatibilities
Name | Description | Default Value | Notes
-----|-------------|---------------|------------------
<a name="sql.partitioning.HashPartitioning"></a>spark.rapids.sql.partitioning.HashPartitioning|Hash based partitioning|true|None|
<a name="sql.partitioning.RangePartitioning"></a>spark.rapids.sql.partitioning.RangePartitioning|Range Partitioning|true|None|
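As context for the new `spark.rapids.sql.exec.BroadcastNestedLoopJoinExec` entry documented above, a minimal sketch of opting in from user code might look like the following. It assumes the RAPIDS Accelerator plugin is already installed and enabled on the cluster; the session name and the toy DataFrames are placeholders, not part of this PR.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Sketch only: assumes the RAPIDS Accelerator plugin is already on the
// classpath and enabled; only the opt-in config below comes from this PR.
val spark = SparkSession.builder()
  .appName("bnlj-opt-in-example")
  .config("spark.rapids.sql.exec.BroadcastNestedLoopJoinExec", "true")
  .getOrCreate()

val left = spark.range(0, 1000).toDF("a")
val right = spark.range(0, 10).toDF("b")

// A cross join against a broadcast table is planned as a
// BroadcastNestedLoopJoinExec, which the plugin can now replace on the GPU.
left.crossJoin(broadcast(right)).show()
```

The same key is what the new integration tests below pass through the `conf` argument.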
34 changes: 29 additions & 5 deletions integration_tests/src/main/python/join_test.py
@@ -55,7 +55,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti',
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross',
pytest.param('FullOuter', marks=[pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/280')])], ids=idfn)
def test_sortmerge_join(data_gen, join_type):
def do_join(spark):
@@ -79,20 +79,44 @@ def do_join(spark):
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
# Not all join types can be translated to a broadcast join, but this tests them to be sure we
# can handle what spark is doing
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti',
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross',
pytest.param('FullOuter', marks=[pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/280')])], ids=idfn)
def test_broadcast_join_right_table(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right), left.a == right.r_a, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_broadcast_nested_loop_join(data_gen):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
return left.crossJoin(broadcast(right))
assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
def test_broadcast_nested_loop_join_with_conditionals(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
# This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
# if the sizes are large enough to have both 0.0 and -0.0 show up 500 and 250
# but these take a long time to verify so we run with smaller numbers by default
# that do not expose the error
return left.join(broadcast(right),
(left.b >= right.r_b), join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
# Not all join types can be translated to a broadcast join, but this tests them to be sure we
# can handle what spark is doing
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti',
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross',
pytest.param('FullOuter', marks=[pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/280')])], ids=idfn)
def test_broadcast_join_left_table(data_gen, join_type):
def do_join(spark):
@@ -103,7 +127,7 @@ def do_join(spark):
# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner'], ids=idfn)
@pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
def test_broadcast_join_with_conditionals(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
@@ -118,7 +142,7 @@ def do_join(spark):
('b', StringGen()), ('c', BooleanGen())]

@ignore_order
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'FullOuter'], ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'FullOuter', 'Cross'], ids=idfn)
def test_broadcast_join_mixed(join_type):
def do_join(spark):
left = gen_df(spark, _mixed_df1_with_nulls, length=500)
@@ -37,6 +37,7 @@ class JoinsSuite extends SparkQueryCompareTestSuite {
.set("spark.sql.autoBroadcastJoinThreshold", "160")
.set("spark.sql.join.preferSortMergeJoin", "false")
.set("spark.sql.shuffle.partitions", "2") // hack to try and work around bug in cudf
.set("spark.rapids.sql.exec.BroadcastNestedLoopJoinExec", "true")

IGNORE_ORDER_testSparkResultsAreEqual2("Test hash join", longsDf, biggerLongsDf,
conf = shuffledJoinConf) {
@@ -63,6 +64,11 @@ class JoinsSuite extends SparkQueryCompareTestSuite {
(A, B) => A.join(B, A("longs") === B("longs"), "FullOuter")
}

IGNORE_ORDER_testSparkResultsAreEqual2("Test cross join", longsDf, biggerLongsDf,
conf = shuffledJoinConf) {
(A, B) => A.join(B.hint("broadcast"), A("longs") < B("longs"), "Cross")
}

// test replacement of sort merge join with hash join
// make sure broadcast size small enough it doesn't get used
testSparkResultsAreEqual2("Test replace sort merge join with hash join",
@@ -19,7 +19,7 @@ import ai.rapids.cudf.{NvtxColor, Table}

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, Inner, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, InnerLike, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
@@ -31,7 +31,7 @@ object GpuHashJoin {
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
condition: Option[Expression]): Unit = joinType match {
case Inner =>
case _: InnerLike =>
case FullOuter =>
if (leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable)) {
// https://github.com/rapidsai/cudf/issues/5563
@@ -227,7 +227,7 @@ trait GpuHashJoin extends GpuExec with HashJoin {
.leftJoin(rightTable.onColumns(joinKeyIndices: _*))
case RightOuter => rightTable.onColumns(joinKeyIndices: _*)
.leftJoin(leftTable.onColumns(joinKeyIndices: _*))
case Inner =>
case _: InnerLike =>
leftTable.onColumns(joinKeyIndices: _*).innerJoin(rightTable.onColumns(joinKeyIndices: _*))
case LeftSemi =>
leftTable.onColumns(joinKeyIndices: _*)
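The switch above from matching `Inner` to matching `_: InnerLike` is what lets `Cross` joins share the existing inner-join path, since Catalyst defines both `Inner` and `Cross` as subtypes of `InnerLike`. A standalone sketch of that hierarchy (assuming only a spark-catalyst dependency; this is not plugin code):

```scala
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, JoinType}

object InnerLikeDemo {
  // A single `_: InnerLike` arm covers both Inner and Cross join types.
  def route(joinType: JoinType): String = joinType match {
    case _: InnerLike => "inner-join code path"
    case other        => s"$other handled elsewhere"
  }

  def main(args: Array[String]): Unit = {
    println(route(Inner)) // inner-join code path
    println(route(Cross)) // inner-join code path
  }
}
```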
@@ -42,19 +42,17 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
import org.apache.spark.sql.rapids.execution.{GpuBroadcastHashJoinMeta, GpuBroadcastMeta}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastHashJoinMeta, GpuBroadcastMeta, GpuBroadcastNestedLoopJoinMeta}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* Base class for all ReplacementRules
* @param doWrap wraps a part of the plan in a [[RapidsMeta]] for further processing.
* @param incomDoc docs explaining if this rule produces an GPU version that is incompatible with
* the CPU version in some way.
* @param desc a description of what this part of the plan does.
* @param tag metadata used to determine what INPUT is at runtime.
* @tparam INPUT the exact type of the class we are wrapping.
@@ -67,19 +65,32 @@ abstract class ReplacementRule[INPUT <: BASE, BASE, WRAP_TYPE <: RapidsMeta[INPU
RapidsConf,
Option[RapidsMeta[_, _, _]],
ConfKeysAndIncompat) => WRAP_TYPE,
protected var incomDoc: Option[String],
protected var desc: String,
final val tag: ClassTag[INPUT]) extends ConfKeysAndIncompat {

override def incompatDoc: Option[String] = incomDoc
private var _incompatDoc: Option[String] = None
private var _disabledDoc: Option[String] = None

override def incompatDoc: Option[String] = _incompatDoc
override def disabledMsg: Option[String] = _disabledDoc

/**
* Mark this expression as incompatible with the original Spark version
* @param str a description of how it is incompatible.
* @return this for chaining.
*/
final def incompat(str: String) : this.type = {
incomDoc = Some(str)
_incompatDoc = Some(str)
this
}

/**
* Mark this expression as disabled by default.
* @param str a description of why it is disabled by default.
* @return this for chaining.
*/
final def disabledByDefault(str: String) : this.type = {
_disabledDoc = Some(str)
this
}
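Both marks return `this.type`, so they chain onto a rule when it is registered, and an incompatibility note takes precedence over the disabled-by-default note when the docs are generated. A simplified, self-contained sketch of that fluent pattern (hypothetical names; not the real `ReplacementRule` class):

```scala
// Simplified illustration of the fluent-mark pattern used by the rules above.
class DemoRule(val desc: String) {
  private var incompatDoc: Option[String] = None
  private var disabledDoc: Option[String] = None

  def incompat(str: String): this.type = { incompatDoc = Some(str); this }
  def disabledByDefault(str: String): this.type = { disabledDoc = Some(str); this }

  // Incompatibility notes win over the disabled-by-default note.
  def notes: Option[String] =
    incompatDoc.map(s => s"This is not 100% compatible with the Spark version because $s")
      .orElse(disabledDoc.map(s => s"This is disabled by default because $s"))
}

object DemoRuleApp extends App {
  val rule = new DemoRule("Implementation of join using brute force")
    .disabledByDefault("large joins can cause out of memory errors")
  println(rule.notes.getOrElse("None")) // This is disabled by default because ...
}
```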

@@ -108,8 +119,6 @@ abstract class ReplacementRule[INPUT <: BASE, BASE, WRAP_TYPE <: RapidsMeta[INPU
this
}

def isIncompat: Boolean = incompatDoc.isDefined

private var confKeyCache: String = null
protected val confKeyPart: String

@@ -120,19 +129,21 @@ abstract class ReplacementRule[INPUT <: BASE, BASE, WRAP_TYPE <: RapidsMeta[INPU
confKeyCache
}

private def isIncompatMsg(): Option[String] = if (incompatDoc.isDefined) {
private def notes(): Option[String] = if (incompatDoc.isDefined) {
Some(s"This is not 100% compatible with the Spark version because ${incompatDoc.get}")
} else if (disabledMsg.isDefined) {
Some(s"This is disabled by default because ${disabledMsg.get}")
} else {
None
}

def confHelp(asTable: Boolean = false): Unit = {
val incompatMsg = isIncompatMsg()
val notesMsg = notes()
if (asTable) {
import ConfHelper.makeConfAnchor
print(s"${makeConfAnchor(confKey)}|$desc|${incompatMsg.isEmpty}|")
if (incompatMsg.isDefined) {
print(s"${incompatMsg.get}")
print(s"${makeConfAnchor(confKey)}|$desc|${notesMsg.isEmpty}|")
if (notesMsg.isDefined) {
print(s"${notesMsg.get}")
} else {
print("None")
}
@@ -141,10 +152,10 @@ abstract class ReplacementRule[INPUT <: BASE, BASE, WRAP_TYPE <: RapidsMeta[INPU
println(s"$confKey:")
println(s"\tEnable (true) or disable (false) the $tag $operationName.")
println(s"\t$desc")
if (incompatMsg.isDefined) {
println(s"\t${incompatMsg.get}")
if (notesMsg.isDefined) {
println(s"\t${notesMsg.get}")
}
println(s"\tdefault: ${incompatDoc.isEmpty}")
println(s"\tdefault: ${notesMsg.isEmpty}")
println()
}
}
@@ -169,11 +180,9 @@ class ExprRule[INPUT <: Expression](
RapidsConf,
Option[RapidsMeta[_, _, _]],
ConfKeysAndIncompat) => BaseExprMeta[INPUT],
incompatDoc: Option[String],
desc: String,
tag: ClassTag[INPUT])
extends ReplacementRule[INPUT, Expression, BaseExprMeta[INPUT]](doWrap,
incompatDoc, desc, tag) {
extends ReplacementRule[INPUT, Expression, BaseExprMeta[INPUT]](doWrap, desc, tag) {

override val confKeyPart = "expression"
override val operationName = "Expression"
@@ -188,11 +197,9 @@ class ScanRule[INPUT <: Scan](
RapidsConf,
Option[RapidsMeta[_, _, _]],
ConfKeysAndIncompat) => ScanMeta[INPUT],
incompatDoc: Option[String],
desc: String,
tag: ClassTag[INPUT])
extends ReplacementRule[INPUT, Scan, ScanMeta[INPUT]](
doWrap, incompatDoc, desc, tag) {
extends ReplacementRule[INPUT, Scan, ScanMeta[INPUT]](doWrap, desc, tag) {

override val confKeyPart: String = "input"
override val operationName: String = "Input"
@@ -207,11 +214,9 @@ class PartRule[INPUT <: Partitioning](
RapidsConf,
Option[RapidsMeta[_, _, _]],
ConfKeysAndIncompat) => PartMeta[INPUT],
incompatDoc: Option[String],
desc: String,
tag: ClassTag[INPUT])
extends ReplacementRule[INPUT, Partitioning,
PartMeta[INPUT]](doWrap, incompatDoc, desc, tag) {
extends ReplacementRule[INPUT, Partitioning, PartMeta[INPUT]](doWrap, desc, tag) {

override val confKeyPart: String = "partitioning"
override val operationName: String = "Partitioning"
@@ -226,10 +231,9 @@ class ExecRule[INPUT <: SparkPlan](
RapidsConf,
Option[RapidsMeta[_, _, _]],
ConfKeysAndIncompat) => SparkPlanMeta[INPUT],
incompatDoc: Option[String],
desc: String,
tag: ClassTag[INPUT])
extends ReplacementRule[INPUT, SparkPlan, SparkPlanMeta[INPUT]](doWrap, incompatDoc, desc, tag) {
extends ReplacementRule[INPUT, SparkPlan, SparkPlanMeta[INPUT]](doWrap, desc, tag) {

override val confKeyPart: String = "exec"
override val operationName: String = "Exec"
@@ -245,11 +249,10 @@ class DataWritingCommandRule[INPUT <: DataWritingCommand](
RapidsConf,
Option[RapidsMeta[_, _, _]],
ConfKeysAndIncompat) => DataWritingCommandMeta[INPUT],
incompatDoc: Option[String],
desc: String,
tag: ClassTag[INPUT])
extends ReplacementRule[INPUT, DataWritingCommand, DataWritingCommandMeta[INPUT]](
doWrap, incompatDoc, desc, tag) {
doWrap, desc, tag) {

override val confKeyPart: String = "output"
override val operationName: String = "Output"
@@ -397,7 +400,7 @@ object GpuOverrides {
(implicit tag: ClassTag[INPUT]): ExprRule[INPUT] = {
assert(desc != null)
assert(doWrap != null)
new ExprRule[INPUT](doWrap, None, desc, tag)
new ExprRule[INPUT](doWrap, desc, tag)
}

def scan[INPUT <: Scan](
@@ -407,7 +410,7 @@ object GpuOverrides {
(implicit tag: ClassTag[INPUT]): ScanRule[INPUT] = {
assert(desc != null)
assert(doWrap != null)
new ScanRule[INPUT](doWrap, None, desc, tag)
new ScanRule[INPUT](doWrap, desc, tag)
}

def part[INPUT <: Partitioning](
@@ -417,7 +420,7 @@ object GpuOverrides {
(implicit tag: ClassTag[INPUT]): PartRule[INPUT] = {
assert(desc != null)
assert(doWrap != null)
new PartRule[INPUT](doWrap, None, desc, tag)
new PartRule[INPUT](doWrap, desc, tag)
}

def exec[INPUT <: SparkPlan](
@@ -427,7 +430,7 @@ object GpuOverrides {
(implicit tag: ClassTag[INPUT]): ExecRule[INPUT] = {
assert(desc != null)
assert(doWrap != null)
new ExecRule[INPUT](doWrap, None, desc, tag)
new ExecRule[INPUT](doWrap, desc, tag)
}

def dataWriteCmd[INPUT <: DataWritingCommand](
@@ -437,7 +440,7 @@ object GpuOverrides {
(implicit tag: ClassTag[INPUT]): DataWritingCommandRule[INPUT] = {
assert(desc != null)
assert(doWrap != null)
new DataWritingCommandRule[INPUT](doWrap, None, desc, tag)
new DataWritingCommandRule[INPUT](doWrap, desc, tag)
}

def wrapExpr[INPUT <: Expression](
@@ -1713,6 +1716,10 @@ object GpuOverrides {
exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r)),
exec[BroadcastNestedLoopJoinExec](
"Implementation of join using brute force",
(join, conf, p, r) => new GpuBroadcastNestedLoopJoinMeta(join, conf, p, r))
.disabledByDefault("large joins can cause out of memory errors"),
exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),