Skip to content

Commit

Permalink
[apache#28187][Prism] Relax or fix issues in Prism to allow Python pi…
Browse files Browse the repository at this point in the history
…pelines to execute. (apache#31694)
  • Loading branch information
lostluck authored and reeba212 committed Dec 4, 2024
1 parent c80f8a6 commit 9dd6d03
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 14 deletions.
25 changes: 25 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
Expand Down Expand Up @@ -250,6 +251,9 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i

case urns.CoderKV:
ccids := c.GetComponentCoderIds()
if len(ccids) != 2 {
panic(fmt.Sprintf("KV coder with more than 2 components: %s", prototext.Format(c)))
}
kd := pullDecoderNoAlloc(coders[ccids[0]], coders)
vd := pullDecoderNoAlloc(coders[ccids[1]], coders)
return func(r io.Reader) {
Expand All @@ -262,3 +266,24 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
panic(fmt.Sprintf("unknown coder urn key: %v", urn))
}
}

// debugCoder is developer code to get the structure of a proto coder visible when
// debugging coder errors in prism. It may sometimes be unused, so we do this to avoid
// linting errors.
var _ = debugCoder

func debugCoder(cid string, coders map[string]*pipepb.Coder) string {
var b strings.Builder
b.WriteString(cid)
b.WriteRune('\n')
c := coders[cid]
if len(c.ComponentCoderIds) > 0 {
b.WriteRune('\t')
b.WriteString(strings.Join(c.ComponentCoderIds, ", "))
b.WriteRune('\n')
for _, ccid := range c.GetComponentCoderIds() {
b.WriteString(debugCoder(ccid, coders))
}
}
return b.String()
}
15 changes: 15 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,28 @@ func (h *runner) handleFlatten(tid string, t *pipepb.PTransform, comps *pipepb.C
}
}

// Change the coders of PCollections being input into a flatten to match the
// Flatten's output coder. They must be compatible SDK side anyway, so ensure
// they're written out to the runner in the same fashion.
// This may stop being necessary once Flatten Unzipping happens in the optimizer.
outPCol := comps.GetPcollections()[outColID]
outCoder := comps.GetCoders()[outPCol.GetCoderId()]
coderSubs := map[string]*pipepb.Coder{}
for _, p := range t.GetInputs() {
inPCol := comps.GetPcollections()[p]
if inPCol.CoderId != outPCol.CoderId {
coderSubs[inPCol.CoderId] = outCoder
}
}

// Return the new components which is the transforms consumer
return prepareResult{
// We sub this flatten with itself, to not drop it.
SubbedComps: &pipepb.Components{
Transforms: map[string]*pipepb.PTransform{
tid: t,
},
Coders: coderSubs,
},
RemovedLeaves: nil,
ForcedRoots: forcedRoots,
Expand Down
15 changes: 12 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,25 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
}
if !bypassedWindowingStrategies[wsID] {
check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS)
check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW)
// Non nil triggers should fail.

// Allow earliest and latest in pane to unblock running python tasks.
// Tests actually using the set behavior will fail.
check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW,
pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE)
// Non default triggers should fail.
if ws.GetTrigger().GetDefault() == nil {
dt := &pipepb.Trigger{
Trigger: &pipepb.Trigger_Default_{},
}
// Allow Never and Always triggers to unblock iteration on Java and Python SDKs.
// Without multiple firings, these will be very similar to the default trigger.
nt := &pipepb.Trigger{
Trigger: &pipepb.Trigger_Never_{},
}
check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String())
at := &pipepb.Trigger{
Trigger: &pipepb.Trigger_Always_{},
}
check("WindowingStrategy.Trigger", ws.GetTrigger().String(), dt.String(), nt.String(), at.String())
}
}
}
Expand Down
5 changes: 0 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,6 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components, j *jobservices.
// If there's an unknown urn, and it's not composite, simply add it to the leaves.
if len(t.GetSubtransforms()) == 0 {
leaves[tid] = struct{}{}
} else {
slog.Info("composite transform has unknown urn",
slog.Group("transform", slog.String("ID", tid),
slog.String("name", t.GetUniqueName()),
slog.String("urn", spec.GetUrn())))
}
continue
}
Expand Down
18 changes: 17 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,18 @@ func portFor(wInCid string, wk *worker.W) []byte {
// It assumes that the side inputs are not sourced from PCollections generated by any transform in this stage.
//
// Because we need the local ids for routing the sources/sinks information.
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) error {
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *engine.ElementManager) (err error) {
// Catch construction time panics and produce them as errors out.
defer func() {
if r := recover(); r != nil {
switch rt := r.(type) {
case error:
err = rt
default:
err = fmt.Errorf("%v", r)
}
}
}()
// Assume stage has an indicated primary input

coders := map[string]*pipepb.Coder{}
Expand Down Expand Up @@ -484,6 +495,11 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, em *eng
for _, pid := range stg.internalCols {
lpUnknownCoders(comps.GetPcollections()[pid].GetCoderId(), coders, comps.GetCoders())
}
// Add coders for all windowing strategies.
// TODO: filter PCollections, filter windowing strategies by Pcollections instead.
for _, ws := range comps.GetWindowingStrategies() {
lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
}

reconcileCoders(coders, comps.GetCoders())

Expand Down
10 changes: 6 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func TestUnimplemented(t *testing.T) {
// {pipeline: primitives.Drain}, // Can't test drain automatically yet.

// Triggers (Need teststream and are unimplemented.)
{pipeline: primitives.TriggerAlways},
{pipeline: primitives.TriggerAfterAll},
{pipeline: primitives.TriggerAfterAny},
{pipeline: primitives.TriggerAfterEach},
Expand All @@ -54,9 +53,7 @@ func TestUnimplemented(t *testing.T) {
{pipeline: primitives.TriggerElementCount},
{pipeline: primitives.TriggerOrFinally},
{pipeline: primitives.TriggerRepeat},

// Needs triggers.
{pipeline: primitives.Panes},
{pipeline: primitives.TriggerAlways},
}

for _, test := range tests {
Expand Down Expand Up @@ -86,7 +83,12 @@ func TestImplemented(t *testing.T) {
{pipeline: primitives.Checkpoints},
{pipeline: primitives.CoGBK},
{pipeline: primitives.ReshuffleKV},

// The following have been "allowed" to unblock further development
// But it's not clear these tests truly validate the expected behavior
// of the triggers or panes.
{pipeline: primitives.TriggerNever},
{pipeline: primitives.Panes},
}

for _, test := range tests {
Expand Down
11 changes: 10 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
Expand Down Expand Up @@ -139,7 +140,15 @@ func (wk *W) Stop() {
wk.stopped.Store(true)
close(wk.InstReqs)
close(wk.DataReqs)
wk.server.Stop()

// Give the SDK side 5 seconds to gracefully stop, before
// hard stopping all RPCs.
tim := time.AfterFunc(5*time.Second, func() {
wk.server.Stop()
})
wk.server.GracefulStop()
tim.Stop()

wk.lis.Close()
slog.Debug("stopped", "worker", wk)
}
Expand Down

0 comments on commit 9dd6d03

Please sign in to comment.