These Java tests were added in #15378 to enforce the invariant that observed strong ordering is maintained. That is, if a user emits elements for a key in a particular order, with the same event time, subsequent stateful stages should observe them in that same order.
The tests are cleverly written: they randomly order a sequence of values, emit keys with those values, and use the state API to verify that each key observes the same sequence of values in the same order.
The root of the issue is that Prism keeps pending elements in a heap, and all the elements in the test share the same timestamp. The heap's ordering isn't stable: it compares elements only by element kind (data or timer) and timestamp, so elements that tie on both can come out in any order, which causes the discrepancy.
The fix is to add a sequence number to the elements, so that when the heap orders them, ties are broken by write order and the elements keep their relative ordering.
The sequence number only needs to be assigned within a given bundle and doesn't need to persist through a GroupByKey. It would need to persist through a Reshuffle, however. But Prism implements Reshuffle only by adding a fusion break, not by actually shuffling, so there's no work to be done: the subsequent stage reads elements in the same order they were written to Prism.
This approach also leaves no avenue for failure when a bundle for a stateful stage is channel split. Residual elements with the same key as the in-progress bundle won't start executing while that bundle is still in progress, and elements for other keys are re-persisted in the same order.
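The channel-split argument can be modeled in a few lines of Go. This is a hypothetical model, not Prism's scheduler: `pending`, `channelSplit` and `partitionByLock` are illustrative names, and renumbering the residual with fresh sequence numbers is an assumption about how "re-persisted in the same order" could be realized. Both steps are stable, so each key's relative order survives the split.

```go
package main

import "fmt"

// pending is a hypothetical queued element: key, value, and sequence number.
type pending struct {
	key   string
	value int
	seq   int
}

// channelSplit splits an in-progress bundle at splitIdx. Elements before the
// split stay in the running bundle; the residual is re-persisted in its
// original order with fresh sequence numbers starting at nextSeq.
func channelSplit(bundle []pending, splitIdx, nextSeq int) (kept, residual []pending) {
	kept = bundle[:splitIdx]
	for _, p := range bundle[splitIdx:] {
		p.seq = nextSeq // p is a copy; the original bundle is not modified
		nextSeq++
		residual = append(residual, p)
	}
	return kept, residual
}

// partitionByLock is a stable partition: residual elements whose key belongs
// to an in-progress bundle are held back, the rest are ready to schedule.
func partitionByLock(residual []pending, inProgress map[string]bool) (ready, held []pending) {
	for _, p := range residual {
		if inProgress[p.key] {
			held = append(held, p)
		} else {
			ready = append(ready, p)
		}
	}
	return ready, held
}

func main() {
	bundle := []pending{{"a", 1, 0}, {"b", 1, 1}, {"a", 2, 2}, {"b", 2, 3}, {"c", 1, 4}}
	kept, residual := channelSplit(bundle, 2, 5)
	// Assume the running bundle still holds keys "a" and "b".
	ready, held := partitionByLock(residual, map[string]bool{"a": true, "b": true})
	fmt.Println(len(kept), ready, held) // 2 [{c 1 7}] [{a 2 5} {b 2 6}]
}
```

Key "a" still sees value 1 (kept) before value 2 (held), and key "c" is free to run in a new bundle without interleaving with anything.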
lostluck changed the title from "[prism] Java PerKeyOrderingTest - test failures (likely state)" to "[prism] Java PerKeyOrderingTest - test failures (per key order not maintained)" on Aug 2, 2024.
reeba212 pushed a commit to reeba212/beam that referenced this issue on Dec 4, 2024.