Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#22737] Re-write Go SDK data plane to support timers. #25982

Merged
merged 6 commits into from
Mar 29, 2023

Conversation

lostluck
Copy link
Contributor

@lostluck lostluck commented Mar 26, 2023

Support receiving and directing timers to the *exec.DataSource from the DataManager.

This doesn't fully implement timers for #22737, and only comprises the required data plane changes. This unblocks their development however, though it includes an untested TimerWriter path, and unused callback invocations in the DataSource to support this later.

The FnAPI Data Stream multiplexes all data for all instructions onto a single channel from the Runner harness. Each message may contain Data or Timers bytes for one or more instructions executing on the SDK harness, to be decoded by the execution plane's DataSource. The Data or Timers are directed to a specfic PTransform.

The data and timers may arrive before or after the associated instruction begins execution. This means that the Data and Timers must be buffered in the SDK, until they are consumed, or their associated instruction is marked as ended. Since a single instruction may have more than one consuming PTransform it's important to track the wanted number of "islast" signals, to compare against the number received, and only signal that all byte streams for this instruction are complete when they match.

Specific things to note:

  • In elementChan we use a mutex as well as the channel to avoid concurrent changes to the "want" and "got", but use an atomic to check if elementChan is closed. This is lighterweight than using the mutex to check the condition all the time.
  • The channel index in DataSource now defaults to 0 instead of -1, and we now do a "post processing check" to validate if a split has occurred. This ensures that the expected primary and residuals are respected in the same way they were before.

Unit tests were updated to match the current possible cases, and validate the above mentioned semantics. A "read" benchmark was added as well. Additional larger scale testing was also done to ensure continued correctness of the data plane's behavior.

Also Closes #21164, ensuring that the last element is always consumed.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@lostluck lostluck requested a review from riteshghorse March 26, 2023 23:05
@github-actions github-actions bot added the go label Mar 26, 2023
@lostluck lostluck marked this pull request as ready for review March 26, 2023 23:06
@lostluck
Copy link
Contributor Author

R: @riteshghorse

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@codecov
Copy link

codecov bot commented Mar 26, 2023

Codecov Report

Merging #25982 (63708ae) into master (d80f1ff) will decrease coverage by 0.09%.
The diff coverage is 55.22%.

@@            Coverage Diff             @@
##           master   #25982      +/-   ##
==========================================
- Coverage   71.41%   71.33%   -0.09%     
==========================================
  Files         779      779              
  Lines      102530   102662     +132     
==========================================
+ Hits        73220    73230      +10     
- Misses      27849    27960     +111     
- Partials     1461     1472      +11     
Flag Coverage Δ
go 53.77% <55.22%> (-0.19%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/go/pkg/beam/core/runtime/exec/data.go 0.00% <ø> (ø)
sdks/go/pkg/beam/core/runtime/harness/datamgr.go 57.07% <50.69%> (-16.70%) ⬇️
sdks/go/pkg/beam/core/runtime/exec/datasource.go 67.31% <66.29%> (-1.62%) ⬇️

... and 2 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@riteshghorse riteshghorse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great. Just few understanding questions and minor nit

sdks/go/pkg/beam/core/runtime/harness/datamgr.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/core/runtime/harness/datamgr.go Outdated Show resolved Hide resolved
@riteshghorse
Copy link
Contributor

Run GoPortable PreCommit

@lostluck
Copy link
Contributor Author

Run Go PostCommit

@lostluck
Copy link
Contributor Author

Run Go Portable PreCommit

@lostluck
Copy link
Contributor Author

Run Go PostCommit

1 similar comment
@lostluck
Copy link
Contributor Author

Run Go PostCommit

@lostluck
Copy link
Contributor Author

Run GoPortable PreCommit

@lostluck
Copy link
Contributor Author

Thanks for the review!

Jenkins is currently down, so I'm going to wait till at most tomorrow before merging, to get those runs in.

@lostluck
Copy link
Contributor Author

Run GoPortable PreCommit

@lostluck
Copy link
Contributor Author

Run Go PostCommit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Ensure that the payload is consumed on Elements.Data and Elements.Timers when is_last = true
2 participants