Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved pipeline translation in SparkStructuredStreamingRunner #22446

Merged

Conversation

mosche
Copy link
Member

@mosche mosche commented Jul 26, 2022

Improved pipeline translation in SparkStructuredStreamingRunner (closes #22445, #22382):

  • Make use of Spark Encoders to leverage structural information in translation (and potentially benefit from Catalyst optimizer). Though note, the possible benefit is limited as every ParDo is a black box and a hard boundary for anything that could be optimized.
  • Improved translation of GroupByKey. When applicable, group also by window to better scale out and/or use Spark native collect_list to collect values of group.
  • Make use of specialised Spark Aggregators for combine (per key / globally), particularly Sessions can be improved significantly.
  • Dedicated translation for Combine.Globally to avoid additional shuffle of data.
  • Remove additional serialization roundtrip when reading from a Beam BoundedSource.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@mosche
Copy link
Member Author

mosche commented Jul 26, 2022

R: @aromanenko-dev
R: @echauchot

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@mosche
Copy link
Member Author

mosche commented Aug 2, 2022

ping @echauchot @aromanenko-dev 😀

@aromanenko-dev
Copy link
Contributor

Run Spark ValidatesRunner

@aromanenko-dev
Copy link
Contributor

Run Spark StructuredStreaming ValidatesRunner

Copy link
Contributor

@aromanenko-dev aromanenko-dev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Great work, it looks very promising in terms of potential performance improvements.

I gave a brief look on this, quite a lot of code tbh that make difficult to review in one shot. Just left minor comments. I'd leave it for @echauchot, as a main author of this code, to do the other part of review.

Also, could you add any perf (or any other) test results that you did?

@mosche
Copy link
Member Author

mosche commented Aug 11, 2022

results
@aromanenko-dev

@mosche
Copy link
Member Author

mosche commented Sep 5, 2022

@echauchot Kind ping :)

@mosche
Copy link
Member Author

mosche commented Sep 14, 2022

@echauchot @aromanenko-dev Btw, I was thinking a bit about a better name for this runner. I'd suggest to rename it to SparkSqlRunner taking into account it's build on top of the Spark SQL module:

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

https://www.databricks.com/glossary/what-is-spark-sql

@aromanenko-dev
Copy link
Contributor

@mosche I'm totally agree that the current name is not very practical in a way that it's quite long and, even worse, very confusing since it contains a Streaming word in its name but this runner doesn't support streaming mode at all (we know the reasons but it is what it is).

So, it would be better to rename it, though, I'm not sure about SparkSqlRunner as a new name. IMHO, it may be also confusing and give some false expectations that it supports only Spark (or Beam?) SQL pipelines.

I'd suggest the name SparkDatasetRunner since it's based on Spark Dataset API. This name is quite short and gives the basic idea of what to expect from this runner. Old runner could be called SparkRDDRunner but let's keep it as it is - just SparkRunner.

On the other hand, this renaming will require many incompatible changes, starting from new packages and artifacts names. However, I'm pretty sure that the most users, that run Beam pipelines on Spark, still use the old classical Spark(RDD)Runner. We can check it out on user@ and twitter, if needed.

@mosche
Copy link
Member Author

mosche commented Sep 14, 2022

I agree, that leaves room for potential new confusion. Giving this a 2nd thought I suppose you're right and SparkDatasetRunner is the better name with less ambiguity ... nevertheless it's a rather technical name which i'd usually rather avoid.

Regarding the rename or any other incompatible changes I'm personally fairly relaxed at this stage:

  • it's clearly marked as experimental and such changes are to be expected
  • the runner isn't optimised in any way yet, so there's little to no reason to use an experimental runner over a proven existing one (besides that, there's potential scalability issues that make me doubt a bit it would work well on decent sized datasets)
  • users typically don't (and probably shouldn't) interact with runner packages / classes (the metrics sink might be the only exception)
  • and last, in case it is used, changing the runner name is trivial enough... There could be a dummy runner with the old name that calls out to the new one and asks users to change their configuration ....

@mosche
Copy link
Member Author

mosche commented Sep 15, 2022

@echauchot Will you be able to review this? Otherwise I'd suggest to merge this to not further block follow ups. Looking forward to your feedback.

@aromanenko-dev
Copy link
Contributor

Run Java PreCommit

@aromanenko-dev
Copy link
Contributor

Run Spark ValidatesRunner

@aromanenko-dev
Copy link
Contributor

Run Spark StructuredStreaming ValidatesRunner

Copy link
Contributor

@aromanenko-dev aromanenko-dev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a glance on this change and LGTM for me.
Taking into account that this PR really improves the performance of some transforms while running it on Spark (according to Nexmark results), I believe we have to merge it once all tests will be green.

@echauchot
Copy link
Contributor

I took a glance on this change and LGTM for me. Taking into account that this PR really improves the performance of some transforms while running it on Spark (according to Nexmark results), I believe we have to merge it once all tests will be green.

I would like to review this before merging but it is very long and I'm stuck on other thinks. I'll try my best to take a look ASAP

@echauchot
Copy link
Contributor

I agree, that leaves room for potential new confusion. Giving this a 2nd thought I suppose you're right and SparkDatasetRunner is the better name with less ambiguity ... nevertheless it's a rather technical name which i'd usually rather avoid.

Regarding the rename or any other incompatible changes I'm personally fairly relaxed at this stage:

  • it's clearly marked as experimental and such changes are to be expected
  • the runner isn't optimised in any way yet, so there's little to no reason to use an experimental runner over a proven existing one (besides that, there's potential scalability issues that make me doubt a bit it would work well on decent sized datasets)
  • users typically don't (and probably shouldn't) interact with runner packages / classes (the metrics sink might be the only exception)
  • and last, in case it is used, changing the runner name is trivial enough... There could be a dummy runner with the old name that calls out to the new one and asks users to change their configuration ....

