Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#32211] Support OnWindowExpiration in Prism. #33337

Merged
merged 9 commits into from
Dec 12, 2024

Conversation

lostluck
Copy link
Contributor

@lostluck lostluck commented Dec 9, 2024

  • Adds tracking for keys used within a window, so the OnWindowExpiration callback can be invoked.
  • Restricts the output watermark from advancing until a window's been garbage collected and all keys have been expired.
  • Added a "side channel" mechanism to insert bundles for execution outside of normal execution flow.
    • This may ultimately be used for ProcessingTime timers, as well as for Triggers/
  • Supports OnWindowExpiration, enabling initial GroupIntoBatches support.

Fixes #32211


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link

codecov bot commented Dec 9, 2024

Codecov Report

Attention: Patch coverage is 26.44628% with 89 lines in your changes missing coverage. Please review.

Project coverage is 57.37%. Comparing base (122368f) to head (de16e08).
Report is 24 commits behind head on master.

Files with missing lines Patch % Lines
...am/runners/prism/internal/engine/elementmanager.go 28.82% 73 Missing and 6 partials ⚠️
...s/go/pkg/beam/runners/prism/internal/preprocess.go 0.00% 5 Missing and 1 partial ⚠️
sdks/go/pkg/beam/runners/prism/internal/execute.go 0.00% 3 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #33337      +/-   ##
============================================
- Coverage     57.41%   57.37%   -0.04%     
  Complexity     1474     1474              
============================================
  Files           970      970              
  Lines        154497   154622     +125     
  Branches       1076     1076              
============================================
+ Hits          88702    88719      +17     
- Misses        63593    63688      +95     
- Partials       2202     2215      +13     
Flag Coverage Δ
go 34.62% <26.44%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@lostluck
Copy link
Contributor Author

I'm adding a few unit tests for the primary decision logic so that the go code coverage isn't so bad, and refactoring it a touch too.

@lostluck
Copy link
Contributor Author

Ready for a look!

@lostluck
Copy link
Contributor Author

R: @damondouglas
cc: @jrmccluskey (in case you want to look too, due to the auto assignment)

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@@ -68,6 +68,8 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we consider:

This enables Java @OnWindowExpiration which enables initial GroupIntoBatches support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, "@" based annotations are a notion in Java, and "@" indicates a Decorator in Python, which doesn't perform the same task as they do in Java.

We want to refer to the Beam feature, not list the specific and complete manifestations of that feature in all SDK languages, so we can't use @OnWindowExpiration. It's immaterial that the feature currently only exists in Java, and not the other SDKs.

What we could do, in a separate PR, is update the programming guide for the feature, and link to that. Then it would be pointing to the documentation for all SDKs, without fitting to a single SDK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed thanks.

refreshCond sync.Cond // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
changedStages set[string] // Stages that have changed and need their watermark refreshed.
sideChannelBundles []RunBundle // Represents ready to executed bundles prepared on the side by a stage instead of in the main loop, such as for onWindowExpiry, or for Triggers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we consider the following? I'm rewriting the comment as a learning feedback.

injectedChannelBundles []RunBundle // Represents ready to execute bundles prepared outside the main loop, that have been injected by expired windows pending garbage collection; includes onWindowExpiry or Triggers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping "channel", so it's just injectedBundles.

It's not useful to overspecify the conditions why/when a bundle would be injected this way. It is useful to describe situations that do (onWindowExpiration), or will (triggers). Those specific locations (which can be found by finding where this field is used) can document the conditions where they apply.

@@ -386,6 +400,19 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)
}
// Run any side channel bundles first.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to // Run any injected channel bundles first.

Question: Is it first because we want to prioritize processing expired windows? And I think I remember you mentioning that timers are processed when the watermark timestamp >= the end of the window timestamp so these @OnTimer and @OnTimerFamily would be processed here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

