Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the broadcast joins issues caused by InputFileBlockRule[databricks] #9673

Merged
merged 8 commits into from
Nov 20, 2023

Conversation

firestarman
Copy link
Collaborator

@firestarman firestarman commented Nov 10, 2023

close #9469

InputFileBlockRule may change the meta of a broadcast join and its child plans, and this change may break the rule of the broadcast join running on GPU, leading to errors. Because GPU broadcast joins require the build side BroadcastExchangeExec running on GPU, and similarly if BroadcastExchangeExec runs on CPU, the broadcast joins should also run on CPU.

Change made:

  1. Optimize the InputFileBlockRule by skipping the BroadcastExchangeLike because the file info cannot come from a broadcast. (This idea is from [BUG] InputFileBlock walks through broadcasts and does not deal with mismatched broadcasts #9473)
  2. Check the tagging for broadcast joins again after applying the InputFileBlockRule to fix the potential break.
  3. Some API refactor, moving all input file related methods into the InputFileBlockRule object.
  • Add tests
    I also tested the user case in the linked issue locally, and it can pass with this fix.

@firestarman firestarman changed the title Fix the broadcast join issues caused by InputFileBlockRule Fix the broadcast join issues caused by InputFileBlockRule[databricks] Nov 10, 2023
case _: InputFileName => true
case _: InputFileBlockStart => true
case _: InputFileBlockLength => true
case _: GpuInputFileName => true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm why do we still need to return true given it's already converted to Gpu case? Given the reason mentioned above is GPU plans may get incorrect file name or file start or file length from a CPU scan.

Copy link
Collaborator Author

@firestarman firestarman Nov 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be used for two stages during the overiding process. The stage after inserting transitions for row and column may get a InputFileName or a GpuInputFileName.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concerning this issue, we will never get a GpuInputFileName since plan conversion does not happen.

@firestarman firestarman changed the title Fix the broadcast join issues caused by InputFileBlockRule[databricks] Fix the broadcast joins issues caused by InputFileBlockRule[databricks] Nov 10, 2023
Signed-off-by: Firestarman <[email protected]>
@firestarman
Copy link
Collaborator Author

build

1 similar comment
@firestarman
Copy link
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Copy link
Collaborator Author

build

@firestarman
Copy link
Collaborator Author

Hi @revans2 , could you take a look at this ? thx

@sameerz sameerz added the bug Something isn't working label Nov 14, 2023
winningsix
winningsix previously approved these changes Nov 15, 2023
Copy link
Collaborator

@winningsix winningsix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -78,6 +78,22 @@ abstract class GpuBroadcastHashJoinMetaBase(
}
}

// Called in runAfterTagRules for a special post tagging for this broadcast join.
def checkTagForBuildSide(): Unit = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make more sense to move this into GpuBroadcastJoinMeta?

Copy link
Collaborator Author

@firestarman firestarman Nov 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not do that because there are 4 shims for GpuBroadcastJoinMeta, which means I need to duplicate this code 4 times. The current option looks much simpler, only two times.

ids=["GpuParquetScan", "ParquetScan"])
@pytest.mark.parametrize("is_gpu_broadcast", [True, False],
ids=["GpuBroadcastExchange", "BroadcastExchange"])
def test_broadcast_hash_join_fix_fallback_by_inputfile(spark_tmp_path, is_gpu_parquet,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran these tests on the current 23.12 and test_broadcast_hash_join_fix_fallback_by_inputfile[BroadcastExchange-ParquetScan] produced the wrong answer, but test_broadcast_hash_join_fix_fallback_by_inputfile[GpuBroadcastExchange-ParquetScan] failed with not falling back as expected.

java.lang.AssertionError: assertion failed: Could not find BroadcastHashJoinExec in the Spark plan

test_broadcast_nested_join_fix_fallback_by_inputfile passed in all cases and none of them triggered the error as described in #9469

Can we please add in a test that is the same as #9469 so we can be sure that it is fixed?

Copy link
Collaborator Author

@firestarman firestarman Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case in #9469 requires Iceberg to run, so we can not test this for Spark 330+, is it OK?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the tests, now they can reproduce the same error as #9469 on the current 23.12.

E                               raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o659.collectToPython.
E                   : java.lang.IllegalStateException: the broadcast must be on the GPU too
E                   	at com.nvidia.spark.rapids.shims.GpuBroadcastJoinMeta.verifyBuildSideWasReplaced(GpuBroadcastJoinMeta.scala:72)
E                   	at org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinMeta.convertToGpu(GpuBroadcastNestedLoopJoinExec.scala:59)
E                   	at org.apache.spark.sql.rapids.execution.GpuBroadcastNestedLoopJoinMeta.convertToGpu(GpuBroadcastNestedLoopJoinExec.scala:45)
E                   	at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:799)





                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
>                       format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling o663.collectToPython.
E                   : java.lang.IllegalStateException: the broadcast must be on the GPU too
E                   	at com.nvidia.spark.rapids.shims.GpuBroadcastJoinMeta.verifyBuildSideWasReplaced(GpuBroadcastJoinMeta.scala:72)
E                   	at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:63)
E                   	at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinMeta.convertToGpu(GpuBroadcastHashJoinExec.scala:44)
E                   	at com.nvidia.spark.rapids.SparkPlanMeta.convertIfNeeded(RapidsMeta.scala:799)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@revans2 Could you take a look again? Thx in advance.

Signed-off-by: Firestarman <[email protected]>
Signed-off-by: Firestarman <[email protected]>
@firestarman
Copy link
Collaborator Author

build

@firestarman
Copy link
Collaborator Author

The failing test is not related, try again

@firestarman
Copy link
Collaborator Author

build

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks better. I have not manually run the tests yet. But it looks correct.

@firestarman firestarman merged commit 9ed98c8 into NVIDIA:branch-23.12 Nov 20, 2023
37 checks passed
@firestarman firestarman deleted the fix-join-inputfile branch November 20, 2023 02:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
4 participants