From ba04213af09ab34fb3768145fbddd270683cf679 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 27 Jun 2024 13:24:29 -0700 Subject: [PATCH] [#28187][Prism] Relax or fix issues in Prism to allow Python pipelines to execute. (#31694) --- .../pkg/beam/runners/prism/internal/coders.go | 25 +++++++++++++++++++ .../runners/prism/internal/handlerunner.go | 15 +++++++++++ .../prism/internal/jobservices/management.go | 15 ++++++++--- .../beam/runners/prism/internal/preprocess.go | 5 ---- .../pkg/beam/runners/prism/internal/stage.go | 18 ++++++++++++- .../prism/internal/unimplemented_test.go | 10 +++++--- .../runners/prism/internal/worker/worker.go | 11 +++++++- 7 files changed, 85 insertions(+), 14 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index b7157b8598de..6fdaf804a34f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "strings" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" @@ -250,6 +251,9 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i case urns.CoderKV: ccids := c.GetComponentCoderIds() + if len(ccids) != 2 { + panic(fmt.Sprintf("KV coder with more than 2 components: %s", prototext.Format(c))) + } kd := pullDecoderNoAlloc(coders[ccids[0]], coders) vd := pullDecoderNoAlloc(coders[ccids[1]], coders) return func(r io.Reader) { @@ -262,3 +266,24 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i panic(fmt.Sprintf("unknown coder urn key: %v", urn)) } } + +// debugCoder is developer code to get the structure of a proto coder visible when +// debugging coder errors in prism. It may sometimes be unused, so we do this to avoid +// linting errors. +var _ = debugCoder + +func debugCoder(cid string, coders map[string]*pipepb.Coder) string { + var b strings.Builder + b.WriteString(cid) + b.WriteRune('\n') + c := coders[cid] + if len(c.ComponentCoderIds) > 0 { + b.WriteRune('\t') + b.WriteString(strings.Join(c.ComponentCoderIds, ", ")) + b.WriteRune('\n') + for _, ccid := range c.GetComponentCoderIds() { + b.WriteString(debugCoder(ccid, coders)) + } + } + return b.String() +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 59e926754821..a205c768731b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -99,6 +99,20 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C } } + // Change the coders of PCollections being input into a flatten to match the + // Flatten's output coder. They must be compatible SDK side anyway, so ensure + // they're written out to the runner in the same fashion. + // This may stop being necessary once Flatten Unzipping happens in the optimizer. + outPCol := comps.GetPcollections()[outColID] + outCoder := comps.GetCoders()[outPCol.GetCoderId()] + coderSubs := map[string]*pipepb.Coder{} + for _, p := range t.GetInputs() { + inPCol := comps.GetPcollections()[p] + if inPCol.CoderId != outPCol.CoderId { + coderSubs[inPCol.CoderId] = outCoder + } + } + // Return the new components which is the transforms consumer return prepareResult{ // We sub this flatten with itself, to not drop it. @@ -106,6 +120,7 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C Transforms: map[string]*pipepb.PTransform{ tid: t, }, + Coders: coderSubs, }, RemovedLeaves: nil, ForcedRoots: forcedRoots, diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 5760ce7871b7..737a1b22276a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -204,16 +204,25 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo } if !bypassedWindowingStrategies[wsID] { check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS) - check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) - // Non nil triggers should fail. + + // Allow earliest and latest in pane to unblock running python tasks. + // Tests actually using the set behavior will fail. + check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW, + pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE) + // Non default triggers should fail. if ws.GetTrigger().GetDefault() == nil { dt := &pipepb.Trigger{ Trigger: &pipepb.Trigger_Default_{}, } + // Allow Never and Always triggers to unblock iteration on Java and Python SDKs. + // Without multiple firings, these will be very similar to the default trigger. nt := &pipepb.Trigger{ Trigger: &pipepb.Trigger_Never_{}, } - check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String()) + at := &pipepb.Trigger{ + Trigger: &pipepb.Trigger_Always_{}, + } + check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String(), at.String()) } } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index e357714166a5..95f6af18ac74 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -106,11 +106,6 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components, j *jobservices. // If there's an unknown urn, and it's not composite, simply add it to the leaves. if len(t.GetSubtransforms()) == 0 { leaves[tid] = struct{}{} - } else { - slog.Info("composite transform has unknown urn", - slog.Group("transform", slog.String("ID", tid), - slog.String("name", t.GetUniqueName()), - slog.String("urn", spec.GetUrn()))) } continue } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index a8b8bdd918e4..9d1c8481d65e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -310,7 +310,18 @@ func portFor(wInCid string, wk *worker.W) []byte { // It assumes that the side inputs are not sourced from PCollections generated by any transform in this stage. // // Because we need the local ids for routing the sources/sinks information. -func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) error { +func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) (err error) { + // Catch construction time panics and produce them as errors out. + defer func() { + if r := recover(); r != nil { + switch rt := r.(type) { + case error: + err = rt + default: + err = fmt.Errorf("%v", r) + } + } + }() // Assume stage has an indicated primary input coders := map[string]*pipepb.Coder{} @@ -484,6 +495,11 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng for _, pid := range stg.internalCols { lpUnknownCoders(comps.GetPcollections()[pid].GetCoderId(), coders, comps.GetCoders()) } + // Add coders for all windowing strategies. + // TODO: filter PCollections, filter windowing strategies by Pcollections instead. + for _, ws := range comps.GetWindowingStrategies() { + lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) + } reconcileCoders(coders, comps.GetCoders()) diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 27fc7f76bbc8..6afb04521af0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -44,7 +44,6 @@ func TestUnimplemented(t *testing.T) { // {pipeline: primitives.Drain}, // Can't test drain automatically yet. // Triggers (Need teststream and are unimplemented.) - {pipeline: primitives.TriggerAlways}, {pipeline: primitives.TriggerAfterAll}, {pipeline: primitives.TriggerAfterAny}, {pipeline: primitives.TriggerAfterEach}, @@ -54,9 +53,7 @@ func TestUnimplemented(t *testing.T) { {pipeline: primitives.TriggerElementCount}, {pipeline: primitives.TriggerOrFinally}, {pipeline: primitives.TriggerRepeat}, - - // Needs triggers. - {pipeline: primitives.Panes}, + {pipeline: primitives.TriggerAlways}, } for _, test := range tests { @@ -86,7 +83,12 @@ func TestImplemented(t *testing.T) { {pipeline: primitives.Checkpoints}, {pipeline: primitives.CoGBK}, {pipeline: primitives.ReshuffleKV}, + + // The following have been "allowed" to unblock further development + // But it's not clear these tests truly validate the expected behavior + // of the triggers or panes. {pipeline: primitives.TriggerNever}, + {pipeline: primitives.Panes}, } for _, test := range tests { diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index d8eb4c961493..d25c173e8c2f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -28,6 +28,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -139,7 +140,15 @@ func (wk *W) Stop() { wk.stopped.Store(true) close(wk.InstReqs) close(wk.DataReqs) - wk.server.Stop() + + // Give the SDK side 5 seconds to gracefully stop, before + // hard stopping all RPCs. + tim := time.AfterFunc(5*time.Second, func() { + wk.server.Stop() + }) + wk.server.GracefulStop() + tim.Stop() + wk.lis.Close() slog.Debug("stopped", "worker", wk) }