-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Dataflow Streaming GroupByKey state multiplexing for small keys #33318
base: master
Are you sure you want to change the base?
Conversation
…oupByKey state over a fixed number of sharding keys. The number of sharding keys is fixed at 32k.
5af575c
to
ede9dea
Compare
...ava/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java
Outdated
Show resolved
Hide resolved
// Note that Never trigger finishes *at* GC time so it is OK, and | ||
// AfterWatermark.fromEndOfWindow() finishes at end-of-window time so it is | ||
// OK if there is no allowed lateness. | ||
private static boolean triggerIsSafe(WindowingStrategy<?, ?> windowingStrategy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
...ava/src/main/java/org/apache/beam/runners/dataflow/internal/StateMultiplexingGroupByKey.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to verify that the trigger is of some safe type that just uses processing timer or watermark time to trigger?
For example, after count (beyond 1) will have confusing results if it's supposed to count per key but won't now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding is all trigger state are per window and the associated state variables are in the window namespace.
beam/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
Line 214 in e1245d9
return stateInternals.state(windowNamespace(), address, context); |
Since the userKey is part of the KeyedWindow and the window namespaces, state for different userKeys won't collide.
If there are custom trigger implementation relying on external state, that might have problems depending on how it accesses state.
after count (beyond 1) will have confusing results if it's supposed to count per key but won't now.
IIUC, this change should be transparent and not change semantics for any triggers/windows. Could you explain more on when the after count semantics can change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline, I was confusing things and this should work as triggers and state are per-window.
ByteString key = ((KeyedWindow<?>) w).getKey(); | ||
try { | ||
|
||
// is it correct to use the pane from Keyed window here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above, I think we need to make sure we only do this for correct triggers. If that is done, I think it would be safe to use the pane.
Run Java Precommit |
Run Precommit Java |
This is a POC showing how state multiplexing can work for GroupByKey.
The 4k threshold is currently an arbitrary small value and can be tweaked. The constraint is any state tags should not exceed 64k and keys are now part of the windowed state tags.
Need to cleanup comments and add tests, sending this to share the idea and get initial feedback.
R: @scwhittle