I agree, the name needs to change. I also agree with @aromanenko-dev SparkSQLRunner is confusing. I agree on the proposal of SparkDatasetRunner

@echauchot
Copy link
Contributor

@mosche reviewing ...
cc: @aromanenko-dev

@echauchot
Copy link
Contributor

@mosche: did you rebase this PR on top of the previous merged code about the Encoders? I have the impression it contains the same changes ?

@mosche
Copy link
Member Author

mosche commented Sep 15, 2022

There was no such PR yet @echauchot ... maybe you already had a look at that code on the branch

@mosche
Copy link
Member Author

mosche commented Sep 15, 2022

oh, I remember ... you mean this one #22157?
Yes, that's rebased ... but obviously this one here contains lots of changes to encoders to use encoders that are aware of the structure rather than just using binary encoders.

@echauchot
Copy link
Contributor

oh, I remember ... you mean this one #22157? Yes, that's rebased ... but obviously this one here contains lots of changes to encoders to use encoders that are aware of the structure rather than just using binary encoders.

Yes I meant #22157.
Ok so my intuition was incorrect: the encoders changes are not the same as in #22157.

@echauchot
Copy link
Contributor

results @aromanenko-dev

I think you should also run the TPCDS suite on this PR (ask @aromanenko-dev ) because when we compared the 2 spark runners in the past we've seen big differences between nexmark and tpcds suites (nexmark was slighly in fravor of dataset runner for some queries but tpcds was way in favor of RDD runner for almost all queries).

@aromanenko-dev
Copy link
Contributor

We can run it on Jenkins against this PR, if needed.

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mosche very minor iterative review: I just took a look at Sessions Aggregator. Only minor nits on comments for readability / clarification

@mosche
Copy link
Member Author

mosche commented Sep 19, 2022

Run Spark StructuredStreaming ValidatesRunner

@mosche
Copy link
Member Author

mosche commented Sep 19, 2022

Run Spark ValidatesRunner

1 similar comment
@mosche
Copy link
Member Author

mosche commented Sep 19, 2022

Run Spark ValidatesRunner

@echauchot
Copy link
Contributor

We can run it on Jenkins against this PR, if needed.

@mosche did you manage to run TPCDS suite on this PR ?

@echauchot
Copy link
Contributor

I see that Nexmark query 5 and 7 have improved quite a lot. They are mainly based on combiners and windows. Nice !

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review: for now I have looked at

  • the general architecture
  • the aggregators
  • the combiners translatios (globally and per key)
  • the source
  • started GBK
    This looks very good to me.
    I need to finish taking a look at the GBK and the Encoders and we could merge if it is all good. The changes are minor (except the one on triggers in batch)


/**
* Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
* function {@code collect_list} when applicable.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, avoiding materialization like with ReduceFnRunner and using a spark native instead is better because it allows spark to spill to disk instead of throwing OOM.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately that's not the case, that's way in both cases above is important... either way there's a risk of OOMs, collect_list is just more efficient...
the alternative is the iterableOnce, though that will through if users attempt to iterate multiple times

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch I was hopping on collect_list. But at least you managed to avoid OOM in some cases compared to previous impl

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finished GBK review. Thanks for great work ! Only encoders left to review.

result =
input
.groupBy(col("value.key").as("key"))
.agg(collect_list(col("value.value")).as("values"), timestampAggregator(tsCombiner))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clever !


/**
* Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
* function {@code collect_list} when applicable.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch I was hopping on collect_list. But at least you managed to avoid OOM in some cases compared to previous impl

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finished my review, took a look at the Encoders part. Now need to take a look at your latest comments / commits and the TPCDS run of at least Q3 (because I don't trust nexmark results 100% about being representative of user pipelines) and we will merge pretty soon.

Thanks a lot for the great work !

@echauchot
Copy link
Contributor

Alternatively you could run the load tests for combiners and GBK available in sdk/testing they are per transform

@mosche
Copy link
Member Author

mosche commented Sep 22, 2022

@echauchot As said I don't see the value of spending more time on load tests / benchmark at this point. Correctness is tested by the VR tests. One thing at a time

@echauchot
Copy link
Contributor

echauchot commented Sep 22, 2022

We can run it on Jenkins against this PR, if needed.

@mosche did you manage to run TPCDS suite on this PR ?

Maybe avoiding jenkins as overloaded is a good idea and run either TPCDS Q3 or combine load test and GBK load test and compare RDD runner and Dataset runner

@echauchot
Copy link
Contributor

@echauchot As said I don't see the value of spending more time on load tests / benchmark at this point. Correctness is tested by the VR tests. One thing at a time

I'll run them

@mosche
Copy link
Member Author

mosche commented Sep 22, 2022

@echauchot Please focus on what's important ... that's not the scope of this PR anymore!

@mosche
Copy link
Member Author

mosche commented Sep 22, 2022

Running additional benchmarks makes sense if you plan to take actions, if not ... what's the point?

@echauchot
Copy link
Contributor

@echauchot Please focus on what's important ... that's not the scope of this PR anymore!

I disagree: the point of this PR is too improve performance of the runner. Problem is that It contains only nexmark performance results. As I wrote I don't trust nexmark test suite as it showed optimistic results that prove wrong when we ran TPCDS on this runner. So I'd like to ensure the performance of the changes. As it is the whole point of the PR I'm totally focusing on the scope !

@echauchot echauchot merged commit 762edd7 into apache:master Sep 22, 2022
@mosche mosche deleted the 22445-ImprovedSparkStructuredStreamingRunner branch November 9, 2022 12:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Improvement]: Improved pipeline translation in experimental SparkStructuredStreamingRunner
3 participants