Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark Runner: Change to use partitioner in GroupNonMergingWindowsFunctions#groupByKeyInGlobalWindow #32610

Merged

Conversation

twosom
Copy link
Contributor

@twosom twosom commented Oct 1, 2024

Please add a meaningful description for your change here

fixes #32608

This PR contains these changes

  • change GroupNonMergingWindowsFunctions#groupByKeyInGlobalWindow to use partitioner
  • add related test

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@twosom twosom changed the title Group by key in global window using partitioner Spark Runner: Change to use partitioner in GroupNonMergingWindowsFunctions#groupByKeyInGlobalWindow Oct 1, 2024
@github-actions github-actions bot added the build label Oct 1, 2024
Copy link
Contributor

github-actions bot commented Oct 1, 2024

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@twosom
Copy link
Contributor Author

twosom commented Oct 1, 2024

Run Java PreCommit

@twosom
Copy link
Contributor Author

twosom commented Oct 1, 2024

I think task fails is not related to this PR.

@twosom
Copy link
Contributor Author

twosom commented Oct 1, 2024

Run Java PreCommit

1 similar comment
@twosom
Copy link
Contributor Author

twosom commented Oct 1, 2024

Run Java PreCommit

Copy link
Contributor

github-actions bot commented Oct 1, 2024

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label build.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@je-ik je-ik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, just one question from higher-level perspective. Given that the only partitioner that is ever passed to this method is returned by

  private static @Nullable Partitioner getPartitioner(SparkTranslationContext context) {
    Long bundleSize =
        context.serializablePipelineOptions.get().as(SparkPipelineOptions.class).getBundleSize();
    return (bundleSize > 0)
        ? null
        : new HashPartitioner(context.getSparkContext().defaultParallelism());
  }

does this change have any practical impact? I would expect that the HashPartitioner with default parallelism is what would be used when the partitioner is unused, is this assumption wrong?

@@ -1,5 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test"
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32610": "noting that PR #32610 should run this test"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove these lines, the purpose of the file is to be just touched somehow for the tests to run. We might replace the old comment with a single one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@je-ik
Thanks! I rollbacked trigger files!

@twosom
Copy link
Contributor Author

twosom commented Oct 2, 2024

LGTM overall, just one question from higher-level perspective. Given that the only partitioner that is ever passed to this method is returned by

  private static @Nullable Partitioner getPartitioner(SparkTranslationContext context) {
    Long bundleSize =
        context.serializablePipelineOptions.get().as(SparkPipelineOptions.class).getBundleSize();
    return (bundleSize > 0)
        ? null
        : new HashPartitioner(context.getSparkContext().defaultParallelism());
  }

does this change have any practical impact? I would expect that the HashPartitioner with default parallelism is what would be used when the partitioner is unused, is this assumption wrong?

Yes, as you mentioned, following the code ultimately leads to using the HashPartitioner.
However, I assumed that the partitioner would be passed as a parameter to the GroupNonMergingWindowsFunctions#groupByKeyInGlobalWindow method, so that it could be used if necessary.
While the current changes may not have a significant practical impact, this decision was made with future extensibility in mind, allowing the use of custom partitioners for specific Apache Beam PTransforms.
Of course, this may not be necessary in the end, but I wanted to leave room for this kind of flexibility.
Thank you.

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems OK to me. I defer to Jan as well.

@github-actions github-actions bot added build and removed build labels Oct 4, 2024
@je-ik je-ik merged commit a2710ed into apache:master Oct 4, 2024
17 checks passed
@je-ik
Copy link
Contributor

je-ik commented Oct 4, 2024

Thanks @twosom!

reeba212 pushed a commit to reeba212/beam that referenced this pull request Dec 4, 2024
…tions#groupByKeyInGlobalWindow (apache#32610)

* change GroupNonMergingWindowsFunctions#groupByKeyInGlobalWindow to use partitioner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Task]: Spark Runner GroupNonMergingWindowsFunctions#groupByKeyInGlobalWindow does not using partitioner
3 participants