diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 5629071aa0c2..c5db9a85f367 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -394,7 +394,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe if err != nil { c.failed[instID] = err - return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err) + return fail(ctx, instID, "ProcessBundle failed: %v", err) } tokens := msg.GetCacheTokens() @@ -426,7 +426,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe c.failed[instID] = err } else if dataError != io.EOF && dataError != nil { // If there was an error on the data channel reads, fail this bundle - // since we may have had a short read.' + // since we may have had a short read. c.failed[instID] = dataError err = dataError } else { diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index aeedf730a9b2..ecff740ed86e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -300,7 +300,7 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) return makeWindowCoders(coders[wcID]) } - + func getOnlyValue[K comparable, V any](in map[K]V) V { if len(in) != 1 { panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in)) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 59bb3aee2b15..f41f25ad0f71 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -13,7 +13,6 @@ // See the License for the specific language governing permissions 
and // limitations under the License. -package internal_test package internal_test import ( @@ -41,7 +40,6 @@ import ( func initRunner(t *testing.T) { t.Helper() if *jobopts.Endpoint == "" { - s := jobservices.NewServer(0, internal.RunPipeline) s := jobservices.NewServer(0, internal.RunPipeline) *jobopts.Endpoint = s.Endpoint() go s.Serve() diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 5660c9158189..05b3d3bbaa0e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -75,6 +75,10 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep // TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle. // TODO: Implement a fusion break for reshuffles. + if h.config.SDKReshuffle { + panic("SDK side reshuffle not yet supported") + } + // A Reshuffle, in principle, is a no-op on the pipeline structure, WRT correctness. // It could however affect performance, so it exists to tell the runner that this // point in the pipeline needs a fusion break, to enable the pipeline to change it's @@ -87,11 +91,11 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep // since the input collection and output collection types match. // Get the input and output PCollections, there should only be 1 each. 
- if len(t.GetOutputs()) != 1 { - panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t)) + if len(t.GetInputs()) != 1 { + panic("Expected single input PCollection in reshuffle: " + prototext.Format(t)) } if len(t.GetOutputs()) != 1 { - panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t)) + panic("Expected single output PCollection in reshuffle: " + prototext.Format(t)) } inColID := getOnlyValue(t.GetInputs()) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index b2495a499d90..28d994fbfa5a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -145,21 +145,21 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo } // Inspect Windowing strategies for unsupported features. - for _, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { + for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0)) check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY) check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING) if ws.GetWindowFn().GetUrn() != urns.WindowFnSession { check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING) } - // These are used by reshuffle - // TODO have a more aware blocking for reshuffle specifically. - // check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY) - // check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) - // // Non nil triggers should fail. 
- // if ws.GetTrigger().GetDefault() == nil { - // check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{}) - // } + if !bypassedWindowingStrategies[wsID] { + check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY) + check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) + // Non nil triggers should fail. + if ws.GetTrigger().GetDefault() == nil { + check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{}) + } + } } if len(errs) > 0 { jErr := &joinError{errs: errs} diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index ea32e7007e29..b2e0c0ed2195 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -244,7 +244,6 @@ func defaultFusion(topological []string, comps *pipepb.Components) []*stage { continue } cs := pcolConsumers[pcolID] - fmt.Printf("XXXXXX Fusing %v, with %v\n", tid, cs) for _, c := range cs { stg.transforms = append(stg.transforms, c.transform) consumed[c.transform] = true diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go index 02776dd37705..ba39d024e716 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go @@ -128,7 +128,7 @@ func Test_preprocessor_preProcessGraph(t *testing.T) { pre := newPreprocessor([]transformPreparer{&testPreparer{}}) gotStages := pre.preProcessGraph(test.input) - if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}), cmp.AllowUnexported(link{}), cmpopts.EquateEmpty()); diff != "" { + if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}, link{}), cmpopts.EquateEmpty()); diff != "" { t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", 
test.name, diff) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index ec2675ff36f9..1a9c2548df83 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -378,36 +378,6 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error { return nil } -// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data. -func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) { - sis, err := getSideInputs(t) - if err != nil { - return nil, err - } - var prepSides []func(b *worker.B, watermark mtime.Time) - - // Get WindowedValue Coders for the transform's input and output PCollections. - for local, global := range t.GetInputs() { - _, ok := sis[local] - if !ok { - continue // This is the main input. - } - if oldGlobal, ok := replacements[global]; ok { - global = oldGlobal - } - prepSide, err := handleSideInput(tid, local, global, comps, coders, wk) - if err != nil { - return nil, err - } - prepSides = append(prepSides, prepSide) - } - return func(b *worker.B, tid string, watermark mtime.Time) { - for _, prep := range prepSides { - prep(b, watermark) - } - }, nil -} - // handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark. 
func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) { t := comps.GetTransforms()[tid] diff --git a/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go b/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go deleted file mode 100644 index 687f7e4f0db4..000000000000 --- a/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go +++ /dev/null @@ -1,51 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal_test - -import ( - "context" - "testing" - - "github.com/apache/beam/sdks/v2/go/pkg/beam" -) - -// This file covers pipelines with stateful DoFns, in particular, that they -// use the state and timers APIs. 
- -func TestStateful(t *testing.T) { - initRunner(t) - - tests := []struct { - pipeline func(s beam.Scope) - metrics func(t *testing.T, pr beam.PipelineResult) - }{ - //{pipeline: primitives.BagStateParDo}, - } - - for _, test := range tests { - t.Run(intTestName(test.pipeline), func(t *testing.T) { - p, s := beam.NewPipelineWithRoot() - test.pipeline(s) - pr, err := executeWithT(context.Background(), t, p) - if err != nil { - t.Fatal(err) - } - if test.metrics != nil { - test.metrics(t, pr) - } - }) - } -} diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 3ebc9ed2b332..334d74fcae1d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -13,7 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal_test package internal_test import ( @@ -22,11 +21,6 @@ import ( "sort" "time" - "context" - "fmt" - "sort" - "time" - "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" @@ -73,16 +67,6 @@ func init() { register.Emitter2[int64, int64]() } -// The Test DoFns live outside of the test files to get coverage information on DoFn -// Lifecycle method execution. This inflates binary size, but ensures the runner is -// exercising the expected feature set. -// -// Once there's enough confidence in the runner, we can move these into a dedicated testing -// package along with the pipelines that use them. - -// Registrations should happen in the test files, so the compiler can prune these -// when they are not in use. 
- func dofnEmpty(imp []byte, emit func(int64)) { } diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go b/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go deleted file mode 100644 index cc8e979f1917..000000000000 --- a/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go +++ /dev/null @@ -1,16 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package web diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index bc76b319a6c6..a1d0ff79baf1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -267,7 +267,10 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { for { select { case req := <-wk.InstReqs: - ctrl.Send(req) + err := ctrl.Send(req) + if err != nil { + return err + } case <-ctrl.Context().Done(): slog.Debug("Control context canceled") return ctrl.Context().Err() @@ -323,7 +326,10 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { }() for { select { - case req := <-wk.DataReqs: + case req, ok := <-wk.DataReqs: + if !ok { + return nil + } if err := data.Send(req); err != nil { slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err)) } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go index b2a9b7dfa86f..eb854dbfcdba 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go @@ -57,7 +57,8 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO bin = worker } } else if opt.Loopback { - // TODO, determine the canonical location for Beam temp files. + // TODO(https://github.com/apache/beam/issues/27569): determine the canonical location for Beam temp files. + // In loopback mode, the binary is unused, so we can avoid an unnecessary compile step. 
f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*") bin = f.Name() } else { diff --git a/sdks/go/pkg/beam/testing/passert/count_test.go b/sdks/go/pkg/beam/testing/passert/count_test.go index f5014b840371..c34294998509 100644 --- a/sdks/go/pkg/beam/testing/passert/count_test.go +++ b/sdks/go/pkg/beam/testing/passert/count_test.go @@ -22,10 +22,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) -func TestMain(m *testing.M) { - ptest.Main(m) -} - func TestCount(t *testing.T) { var tests = []struct { name string diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go index 0eb0d0728a3f..27419a5c13c8 100644 --- a/sdks/go/pkg/beam/testing/passert/equals_test.go +++ b/sdks/go/pkg/beam/testing/passert/equals_test.go @@ -180,7 +180,7 @@ func ExampleEqualsList_mismatch() { list := [3]string{"wrong", "inputs", "here"} EqualsList(s, col, list) - ptest.DefaultRunner() + err := ptest.Run(p) err = unwrapError(err) diff --git a/sdks/go/pkg/beam/testing/passert/floats.go b/sdks/go/pkg/beam/testing/passert/floats.go index 962891b7cec9..f71e55090838 100644 --- a/sdks/go/pkg/beam/testing/passert/floats.go +++ b/sdks/go/pkg/beam/testing/passert/floats.go @@ -31,6 +31,7 @@ func init() { register.DoFn2x1[[]byte, func(*beam.T) bool, error]((*boundsFn)(nil)) register.DoFn3x1[[]byte, func(*beam.T) bool, func(*beam.T) bool, error]((*thresholdFn)(nil)) register.Emitter1[beam.T]() + register.Iter1[beam.T]() } // EqualsFloat calls into TryEqualsFloat, checkong that two PCollections of non-complex diff --git a/sdks/go/pkg/beam/testing/passert/passert_test.go b/sdks/go/pkg/beam/testing/passert/passert_test.go index ffd0388644a9..d472f6883939 100644 --- a/sdks/go/pkg/beam/testing/passert/passert_test.go +++ b/sdks/go/pkg/beam/testing/passert/passert_test.go @@ -24,6 +24,10 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) +func TestMain(m *testing.M) { + ptest.Main(m) +} + func isA(input 
string) bool { return input == "a" } func isB(input string) bool { return input == "b" } func lessThan13(input int) bool { return input < 13 } diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go b/sdks/go/pkg/beam/testing/ptest/ptest.go index 4ac16fd9f97d..0bca8c48ceb6 100644 --- a/sdks/go/pkg/beam/testing/ptest/ptest.go +++ b/sdks/go/pkg/beam/testing/ptest/ptest.go @@ -26,7 +26,8 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners" // common runner flag. // ptest uses the prism runner to execute pipelines by default. - // but includes the direct runner for legacy fallback reasons. + // but includes the direct runner for legacy fallback reasons to + // support users overriding the default back to the direct runner. _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" ) diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go index 14ae106ec962..4e9eb03f9d84 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter_test.go +++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go @@ -31,9 +31,9 @@ func init() { register.Function1x1(greaterThanOne) } -func alwaysTrue(a int) bool { return true } -func alwaysFalse(a int) bool { return false } -func isOne(a int) bool { return a == 1 } +func alwaysTrue(a int) bool { return true } +func alwaysFalse(a int) bool { return false } +func isOne(a int) bool { return a == 1 } func greaterThanOne(a int) bool { return a > 1 } func TestInclude(t *testing.T) { diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 1224ce2bb071..f66cc1f53bfc 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -145,7 +145,7 @@ var prismFilters = []string{ // TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism. 
"TestKafkaIO.*", - // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow prism runners. + // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners. "TestBigQueryIO.*", "TestSpannerIO.*", // The prism runner does not support pipeline drain for SDF.