Skip to content

Commit

Permalink
[#32064] Keep elements heap in sequence order. (#32065)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Aug 2, 2024
1 parent d96fa7d commit bf42a81
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
15 changes: 14 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ type element struct {
holdTimestamp mtime.Time // only used for Timers
pane typex.PaneInfo
transform, family, tag string // only used for Timers.
// Used to ensure ordering within a key when sorting the heap,
// which isn't using a stable sort.
// Since ordering is weak across multiple bundles, it needs only
// be consistent between exiting a stage and entering a stateful stage.
// No synchronization is required in specifying this,
// since keyed elements are only processed by a single bundle at a time,
// if stateful stages are concerned.
sequence int

elmBytes []byte // When nil, indicates this is a timer.
keyBytes []byte
Expand Down Expand Up @@ -103,7 +111,8 @@ func (h elementHeap) Less(i, j int) bool {
} else if h[i].IsData() && h[j].IsTimer() {
return true // i before j.
}
// They're the same kind, fall through to timestamp less for consistency.
// They're the same kind, so compare by the sequence value.
return h[i].sequence < h[j].sequence
}
// Otherwise compare by timestamp.
return h[i].timestamp < h[j].timestamp
Expand Down Expand Up @@ -688,6 +697,7 @@ func reElementResiduals(residuals []Residual, inputInfo PColInfo, rb RunBundle)
pane: pn,
elmBytes: elmBytes,
keyBytes: keyBytes,
sequence: len(unprocessedElements),
})
}
}
Expand All @@ -704,6 +714,7 @@ func reElementResiduals(residuals []Residual, inputInfo PColInfo, rb RunBundle)
// PersistBundle takes in the stage ID, ID of the bundle associated with the pending
// input elements, and the committed output elements.
func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals Residuals) {
var seq int
for output, data := range d.Raw {
info := col2Coders[output]
var newPending []element
Expand Down Expand Up @@ -743,7 +754,9 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
pane: pn,
elmBytes: elmBytes,
keyBytes: keyBytes,
sequence: seq,
})
seq++
}
}
}
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func decodeTimer(keyDec func(io.Reader) []byte, usesGlobalWindow bool, raw []byt
timestamp: firing,
holdTimestamp: hold,
pane: pane,
sequence: len(ret),
})
}
return keyBytes, tag, ret
Expand Down

0 comments on commit bf42a81

Please sign in to comment.