diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 0781efd5ff0c..ed9c0ddc0f89 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -349,6 +349,12 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. }() // Watermark evaluation goroutine. go func() { + defer func() { + // In case of panics in bundle generation, fail and cancel the job. + if e := recover(); e != nil { + upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v", e)) + } + }() defer close(runStageCh) // If we have a test stream, clear out existing refreshes, so the test stream can diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 1a62f2f6f420..3d1e506f5e32 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -76,7 +76,13 @@ type stage struct { OutputsToCoders map[string]engine.PColInfo } -func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error { +func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) { + defer func() { + // Convert execution panics to errors to fail the bundle. + if e := recover(); e != nil { + err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v", e, s) + } + }() slog.Debug("Execute: starting bundle", "bundle", rb) var b *worker.B