[apache#32211] Support OnWindowExpiry in Prism
lostluck committed Dec 9, 2024
1 parent ba02635 commit 1e04f68
Showing 8 changed files with 192 additions and 49 deletions.
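For orientation: OnWindowExpiration gives a stateful DoFn one last per-key callback when a window expires, that is, when the stage's output watermark passes the window's maximum timestamp (plus any allowed lateness), so buffered state can be flushed before the runner garbage-collects it. A minimal sketch of that expiry decision, using simplified stand-in types rather than Prism's real ones:

```go
package main

import "fmt"

// Window is a stand-in for a windowed interval; only its maximum timestamp
// matters for the expiry decision.
type Window struct{ MaxTimestamp int64 }

// expiredWindows returns the windows whose maximum timestamp is behind the
// stage's output watermark. For each of these, the runner must fire the
// OnWindowExpiration timer once per key before collecting the window's state.
func expiredWindows(keysByWindow map[Window][]string, outputWatermark int64) map[Window][]string {
	expired := map[Window][]string{}
	for w, keys := range keysByWindow {
		// A real implementation would also account for AllowedLateness here.
		if w.MaxTimestamp < outputWatermark {
			expired[w] = keys
		}
	}
	return expired
}

func main() {
	keysByWindow := map[Window][]string{
		{MaxTimestamp: 100}: {"user-a", "user-b"},
		{MaxTimestamp: 500}: {"user-c"},
	}
	// Only the window ending at 100 has expired at watermark 200.
	fmt.Println(expiredWindows(keysByWindow, 200))
}
```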
16 changes: 13 additions & 3 deletions runners/prism/java/build.gradle
@@ -106,6 +106,12 @@ def sickbayTests = [
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',

// GroupIntoBatchesTest tests that fail:
// TestStream has bad KV encodings due to using an outer context.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
// ShardedKey not yet implemented.
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',

// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

@@ -231,9 +237,13 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
// https://github.com/apache/beam/issues/32929
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'

// Not supported in Portable Java SDK yet.
// https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'

// Processing time with TestStream is unreliable without being able to control
// SDK side time portably. Ignore these tests.
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
}
filter {
for (String test : sickbayTests) {
188 changes: 155 additions & 33 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -194,9 +194,10 @@ type ElementManager struct {

pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection.

refreshCond sync.Cond // refreshCond protects the following fields with its lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
changedStages set[string] // Stages that have changed and need their watermark refreshed.
refreshCond sync.Cond // refreshCond protects the following fields with its lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
changedStages set[string] // Stages that have changed and need their watermark refreshed.
sideChannelBundles []RunBundle // Ready-to-execute bundles prepared on the side by a stage instead of in the main loop, such as for onWindowExpiry or for triggers.

livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
@@ -271,6 +272,16 @@ func (em *ElementManager) StageStateful(ID string) {
em.stages[ID].stateful = true
}

// StageOnWindowExpiration marks the given stage as having an OnWindowExpiration timer, recording
// the timer's identifiers and initializing the per-window key tracking used to fire it before state is garbage collected.
func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) {
ss := em.stages[stageID]
ss.onWindowExpirationFamily = timer.TimerFamily
ss.onWindowExpirationTransform = timer.Transform
ss.keysToExpireByWindow = map[typex.Window]set[string]{}
ss.inProgressExpiredWindows = map[typex.Window]int{}
}
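A hedged sketch of how this registration is expected to be driven; the call shape mirrors the execute.go change later in this commit, but the stage and element-manager types here are simplified stand-ins, not Prism's real structs:

```go
package main

import "fmt"

// StaticTimerID mirrors the type added below: the static user identifiers for
// a timer (the transform that owns it and its timer family name).
type StaticTimerID struct{ Transform, TimerFamily string }

// stage is a stand-in for Prism's per-stage metadata.
type stage struct {
	ID                 string
	onWindowExpiration StaticTimerID
}

// elementManager is a stand-in holding only the OnWindowExpiration bookkeeping.
type elementManager struct {
	oweByStage map[string]StaticTimerID
}

// StageOnWindowExpiration records which transform and timer family must fire
// before windows owned by this stage can be garbage collected.
func (em *elementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) {
	em.oweByStage[stageID] = timer
}

func main() {
	em := &elementManager{oweByStage: map[string]StaticTimerID{}}
	stg := stage{ID: "s1", onWindowExpiration: StaticTimerID{Transform: "MyDoFn", TimerFamily: "onWinExp"}}
	// Mirrors the wiring in execute.go: only register when the stage actually
	// declares an OnWindowExpiration timer family.
	if stg.onWindowExpiration.TimerFamily != "" {
		em.StageOnWindowExpiration(stg.ID, stg.onWindowExpiration)
	}
	fmt.Println(em.oweByStage)
}
```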

// StageProcessingTimeTimers indicates which timers are processingTime domain timers.
func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) {
em.stages[ID].processingTimeTimersFamilies = ptTimers
@@ -371,7 +382,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
em.changedStages.merge(changedByProcessingTime)

// If there are no changed stages or ready processing time events available, we wait until there are.
for len(em.changedStages)+len(changedByProcessingTime) == 0 {
for len(em.changedStages)+len(changedByProcessingTime)+len(em.sideChannelBundles) == 0 {
// Check to see if we must exit
select {
case <-ctx.Done():
@@ -386,6 +397,20 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)
}
// Run any side channel bundles first.
for len(em.sideChannelBundles) > 0 {
rb := em.sideChannelBundles[0]
em.sideChannelBundles = em.sideChannelBundles[1:]
em.refreshCond.L.Unlock()

select {
case <-ctx.Done():
return
case runStageCh <- rb:
}
slog.Debug("OnWindowExpiration-Bundle sent", slog.Any("bundle", rb), slog.Int64("livePending", em.livePending.Load()))
em.refreshCond.L.Lock()
}

// We know there is some work we can do that may advance the watermarks,
// refresh them, and see which stages have advanced.
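The loop above drains side-channel bundles before doing its normal watermark-driven scheduling. A self-contained sketch of that producer/consumer pattern with a sync.Cond, using simplified stand-in types; this is not the real ElementManager loop, which also handles watermark refreshes and processing-time events:

```go
package main

import (
	"fmt"
	"sync"
)

// RunBundle is a stand-in for Prism's bundle descriptor.
type RunBundle struct{ StageID, BundleID string }

// scheduler sketches the side-channel pattern: bundles prepared outside the
// main scheduling loop (such as OnWindowExpiration bundles) are appended under
// the same lock that guards the loop's other state, then drained first.
type scheduler struct {
	cond        *sync.Cond
	sideChannel []RunBundle
	done        bool
}

func newScheduler() *scheduler {
	return &scheduler{cond: sync.NewCond(&sync.Mutex{})}
}

// addSideChannel is called by whatever prepares the bundle (in Prism, the
// watermark-update path); it wakes the scheduling loop.
func (s *scheduler) addSideChannel(rb RunBundle) {
	s.cond.L.Lock()
	s.sideChannel = append(s.sideChannel, rb)
	s.cond.L.Unlock()
	s.cond.Broadcast()
}

func (s *scheduler) stop() {
	s.cond.L.Lock()
	s.done = true
	s.cond.L.Unlock()
	s.cond.Broadcast()
}

// run blocks until side-channel bundles arrive, then emits them in order.
func (s *scheduler) run(out chan<- RunBundle) {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	for {
		for len(s.sideChannel) == 0 && !s.done {
			s.cond.Wait()
		}
		if s.done {
			close(out)
			return
		}
		rb := s.sideChannel[0]
		s.sideChannel = s.sideChannel[1:]
		// Release the lock while handing the bundle off, as the real loop does.
		s.cond.L.Unlock()
		out <- rb
		s.cond.L.Lock()
	}
}

func main() {
	s := newScheduler()
	out := make(chan RunBundle)
	go s.run(out)
	s.addSideChannel(RunBundle{StageID: "stage1", BundleID: "owe-stage1-100"})
	fmt.Println(<-out)
	s.stop()
}
```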
@@ -628,6 +653,11 @@ type Block struct {
Transform, Family string
}

// StaticTimerID represents the static user identifiers for a timer.
type StaticTimerID struct {
Transform, TimerFamily string
}

// StateForBundle retrieves relevant state for the given bundle, WRT the data in the bundle.
//
// TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.
@@ -1068,6 +1098,12 @@ type stageState struct {
strat winStrat // Windowing Strategy for aggregation firings.
processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.

// onWindowExpiration management
onWindowExpirationFamily string // If non-empty, indicates that this stage has an OnWindowExpiration timer that must trigger.
onWindowExpirationTransform string // The transform name of the DoFn with OnWindowExpiration
keysToExpireByWindow map[typex.Window]set[string]
inProgressExpiredWindows map[typex.Window]int

mu sync.Mutex
upstreamWatermarks sync.Map // watermark set from inputPCollection's parent.
input mtime.Time // input watermark for the parallel input.
@@ -1158,6 +1194,14 @@ func (ss *stageState) AddPending(newPending []element) int {
timers: map[timerKey]timerTimes{},
}
ss.pendingByKeys[string(e.keyBytes)] = dnt
if ss.keysToExpireByWindow != nil {
w, ok := ss.keysToExpireByWindow[e.window]
if !ok {
w = make(set[string])
ss.keysToExpireByWindow[e.window] = w
}
w.insert(string(e.keyBytes))
}
}
heap.Push(&dnt.elements, e)
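The per-window key registry populated here is what updateWatermarks later walks to decide which keys still owe an OnWindowExpiration firing. A small sketch of the same bookkeeping with stand-in types (hypothetical names, not the real stageState):

```go
package main

import "fmt"

type window struct{ maxTimestamp int64 }

// registerKeyForExpiry mirrors the AddPending change: every keyed element seen
// by a stage with OnWindowExpiration gets its key recorded against its window,
// so the expiry bundle later knows exactly which keys need a final timer.
func registerKeyForExpiry(keysToExpireByWindow map[window]map[string]struct{}, win window, key string) {
	keys, ok := keysToExpireByWindow[win]
	if !ok {
		keys = map[string]struct{}{}
		keysToExpireByWindow[win] = keys
	}
	keys[key] = struct{}{}
}

func main() {
	reg := map[window]map[string]struct{}{}
	registerKeyForExpiry(reg, window{100}, "user-a")
	registerKeyForExpiry(reg, window{100}, "user-b")
	registerKeyForExpiry(reg, window{200}, "user-a")
	fmt.Println(len(reg[window{100}]), len(reg[window{200}])) // 2 1
}
```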

@@ -1555,45 +1599,123 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
if minWatermarkHold < newOut {
newOut = minWatermarkHold
}
refreshes := set[string]{}
// If the newOut is smaller, then don't change downstream watermarks.
if newOut <= ss.output {
return nil
}

// If bigger, advance the output watermark
if newOut > ss.output {
ss.output = newOut
for _, outputCol := range ss.outputIDs {
consumers := em.consumers[outputCol]

for _, sID := range consumers {
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
refreshes.insert(sID)
}
// Inform side input consumers, but don't update the upstream watermark.
for _, sID := range em.sideConsumers[outputCol] {
refreshes.insert(sID.Global)
var preventDownstreamUpdate bool
for win, keys := range ss.keysToExpireByWindow {
// Check if the window has expired.
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() >= newOut {
continue
}
// We can't advance the output watermark if there's garbage to collect.
preventDownstreamUpdate = true
// Hold off on garbage collecting data for these windows while these
// are in progress.
ss.inProgressExpiredWindows[win] += 1

// Produce bundle(s) for these keys and window, and side channel them.
wm := win.MaxTimestamp()
rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + ss.ID + "-" + win.MaxTimestamp().String(), Watermark: wm}

// Now we need to actually build the bundle.
var toProcess []element
busyKeys := set[string]{}
for k := range keys {
if ss.inprogressKeys.present(k) {
busyKeys.insert(k)
continue
}
toProcess = append(toProcess, element{
window: win,
timestamp: wm,
pane: typex.NoFiringPane(),
holdTimestamp: wm,
transform: ss.onWindowExpirationTransform,
family: ss.onWindowExpirationFamily,
sequence: 1,
keyBytes: []byte(k),
elmBytes: nil,
})
}
em.addPending(len(toProcess))
ss.watermarkHolds.Add(wm, 1)
ss.makeInProgressBundle(
func() string { return rb.BundleID },
toProcess,
wm,
keys,
map[mtime.Time]int{wm: 1},
)

slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("keys", keys), slog.Any("window", win), slog.Any("toProcess", toProcess))
// We're already in the refreshCond critical section.
// Insert that this is in progress here to avoid a race condition.
em.inprogressBundles.insert(rb.BundleID)
em.sideChannelBundles = append(em.sideChannelBundles, rb)

// Remove the key accounting, or continue tracking which keys still need clearing.
if len(busyKeys) == 0 {
delete(ss.keysToExpireByWindow, win)
} else {
ss.keysToExpireByWindow[win] = busyKeys
}
// Garbage collect state, timers and side inputs, for all windows
// that are before the new output watermark.
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
// Garbage collect state, timers and side inputs, for all windows
// that are before the new output watermark.
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
// If the expiry is in progress, skip this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
}
delete(wins, win)
}
}
for _, wins := range ss.state {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
for _, wins := range ss.state {
for win := range wins {
// TODO(#https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
// If the expiry is in progress, skip collecting this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
}
delete(wins, win)
}
}
}
// If there are windows to expire, we don't update the output watermark yet.
if preventDownstreamUpdate {
return nil
}

// Update this stage's output watermark, and then propagate that to downstream stages
refreshes := set[string]{}
ss.output = newOut
for _, outputCol := range ss.outputIDs {
consumers := em.consumers[outputCol]

for _, sID := range consumers {
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
refreshes.insert(sID)
}
// Inform side input consumers, but don't update the upstream watermark.
for _, sID := range em.sideConsumers[outputCol] {
refreshes.insert(sID.Global)
}
}
return refreshes
}
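A hedged, self-contained sketch of the expiry pass added in this function, with stand-in types: each expired window yields one bundle for the keys that are not currently busy, busy keys stay registered for a later pass, and the output watermark is held back while any expiry work is outstanding. The real code additionally adds pending-element counts, takes a watermark hold, builds an in-progress bundle, and registers it on the side channel.

```go
package main

import "fmt"

type window struct{ maxTimestamp int64 }

type expiryBundle struct {
	id   string
	keys []string
}

type stageState struct {
	id                       string
	keysToExpireByWindow     map[window]map[string]struct{}
	inProgressExpiredWindows map[window]int
	inProgressKeys           map[string]struct{}
}

// expire builds one bundle per expired window from the keys that are not
// currently busy, keeps busy keys registered for a later pass, and reports
// whether the output watermark must be held back.
func (ss *stageState) expire(newOut int64) (bundles []expiryBundle, holdWatermark bool) {
	for win, keys := range ss.keysToExpireByWindow {
		if win.maxTimestamp >= newOut { // window not yet expired
			continue
		}
		holdWatermark = true
		ss.inProgressExpiredWindows[win]++

		var ready []string
		busy := map[string]struct{}{}
		for k := range keys {
			if _, inProgress := ss.inProgressKeys[k]; inProgress {
				busy[k] = struct{}{}
				continue
			}
			ready = append(ready, k)
		}
		bundles = append(bundles, expiryBundle{
			id:   fmt.Sprintf("owe-%s-%d", ss.id, win.maxTimestamp),
			keys: ready,
		})
		if len(busy) == 0 {
			delete(ss.keysToExpireByWindow, win)
		} else {
			ss.keysToExpireByWindow[win] = busy
		}
	}
	return bundles, holdWatermark
}

func main() {
	ss := &stageState{
		id: "stage1",
		keysToExpireByWindow: map[window]map[string]struct{}{
			{100}: {"a": {}, "b": {}},
			{500}: {"c": {}},
		},
		inProgressExpiredWindows: map[window]int{},
		inProgressKeys:           map[string]struct{}{"b": {}},
	}
	bundles, hold := ss.expire(200)
	fmt.Println(bundles, hold) // one bundle for key "a"; "b" stays queued; watermark held
}
```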

4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -318,6 +318,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
if stage.stateful {
em.StageStateful(stage.ID)
}
if stage.onWindowExpiration.TimerFamily != "" {
slog.Debug("OnWindowExpiration", slog.String("stage", stage.ID), slog.Any("values", stage.onWindowExpiration))
em.StageOnWindowExpiration(stage.ID, stage.onWindowExpiration)
}
if len(stage.processingTimeTimers) > 0 {
em.StageProcessingTimeTimers(stage.ID, stage.processingTimeTimers)
}
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -45,6 +45,7 @@ var supportedRequirements = map[string]struct{}{
urns.RequirementSplittableDoFn: {},
urns.RequirementStatefulProcessing: {},
urns.RequirementBundleFinalization: {},
urns.RequirementOnWindowExpiration: {},
}

// TODO, move back to main package, and key off of executor handlers?
@@ -182,8 +182,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME)
}

check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.

// Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139
if pardo.GetRestrictionCoderId() != "" && isStateful {
check("Splittable+Stateful DoFn", "See https://github.com/apache/beam/issues/32139 for information.", "")
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -449,6 +449,12 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa
if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 {
stg.stateful = true
}
if pardo.GetOnWindowExpirationTimerFamilySpec() != "" {
stg.onWindowExpiration = struct {
Transform string
TimerFamily string
}{Transform: link.Transform, TimerFamily: pardo.GetOnWindowExpirationTimerFamilySpec()}
}
sis = pardo.GetSideInputs()
}
if _, ok := sis[link.Local]; ok {
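A simplified sketch of the stage-construction step above; parDoPayload here is a stand-in for the pipeline proto (field names assumed to mirror the GetOnWindowExpirationTimerFamilySpec accessor), not the real pipepb types:

```go
package main

import "fmt"

// parDoPayload stands in for the pipeline proto's ParDoPayload; only the
// fields this step inspects are modeled.
type parDoPayload struct {
	onWindowExpirationTimerFamilySpec string
	timerFamilySpecs                  map[string]struct{}
	stateSpecs                        map[string]struct{}
}

type staticTimerID struct{ Transform, TimerFamily string }

type stage struct {
	stateful           bool
	onWindowExpiration staticTimerID
}

// finalizeStage mirrors the preprocess.go change: an OnWindowExpiration spec
// both forces the stage onto the stateful path and records which transform
// and timer family the expiry bundle must target.
func finalizeStage(stg *stage, transform string, pardo parDoPayload) {
	if len(pardo.timerFamilySpecs)+len(pardo.stateSpecs)+len(pardo.onWindowExpirationTimerFamilySpec) > 0 {
		stg.stateful = true
	}
	if pardo.onWindowExpirationTimerFamilySpec != "" {
		stg.onWindowExpiration = staticTimerID{Transform: transform, TimerFamily: pardo.onWindowExpirationTimerFamilySpec}
	}
}

func main() {
	var stg stage
	finalizeStage(&stg, "MyStatefulDoFn", parDoPayload{onWindowExpirationTimerFamilySpec: "onWinExp"})
	fmt.Printf("%+v\n", stg)
}
```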
22 changes: 12 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -57,18 +57,20 @@ type link struct {
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
finalize bool
stateful bool
ID string
transforms []string
primaryInput string // PCollection used as the parallel input.
outputs []link // PCollections that must escape this stage.
sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers
internalCols []string // PCollections that escape. Used for precise coder sending.
envID string
finalize bool
stateful bool
onWindowExpiration engine.StaticTimerID

// hasTimers indicates the transform+timerfamily pairs that need to be waited on for
// the stage to be considered complete.
hasTimers []struct{ Transform, TimerFamily string }
hasTimers []engine.StaticTimerID
processingTimeTimers map[string]bool

exe transformExecuter
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -42,7 +42,7 @@ type B struct {
InputTransformID string
Input []*engine.Block // Data and Timers for this bundle.
EstimatedInputElements int
HasTimers []struct{ Transform, TimerFamily string } // Timer streams to terminate.
HasTimers []engine.StaticTimerID // Timer streams to terminate.

// IterableSideInputData is a map from transformID + inputID, to window, to data.
IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