We do want to prioritized bundles that were added in, instead of creating new bundles as a result of the eventtime watermark advancing.

Remember, as a general rule: Bundles are only produced as a result of where the watermark is.
Technically, that's true for OnWindowExpiration as well.

These onWindowExpiration injected bundles also hold back the watermark as a result (because we can't garbage collect the window until all onWindowExpiration callbacks for that window have been processed), so it's better to handle them ASAP, before attempting a "normal" bundle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I had mentioned that onWindowExpiration callbacks are implemented as timers.

Normally timers are handled as part of standard bundle execution when the watermark advances. That's why timers and data are simply "elements".

Not in this PR, but later, I plan on moving ProcessingTime timers to use the injection mechanism, since they don't fire based on the event time watermark, and if you look further below, it complicates the handling there. But that would be scope creep to clean that up right now.

@@ -628,6 +655,11 @@ type Block struct {
Transform, Family string
}

// StaticTimerID represents the static user identifiers for a timer.
type StaticTimerID struct {
Transform, TimerFamily string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Transform identifies fused topologically sorted ids of Pipeline steps.
Transform string

// TimerFamily is the user identifier for a timer family, such as the parameter annotation for the TimerMap in a Java SDK DoFn.
TimerFamily string

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transform is the direct ID of a single transform, it's immaterial whether it's in a stage or not.

Stages (and their IDs) represent fused, topologically sorted transforms. Essentially, within Beam, everything has it's own ID, and we generally refer to things by ID, instead of by direct code (except for Schema Row types, which is why they're awkward).

The goal is to avoid putting SDK specific information in Prism, so referring to an SDK specific implementation isn't useful, unless you already know that implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get code comments for these properties?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, I renamed the Transform field to TransformID, which makes comments on both fields redundant (they'd be of the form "TransformID is the ID of the transform containing the timer" etc).

Just to re-iterate the other point I made offline, documentation is best served in appropriate contexts. In this case, this is a type in an internal package, not intended for arbitrary user use, so the documentation bar is lower. That context is safe enough to assume that whomever needs to make a change here knows what a Timer is and how they are identified.

Otherwise they should really look into the beam Protos directly to start. (or some kind of SDK Authoring guide...)

Comment on lines 1715 to 1725
toProcess = append(toProcess, element{
window: win,
timestamp: wm,
pane: typex.NoFiringPane(),
holdTimestamp: wm,
transform: ss.onWindowExpiration.Transform,
family: ss.onWindowExpiration.TimerFamily,
sequence: 1,
keyBytes: []byte(k),
elmBytes: nil,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the elements available to the SDK during its OnWindowExpiration callback associated with a given key/window?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, this is where we're producing the timer values that the SDK will consume and ultimately call the OnWindowExpiration callbacks.

This sort of thing usually lives in "PersistBundle" but since these timers aren't directly produced by users, we need to synthesize them here.

@@ -182,8 +182,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME)
}

check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ask this here as it's where the Pipeline graph is first analyzed. Is it the responsibility of the SDK or the Runner to check whether the input PCollection is a PCollection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this question. What would an input PCollection be if it wasn't a PCollection?

In some ways it's both.

I can say it's the SDK's responsibility to send the runner well formed Directed Acyclic Graphs as Pipelines. This lets it produce better error messages than the runner can.
But, it's the Runner's responsibility to validate that the SDK is doing so, and report that as an error or not.

@lostluck
Copy link
Contributor Author

RFAL

@@ -628,6 +655,11 @@ type Block struct {
Transform, Family string
}

// StaticTimerID represents the static user identifiers for a timer.
type StaticTimerID struct {
Transform, TimerFamily string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get code comments for these properties?

@lostluck lostluck merged commit de5e8eb into apache:master Dec 12, 2024
21 of 23 checks passed
@lostluck lostluck deleted the onWindowExpiry branch December 12, 2024 21:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[prism] Support OnWindowExpiry
2 participants