Skip to content

Commit

Permalink
[Prism] Terminate Job with CancelFn instead of panic (#31599)
Browse files Browse the repository at this point in the history
* Refactor elementmanager Bundles with upstream CancelCauseFunc

* Fix minor edits
  • Loading branch information
damondouglas authored Jun 15, 2024
1 parent 0d1cd69 commit e2d6246
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
23 changes: 13 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
)
Expand Down Expand Up @@ -290,7 +291,7 @@ func (rb RunBundle) LogValue() slog.Value {
// Bundles is the core execution loop. It produces a sequences of bundles able to be executed.
// The returned channel is closed when the context is canceled, or there are no pending elements
// remaining.
func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle {
func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
runStageCh := make(chan RunBundle)
ctx, cancelFn := context.WithCancelCause(ctx)
go func() {
Expand Down Expand Up @@ -384,7 +385,9 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
}
}
}
em.checkForQuiescence(advanced)
if err := em.checkForQuiescence(advanced); err != nil {
upstreamCancelFn(err)
}
}
}()
return runStageCh
Expand All @@ -400,11 +403,11 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
// executing off the next TestStream event.
//
// Must be called while holding em.refreshCond.L.
func (em *ElementManager) checkForQuiescence(advanced set[string]) {
func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
defer em.refreshCond.L.Unlock()
if len(em.inprogressBundles) > 0 {
// If there are bundles in progress, then there may be watermark refreshes when they terminate.
return
return nil
}
if len(em.watermarkRefreshes) > 0 {
// If there are watermarks to refresh, we aren't yet stuck.
Expand All @@ -414,12 +417,12 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
return
return nil
}
if em.testStreamHandler == nil && len(em.processTimeEvents.events) > 0 {
// If there's no test stream involved, and processing time events exist, then
// it's only a matter of time.
return
return nil
}
// The job has quiesced!

Expand All @@ -433,20 +436,20 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
// Note: it's a prism bug if test stream never causes a refresh to occur for a given event.
// It's not correct to move to the next event if no refreshes would occur.
if len(em.watermarkRefreshes) > 0 {
return
return nil
} else if _, ok := nextEvent.(tsProcessingTimeEvent); ok {
// It's impossible to fully control processing time SDK side handling for processing time
// Runner side, so we specialize refresh handling here to avoid spuriously getting stuck.
em.watermarkRefreshes.insert(em.testStreamHandler.ID)
return
return nil
}
// If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail.
}

v := em.livePending.Load()
if v == 0 {
// Since there are no further pending elements, the job will be terminating successfully.
return
return nil
}
// The job is officially stuck. Fail fast and produce debugging information.
// Jobs must never get stuck so this indicates a bug in prism to be investigated.
Expand All @@ -469,7 +472,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
return errors.Errorf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
}

// InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func TestStageState_updateWatermarks(t *testing.T) {

func TestElementManager(t *testing.T) {
t.Run("impulse", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"output"}, nil)
em.AddStage("dofn", []string{"output"}, nil, nil)
Expand All @@ -327,7 +328,7 @@ func TestElementManager(t *testing.T) {
}

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
return fmt.Sprintf("%v", i)
})
Expand Down Expand Up @@ -371,14 +372,15 @@ func TestElementManager(t *testing.T) {
}

t.Run("dofn", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"output"}, nil, nil)
em.Impulse("impulse")

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
Expand Down Expand Up @@ -422,14 +424,15 @@ func TestElementManager(t *testing.T) {
})

t.Run("side", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"input"}, nil, []LinkID{{Transform: "dofn2", Global: "output", Local: "local"}})
em.Impulse("impulse")

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
Expand Down Expand Up @@ -473,13 +476,14 @@ func TestElementManager(t *testing.T) {
}
})
t.Run("residual", func(t *testing.T) {
ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn", []string{"input"}, nil, nil)
em.Impulse("impulse")

var i int
ch := em.Bundles(context.Background(), func() string {
ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
eg.SetLimit(8)

var instID uint64
bundles := em.Bundles(egctx, func() string {
bundles := em.Bundles(egctx, j.CancelFn, func() string {
return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
})
for {
Expand Down

0 comments on commit e2d6246

Please sign in to comment.