From 1e04f6879cd8a70107548ff1a27c94427f5fc74e Mon Sep 17 00:00:00 2001
From: lostluck <13907733+lostluck@users.noreply.github.com>
Date: Mon, 9 Dec 2024 15:17:46 -0800
Subject: [PATCH] [#32211] Support OnWindowExpiry in Prism

---
 runners/prism/java/build.gradle                 |  16 +-
 .../prism/internal/engine/elementmanager.go     | 188 +++++++++++++++---
 .../beam/runners/prism/internal/execute.go      |   4 +
 .../runners/prism/internal/jobservices/job.go   |   1 +
 .../prism/internal/jobservices/management.go    |   2 -
 .../beam/runners/prism/internal/preprocess.go   |   6 +
 .../pkg/beam/runners/prism/internal/stage.go    |  22 +-
 .../runners/prism/internal/worker/bundle.go     |   2 +-
 8 files changed, 192 insertions(+), 49 deletions(-)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index f225a98f7bc8..ce71151099bd 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -106,6 +106,12 @@ def sickbayTests = [
     'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
     'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',
 
+    // GroupIntoBatchesTest tests that fail:
+    // TestStream has bad KV encodings due to using an outer context.
+    'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode',
+    // ShardedKey not yet implemented.
+    'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',
+
     // Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
     'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',
 
@@ -231,9 +237,13 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
       // https://github.com/apache/beam/issues/32929
       excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
 
-      // Not supported in Portable Java SDK yet.
-      // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
-      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
+      // Not supported in Portable Java SDK yet.
+      // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
+      excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
+
+      // Processing time with TestStream is unreliable without being able to control
+      // SDK side time portably. Ignore these tests.
+      excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
     }
     filter {
       for (String test : sickbayTests) {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 1739efdb742a..5a43f8306df3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -194,9 +194,10 @@ type ElementManager struct {
 
     pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection.
 
-    refreshCond       sync.Cond   // refreshCond protects the following fields with it's lock, and unblocks bundle scheduling.
-    inprogressBundles set[string] // Active bundleIDs
-    changedStages     set[string] // Stages that have changed and need their watermark refreshed.
+    refreshCond       sync.Cond   // refreshCond protects the following fields with its lock, and unblocks bundle scheduling.
+    inprogressBundles set[string] // Active bundleIDs
+    changedStages     set[string] // Stages that have changed and need their watermark refreshed.
+    sideChannelBundles []RunBundle // Ready-to-execute bundles prepared on the side by a stage, rather than in the main loop, such as for OnWindowExpiration or triggers.
 
     livePending     atomic.Int64   // An accessible live pending count. DEBUG USE ONLY
     pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
@@ -271,6 +272,16 @@ func (em *ElementManager) StageStateful(ID string) {
     em.stages[ID].stateful = true
 }
 
+// StageOnWindowExpiration registers the given stage's OnWindowExpiration timer,
+// which must fire for each key seen in a window before that window's state
+// can be garbage collected.
+func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) {
+    ss := em.stages[stageID]
+    ss.onWindowExpirationFamily = timer.TimerFamily
+    ss.onWindowExpirationTransform = timer.Transform
+    ss.keysToExpireByWindow = map[typex.Window]set[string]{}
+    ss.inProgressExpiredWindows = map[typex.Window]int{}
+}
+
 // StageProcessingTimeTimers indicates which timers are processingTime domain timers.
 func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) {
     em.stages[ID].processingTimeTimersFamilies = ptTimers
@@ -371,7 +382,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
             em.changedStages.merge(changedByProcessingTime)
 
             // If there are no changed stages or ready processing time events available, we wait until there are.
-            for len(em.changedStages)+len(changedByProcessingTime) == 0 {
+            for len(em.changedStages)+len(changedByProcessingTime)+len(em.sideChannelBundles) == 0 {
                 // Check to see if we must exit
                 select {
                 case <-ctx.Done():
@@ -386,6 +397,20 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
                 changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
                 em.changedStages.merge(changedByProcessingTime)
             }
+            // Run any side channel bundles first.
+            for len(em.sideChannelBundles) > 0 {
+                rb := em.sideChannelBundles[0]
+                em.sideChannelBundles = em.sideChannelBundles[1:]
+                em.refreshCond.L.Unlock()
+
+                select {
+                case <-ctx.Done():
+                    return
+                case runStageCh <- rb:
+                }
+                slog.Debug("OnWindowExpiration-Bundle sent", slog.Any("bundle", rb), slog.Int64("livePending", em.livePending.Load()))
+                em.refreshCond.L.Lock()
+            }
 
             // We know there is some work we can do that may advance the watermarks,
             // refresh them, and see which stages have advanced.
@@ -628,6 +653,11 @@ type Block struct {
     Transform, Family string
 }
 
+// StaticTimerID represents the static user identifiers for a timer.
+type StaticTimerID struct {
+    Transform, TimerFamily string
+}
+
 // StateForBundle retreives relevant state for the given bundle, WRT the data in the bundle.
 //
 // TODO(lostluck): Consider unifiying with InputForBundle, to reduce lock contention.
@@ -1068,6 +1098,12 @@ type stageState struct {
     strat                        winStrat        // Windowing Strategy for aggregation fireings.
     processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.
 
+    // onWindowExpiration management
+    onWindowExpirationFamily    string                       // If non-empty, indicates that this stage has an OnWindowExpiration timer that must trigger.
+    onWindowExpirationTransform string                       // The transform name of the DoFn with OnWindowExpiration.
+    keysToExpireByWindow        map[typex.Window]set[string] // The keys seen in each window, which need expiry callbacks before the window is collected.
+    inProgressExpiredWindows    map[typex.Window]int         // Count of in-flight OnWindowExpiration bundles per expired window; blocks garbage collection while non-zero.
+
     mu                 sync.Mutex
     upstreamWatermarks sync.Map   // watermark set from inputPCollection's parent.
     input              mtime.Time // input watermark for the parallel input.
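A note for reviewers on the scheduling change above: OnWindowExpiration bundles are created during watermark evaluation, queued on em.sideChannelBundles under refreshCond's lock, and drained by the Bundles loop before the next watermark refresh, with the lock released around the channel send. The standalone Go sketch below models just that producer/consumer shape; the scheduler and bundle types are illustrative stand-ins, not Prism's actual types.

package main

import (
    "fmt"
    "sync"
)

type bundle struct{ id string }

// scheduler models the side-channel queue: pending and done are guarded by
// the condition variable's lock, as em.sideChannelBundles is by refreshCond.
type scheduler struct {
    cond    *sync.Cond
    pending []bundle
    done    bool
}

func newScheduler() *scheduler {
    return &scheduler{cond: sync.NewCond(&sync.Mutex{})}
}

// enqueue appends a side-channel bundle and wakes the scheduling loop.
func (s *scheduler) enqueue(b bundle) {
    s.cond.L.Lock()
    s.pending = append(s.pending, b)
    s.cond.L.Unlock()
    s.cond.Broadcast()
}

// stop signals the loop to exit once the queue is drained.
func (s *scheduler) stop() {
    s.cond.L.Lock()
    s.done = true
    s.cond.L.Unlock()
    s.cond.Broadcast()
}

// run drains side-channel bundles before anything else, releasing the lock
// across the handoff so producers are never blocked on downstream processing.
func (s *scheduler) run(out chan<- bundle) {
    defer close(out)
    s.cond.L.Lock()
    for {
        for len(s.pending) == 0 && !s.done {
            s.cond.Wait()
        }
        if len(s.pending) == 0 && s.done {
            s.cond.L.Unlock()
            return
        }
        b := s.pending[0]
        s.pending = s.pending[1:]
        s.cond.L.Unlock() // don't hold the lock while handing off the bundle
        out <- b
        s.cond.L.Lock()
    }
}

func main() {
    s := newScheduler()
    out := make(chan bundle)
    go s.run(out)

    s.enqueue(bundle{id: "owe-stage1-window0"})
    s.enqueue(bundle{id: "owe-stage1-window1"})
    go s.stop()

    for b := range out {
        fmt.Println("running", b.id)
    }
}

Unlocking around the send mirrors the patch's em.refreshCond.L.Unlock()/Lock() pair: the bundle handoff may block on the runner, and holding the lock there would stall every other stage's watermark updates.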
@@ -1158,6 +1194,14 @@ func (ss *stageState) AddPending(newPending []element) int {
                     timers: map[timerKey]timerTimes{},
                 }
                 ss.pendingByKeys[string(e.keyBytes)] = dnt
+                if ss.keysToExpireByWindow != nil {
+                    w, ok := ss.keysToExpireByWindow[e.window]
+                    if !ok {
+                        w = make(set[string])
+                        ss.keysToExpireByWindow[e.window] = w
+                    }
+                    w.insert(string(e.keyBytes))
+                }
             }
             heap.Push(&dnt.elements, e)
@@ -1555,45 +1599,123 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
     if minWatermarkHold < newOut {
         newOut = minWatermarkHold
     }
-    refreshes := set[string]{}
+    // If the newOut is smaller, then don't change downstream watermarks.
+    if newOut <= ss.output {
+        return nil
+    }
+
     // If bigger, advance the output watermark
-    if newOut > ss.output {
-        ss.output = newOut
-        for _, outputCol := range ss.outputIDs {
-            consumers := em.consumers[outputCol]
-
-            for _, sID := range consumers {
-                em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
-                refreshes.insert(sID)
-            }
-            // Inform side input consumers, but don't update the upstream watermark.
-            for _, sID := range em.sideConsumers[outputCol] {
-                refreshes.insert(sID.Global)
+    var preventDownstreamUpdate bool
+    for win, keys := range ss.keysToExpireByWindow {
+        // Check if the window has expired.
+        // TODO(https://github.com/apache/beam/issues/31438):
+        // Adjust with AllowedLateness
+        if win.MaxTimestamp() >= newOut {
+            continue
+        }
+        // We can't advance the output watermark if there's garbage to collect.
+        preventDownstreamUpdate = true
+        // Hold off on garbage collecting data for these windows while these
+        // are in progress.
+        ss.inProgressExpiredWindows[win] += 1
+
+        // Produce bundle(s) for these keys and window, and side channel them.
+        wm := win.MaxTimestamp()
+        rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + ss.ID + "-" + win.MaxTimestamp().String(), Watermark: wm}
+
+        // Now we need to actually build the bundle.
+        var toProcess []element
+        busyKeys := set[string]{}
+        for k := range keys {
+            if ss.inprogressKeys.present(k) {
+                busyKeys.insert(k)
+                continue
             }
+            toProcess = append(toProcess, element{
+                window:        win,
+                timestamp:     wm,
+                pane:          typex.NoFiringPane(),
+                holdTimestamp: wm,
+                transform:     ss.onWindowExpirationTransform,
+                family:        ss.onWindowExpirationFamily,
+                sequence:      1,
+                keyBytes:      []byte(k),
+                elmBytes:      nil,
+            })
+        }
+        em.addPending(len(toProcess))
+        ss.watermarkHolds.Add(wm, 1)
+        ss.makeInProgressBundle(
+            func() string { return rb.BundleID },
+            toProcess,
+            wm,
+            keys,
+            map[mtime.Time]int{wm: 1},
+        )
+
+        slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("keys", keys), slog.Any("window", win), slog.Any("toProcess", toProcess))
+        // We're already in the refreshCond critical section.
+        // Insert that this is in progress here to avoid a race condition.
+        em.inprogressBundles.insert(rb.BundleID)
+        em.sideChannelBundles = append(em.sideChannelBundles, rb)
+
+        // Remove the key accounting, or continue tracking which keys still need clearing.
+        if len(busyKeys) == 0 {
+            delete(ss.keysToExpireByWindow, win)
+        } else {
+            ss.keysToExpireByWindow[win] = busyKeys
         }
-        // Garbage collect state, timers and side inputs, for all windows
-        // that are before the new output watermark.
-        // They'll never be read in again.
-        for _, wins := range ss.sideInputs {
-            for win := range wins {
-                // TODO(https://github.com/apache/beam/issues/31438):
-                // Adjust with AllowedLateness
-                // Clear out anything we've already used.
-                if win.MaxTimestamp() < newOut {
-                    delete(wins, win)
+    }
+    // Garbage collect state, timers and side inputs, for all windows
+    // that are before the new output watermark.
+    // They'll never be read in again.
+    for _, wins := range ss.sideInputs {
+        for win := range wins {
+            // TODO(https://github.com/apache/beam/issues/31438):
+            // Adjust with AllowedLateness
+            // Clear out anything we've already used.
+            if win.MaxTimestamp() < newOut {
+                // If the expiry is in progress, skip this window.
+                if ss.inProgressExpiredWindows[win] > 0 {
+                    continue
                 }
+                delete(wins, win)
             }
         }
-        for _, wins := range ss.state {
-            for win := range wins {
-                // TODO(https://github.com/apache/beam/issues/31438):
-                // Adjust with AllowedLateness
-                if win.MaxTimestamp() < newOut {
-                    delete(wins, win)
+    }
+    for _, wins := range ss.state {
+        for win := range wins {
+            // TODO(https://github.com/apache/beam/issues/31438):
+            // Adjust with AllowedLateness
+            if win.MaxTimestamp() < newOut {
+                // If the expiry is in progress, skip collecting this window.
+                if ss.inProgressExpiredWindows[win] > 0 {
+                    continue
                 }
+                delete(wins, win)
             }
         }
     }
+    // If there are windows to expire, we don't update the output watermark yet.
+    if preventDownstreamUpdate {
+        return nil
+    }
+
+    // Update this stage's output watermark, and then propagate that to downstream stages.
+    refreshes := set[string]{}
+    ss.output = newOut
+    for _, outputCol := range ss.outputIDs {
+        consumers := em.consumers[outputCol]
+
+        for _, sID := range consumers {
+            em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
+            refreshes.insert(sID)
+        }
+        // Inform side input consumers, but don't update the upstream watermark.
+        for _, sID := range em.sideConsumers[outputCol] {
+            refreshes.insert(sID.Global)
+        }
+    }
     return refreshes
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 614edee47721..fde62f00c7c1 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -318,6 +318,10 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
         if stage.stateful {
             em.StageStateful(stage.ID)
         }
+        if stage.onWindowExpiration.TimerFamily != "" {
+            slog.Debug("OnWindowExpiration", slog.String("stage", stage.ID), slog.Any("values", stage.onWindowExpiration))
+            em.StageOnWindowExpiration(stage.ID, stage.onWindowExpiration)
+        }
         if len(stage.processingTimeTimers) > 0 {
             em.StageProcessingTimeTimers(stage.ID, stage.processingTimeTimers)
         }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index deef259a99d1..6158cd6d612c 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -45,6 +45,7 @@ var supportedRequirements = map[string]struct{}{
     urns.RequirementSplittableDoFn:     {},
     urns.RequirementStatefulProcessing: {},
     urns.RequirementBundleFinalization: {},
+    urns.RequirementOnWindowExpiration: {},
 }
 
 // TODO, move back to main package, and key off of executor handlers?
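For context on the updateWatermarks changes above: a window whose OnWindowExpiration bundle is still in flight is counted in inProgressExpiredWindows, and both the side-input and state sweeps skip that window until the count returns to zero. Here is a reduced Go sketch of that guard, using simplified stand-ins for Prism's typex.Window and per-window state maps rather than the runner's real types.

package main

import "fmt"

type window struct{ maxTimestamp int64 }

// collectExpired deletes per-window state for every window strictly before
// the new output watermark, unless an OnWindowExpiration bundle for that
// window is still in progress, in which case the state must stay readable.
func collectExpired(state map[window]string, inProgress map[window]int, newOut int64) {
    for win := range state {
        if win.maxTimestamp >= newOut {
            continue // window not yet expired by the new output watermark
        }
        if inProgress[win] > 0 {
            continue // expiry bundle still in flight; keep the state alive
        }
        delete(state, win)
    }
}

func main() {
    w0, w1 := window{maxTimestamp: 10}, window{maxTimestamp: 20}
    state := map[window]string{w0: "a", w1: "b"}
    inProgress := map[window]int{w1: 1} // w1's expiry bundle is running

    collectExpired(state, inProgress, 30)
    fmt.Println(len(state)) // 1: w1 survives until its expiry bundle completes
}

The same reasoning explains why preventDownstreamUpdate returns early: advancing the output watermark past a window whose expiry callback has not yet run would let downstream stages observe the window as closed while its last outputs are still pending.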
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index a2840760bf7a..894a6e1427a2 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -182,8 +182,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
                 check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME)
             }
 
-            check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.
-
             // Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139
             if pardo.GetRestrictionCoderId() != "" && isStateful {
                 check("Splittable+Stateful DoFn", "See https://github.com/apache/beam/issues/32139 for information.", "")
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
index dceaa9ab8fcb..0e17d642d88a 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -449,6 +449,12 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa
             if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 {
                 stg.stateful = true
             }
+            if pardo.GetOnWindowExpirationTimerFamilySpec() != "" {
+                stg.onWindowExpiration = engine.StaticTimerID{
+                    Transform:   link.Transform,
+                    TimerFamily: pardo.GetOnWindowExpirationTimerFamilySpec(),
+                }
+            }
             sis = pardo.GetSideInputs()
         }
         if _, ok := sis[link.Local]; ok {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 9f00c22789b6..7cc37754f0ef 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -57,18 +57,20 @@ type link struct {
 // account, but all serialization boundaries remain since the pcollections
 // would continue to get serialized.
 type stage struct {
-    ID           string
-    transforms   []string
-    primaryInput string          // PCollection used as the parallel input.
-    outputs      []link          // PCollections that must escape this stage.
-    sideInputs   []engine.LinkID // Non-parallel input PCollections and their consumers
-    internalCols []string        // PCollections that escape. Used for precise coder sending.
-    envID        string
-    finalize     bool
-    stateful     bool
+    ID                 string
+    transforms         []string
+    primaryInput       string          // PCollection used as the parallel input.
+    outputs            []link          // PCollections that must escape this stage.
+    sideInputs         []engine.LinkID // Non-parallel input PCollections and their consumers
+    internalCols       []string        // PCollections that escape. Used for precise coder sending.
+    envID              string
+    finalize           bool
+    stateful           bool
+    onWindowExpiration engine.StaticTimerID // The stage's OnWindowExpiration timer, if any.
+
     // hasTimers indicates the transform+timerfamily pairs that need to be waited on for
     // the stage to be considered complete.
-    hasTimers            []struct{ Transform, TimerFamily string }
+    hasTimers            []engine.StaticTimerID
     processingTimeTimers map[string]bool
 
     exe transformExecuter
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 83ad1bda9841..4f25c1399924 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -42,7 +42,7 @@ type B struct {
     InputTransformID       string
     Input                  []*engine.Block // Data and Timers for this bundle.
     EstimatedInputElements int
-    HasTimers              []struct{ Transform, TimerFamily string } // Timer streams to terminate.
+    HasTimers              []engine.StaticTimerID // Timer streams to terminate.
 
     // IterableSideInputData is a map from transformID + inputID, to window, to data.
     IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
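Finally, the contract this patch implements, in miniature: a transform that buffers elements per key (GroupIntoBatches is the motivating case in the sickbay list above) needs one last callback before the runner garbage-collects an expired window's state. The Go sketch below models that contract outside Beam; the expiringStore type and its methods are illustrative assumptions standing in for keyed state plus an expiry callback, not any Beam API.

package main

import "fmt"

type key string

// buffer holds the elements seen for one key in one window.
type buffer struct {
    windowEnd int64
    elements  []string
}

// expiringStore stands in for a runner's keyed, windowed state.
type expiringStore struct {
    buffers map[key]*buffer
}

// add buffers an element for a key, remembering the window it belongs to.
func (s *expiringStore) add(k key, windowEnd int64, elem string) {
    b, ok := s.buffers[k]
    if !ok {
        b = &buffer{windowEnd: windowEnd}
        s.buffers[k] = b
    }
    b.elements = append(b.elements, elem)
}

// advanceWatermark plays the runner's role: for every key whose window is now
// past the watermark, invoke the expiry callback with the buffered elements,
// then delete the state, mirroring OnWindowExpiration firing before GC.
func (s *expiringStore) advanceWatermark(wm int64, onExpiry func(key, []string)) {
    for k, b := range s.buffers {
        if b.windowEnd < wm {
            onExpiry(k, b.elements)
            delete(s.buffers, k)
        }
    }
}

func main() {
    s := &expiringStore{buffers: map[key]*buffer{}}
    s.add("a", 10, "x")
    s.add("a", 10, "y")
    s.add("b", 20, "z")

    s.advanceWatermark(15, func(k key, elems []string) {
        fmt.Println("flush", k, elems) // flush a [x y]; "b" stays buffered
    })
}

Without the expiry callback, the buffered elements for key "a" would simply vanish when the window's state is collected, which is exactly the data loss the runner-side support in this patch prevents.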