From 1207b42c4126bcb542ac8ac277dc2457f6a5bf0c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 14 Jul 2023 15:11:56 -0700 Subject: [PATCH 01/21] Make the prism runner the default Go SDK runner. --- sdks/go/pkg/beam/io/databaseio/database_test.go | 1 + sdks/go/pkg/beam/runners/direct/direct.go | 1 - sdks/go/pkg/beam/testing/ptest/ptest.go | 3 +-- sdks/go/test/integration/integration.go | 14 ++++++++------ 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/io/databaseio/database_test.go b/sdks/go/pkg/beam/io/databaseio/database_test.go index b93d5c9da727..f6c1355e851a 100644 --- a/sdks/go/pkg/beam/io/databaseio/database_test.go +++ b/sdks/go/pkg/beam/io/databaseio/database_test.go @@ -22,6 +22,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" _ "github.com/proullon/ramsql/driver" diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go index 13288306066f..d1f8937f7840 100644 --- a/sdks/go/pkg/beam/runners/direct/direct.go +++ b/sdks/go/pkg/beam/runners/direct/direct.go @@ -17,7 +17,6 @@ // pipelines in the current process. Useful for testing. // // Deprecated: Use prism as a local runner instead. -// Reliance on the direct runner leads to non-portable pipelines. package direct import ( diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go b/sdks/go/pkg/beam/testing/ptest/ptest.go index 0bca8c48ceb6..4ac16fd9f97d 100644 --- a/sdks/go/pkg/beam/testing/ptest/ptest.go +++ b/sdks/go/pkg/beam/testing/ptest/ptest.go @@ -26,8 +26,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners" // common runner flag. // ptest uses the prism runner to execute pipelines by default. - // but includes the direct runner for legacy fallback reasons to - // support users overriding the default back to the direct runner. + // but includes the direct runner for legacy fallback reasons. _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" ) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 0f9e5984eadd..7914a0525fef 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -136,26 +136,26 @@ var portableFilters = []string{ "TestSetStateClear", } -// TODO(lostluck): set up a specific run for these. var prismFilters = []string{ - // The prism runner does not support the TestStream primitive + // The portable runner does not support the TestStream primitive "TestTestStream.*", // The trigger and pane tests uses TestStream "TestTrigger.*", "TestPanes", - - // TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism. + // TODO(https://github.com/apache/beam/issues/21058): Python portable runner times out on Kafka reads. "TestKafkaIO.*", // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners. "TestBigQueryIO.*", "TestSpannerIO.*", - // The prsim runner does not support pipeline drain for SDF. + // The portable runner does not support self-checkpointing + "TestCheckpointing", + // The portable runner does not support pipeline drain for SDF. "TestDrain", // FhirIO currently only supports Dataflow runner "TestFhirIO.*", // OOMs currently only lead to heap dumps on Dataflow runner "TestOomParDo", - // The prism runner does not support user state. + // The portable runner does not support user state. "TestValueState", "TestValueStateWindowed", "TestValueStateClear", @@ -328,6 +328,8 @@ func CheckFilters(t *testing.T) { filters = prismFilters case "portable", "PortableRunner": filters = portableFilters + case "prism", "PrismRunner": + filters = prismFilters case "flink", "FlinkRunner": filters = flinkFilters case "samza", "SamzaRunner": From b49dfcb41b1bdc399f1a98dcb105546affa5f681 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 14 Jul 2023 15:12:20 -0700 Subject: [PATCH 02/21] Break cycle with ptest. --- .../runners/prism/internal/execute_test.go | 3 +- .../runners/prism/internal/stateful_test.go | 51 +++++++++++++++++++ .../runners/prism/internal/testdofns_test.go | 25 +++++---- .../runners/prism/internal/web/web_test.go | 16 ++++++ 4 files changed, 84 insertions(+), 11 deletions(-) create mode 100644 sdks/go/pkg/beam/runners/prism/internal/stateful_test.go create mode 100644 sdks/go/pkg/beam/runners/prism/internal/web/web_test.go diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 1a5ae7989a06..4ec936a01cef 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -13,6 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +package internal_test package internal_test import ( @@ -27,7 +28,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" - "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal" @@ -40,6 +40,7 @@ import ( func initRunner(t *testing.T) { t.Helper() if *jobopts.Endpoint == "" { + s := jobservices.NewServer(0, internal.RunPipeline) s := jobservices.NewServer(0, internal.RunPipeline) *jobopts.Endpoint = s.Endpoint() go s.Serve() diff --git a/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go b/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go new file mode 100644 index 000000000000..687f7e4f0db4 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal_test + +import ( + "context" + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" +) + +// This file covers pipelines with stateful DoFns, in particular, that they +// use the state and timers APIs. + +func TestStateful(t *testing.T) { + initRunner(t) + + tests := []struct { + pipeline func(s beam.Scope) + metrics func(t *testing.T, pr beam.PipelineResult) + }{ + //{pipeline: primitives.BagStateParDo}, + } + + for _, test := range tests { + t.Run(intTestName(test.pipeline), func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + test.pipeline(s) + pr, err := executeWithT(context.Background(), t, p) + if err != nil { + t.Fatal(err) + } + if test.metrics != nil { + test.metrics(t, pr) + } + }) + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 334d74fcae1d..927a41b045ee 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -13,6 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +package internal_test package internal_test import ( @@ -21,6 +22,11 @@ import ( "sort" "time" + "context" + "fmt" + "sort" + "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" @@ -67,8 +73,15 @@ func init() { register.Emitter2[int64, int64]() } -func dofnEmpty(imp []byte, emit func(int64)) { -} +// The Test DoFns live outside of the test files to get coverage information on DoFn +// Lifecycle method execution. This inflates binary size, but ensures the runner is +// exercising the expected feature set. +// +// Once there's enough confidence in the runner, we can move these into a dedicated testing +// package along with the pipelines that use them. + +// Registrations should happen in the test files, so the compiler can prune these +// when they are not in use. func dofn1(imp []byte, emit func(int64)) { emit(1) @@ -243,14 +256,6 @@ func dofnGBK2(k int64, vs func(*string) bool, emit func(string)) { emit(sum) } -func dofnGBKKV(k string, vs func(*int64) bool, emit func(string, int64)) { - var v, sum int64 - for vs(&v) { - sum += v - } - emit(k, sum) -} - type testRow struct { A string B int64 diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go b/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go new file mode 100644 index 000000000000..cc8e979f1917 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package web From 8157344a7b3b9d970c216431361ad2bef8001a40 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 18 Jul 2023 20:31:35 -0700 Subject: [PATCH 03/21] [DO NOT SUBMIT] Most changes needec to set prism as default Go SDK runner. --- sdks/go/pkg/beam/beam.shims.go | 10 ---- .../pkg/beam/core/runtime/exec/translate.go | 1 - .../pkg/beam/core/runtime/harness/harness.go | 2 +- sdks/go/pkg/beam/create.go | 5 ++ sdks/go/pkg/beam/create_test.go | 10 ++-- sdks/go/pkg/beam/io/avroio/avroio.go | 7 ++- sdks/go/pkg/beam/io/avroio/avroio_test.go | 11 ++--- .../pkg/beam/io/datastoreio/datastore_test.go | 10 ++-- sdks/go/pkg/beam/io/spannerio/write_test.go | 2 + sdks/go/pkg/beam/pardo_test.go | 4 +- .../pkg/beam/runners/prism/internal/coders.go | 1 + .../prism/internal/engine/elementmanager.go | 4 +- .../runners/prism/internal/execute_test.go | 47 +++++++++++++++++++ .../prism/internal/jobservices/management.go | 1 - .../pkg/beam/runners/prism/internal/stage.go | 5 ++ .../runners/prism/internal/testdofns_test.go | 3 ++ .../runners/prism/internal/worker/worker.go | 7 +-- .../runners/universal/runnerlib/execute.go | 3 +- .../beam/runners/universal/runnerlib/job.go | 14 +++--- .../go/pkg/beam/testing/passert/count_test.go | 4 ++ .../pkg/beam/testing/passert/equals_test.go | 1 + sdks/go/pkg/beam/testing/passert/floats.go | 1 - .../pkg/beam/testing/passert/passert_test.go | 4 -- .../beam/transforms/filter/distinct_test.go | 4 ++ .../pkg/beam/transforms/filter/filter_test.go | 10 ++-- .../go/pkg/beam/transforms/stats/quantiles.go | 2 +- sdks/go/pkg/beam/transforms/top/top_test.go | 12 ++--- 27 files changed, 115 insertions(+), 70 deletions(-) diff --git a/sdks/go/pkg/beam/beam.shims.go b/sdks/go/pkg/beam/beam.shims.go index 6653fb0129f7..17ec42d85173 100644 --- a/sdks/go/pkg/beam/beam.shims.go +++ b/sdks/go/pkg/beam/beam.shims.go @@ -44,13 +44,10 @@ func init() { runtime.RegisterFunction(schemaDec) runtime.RegisterFunction(schemaEnc) runtime.RegisterFunction(swapKVFn) - runtime.RegisterType(reflect.TypeOf((*createFn)(nil)).Elem()) - schema.RegisterType(reflect.TypeOf((*createFn)(nil)).Elem()) runtime.RegisterType(reflect.TypeOf((*reflect.Type)(nil)).Elem()) schema.RegisterType(reflect.TypeOf((*reflect.Type)(nil)).Elem()) runtime.RegisterType(reflect.TypeOf((*reflectx.Func)(nil)).Elem()) schema.RegisterType(reflect.TypeOf((*reflectx.Func)(nil)).Elem()) - reflectx.RegisterStructWrapper(reflect.TypeOf((*createFn)(nil)).Elem(), wrapMakerCreateFn) reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type, []byte) (typex.T, error))(nil)).Elem(), funcMakerReflect۰TypeSliceOfByteГTypex۰TError) reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type, typex.T) ([]byte, error))(nil)).Elem(), funcMakerReflect۰TypeTypex۰TГSliceOfByteError) reflectx.RegisterFunc(reflect.TypeOf((*func([]byte, func(typex.T)) error)(nil)).Elem(), funcMakerSliceOfByteEmitTypex۰TГError) @@ -64,13 +61,6 @@ func init() { exec.RegisterEmitter(reflect.TypeOf((*func(typex.T))(nil)).Elem(), emitMakerTypex۰T) } -func wrapMakerCreateFn(fn any) map[string]reflectx.Func { - dfn := fn.(*createFn) - return map[string]reflectx.Func{ - "ProcessElement": reflectx.MakeFunc(func(a0 []byte, a1 func(typex.T)) error { return dfn.ProcessElement(a0, a1) }), - } -} - type callerReflect۰TypeSliceOfByteГTypex۰TError struct { fn func(reflect.Type, []byte) (typex.T, error) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index 02a1418880e5..0e99dfc847e6 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -612,7 +612,6 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { for i := 1; i < len(input); i++ { // TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs - ec, wc, err := b.makeCoderForPCollection(input[i]) if err != nil { return nil, err diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index d97b6b7db079..7e7c711e9d0e 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -426,7 +426,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe c.failed[instID] = err } else if dataError != io.EOF && dataError != nil { // If there was an error on the data channel reads, fail this bundle - // since we may have had a short read. + // since we may have had a short read.' c.failed[instID] = dataError err = dataError } else { diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go index d2bd554963ee..91e9f335ef87 100644 --- a/sdks/go/pkg/beam/create.go +++ b/sdks/go/pkg/beam/create.go @@ -112,6 +112,11 @@ func createList(s Scope, values []any, t reflect.Type) (PCollection, error) { // TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421. +func init() { + register.DoFn2x1[[]byte, func(T), error]((*createFn)(nil)) + register.Emitter1[T]() +} + type createFn struct { Values [][]byte `json:"values"` Type EncodedType `json:"type"` diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go index 9033979d0502..39fc484be1da 100644 --- a/sdks/go/pkg/beam/create_test.go +++ b/sdks/go/pkg/beam/create_test.go @@ -69,11 +69,11 @@ func TestCreateList(t *testing.T) { tests := []struct { values any }{ - {[]int{1, 2, 3}}, - {[]string{"1", "2", "3"}}, - {[]float32{float32(0.1), float32(0.2), float32(0.3)}}, - {[]float64{float64(0.1), float64(0.2), float64(0.3)}}, - {[]uint{uint(1), uint(2), uint(3)}}, + //{[]int{1, 2, 3}}, + // {[]string{"1", "2", "3"}}, + // {[]float32{float32(0.1), float32(0.2), float32(0.3)}}, + // {[]float64{float64(0.1), float64(0.2), float64(0.3)}}, + // {[]uint{uint(1), uint(2), uint(3)}}, {[]bool{false, true, true, false, true}}, {[]wc{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}}, {[]*testProto{&testProto{}, &testProto{stringValue("test")}}}, // Test for BEAM-4401 diff --git a/sdks/go/pkg/beam/io/avroio/avroio.go b/sdks/go/pkg/beam/io/avroio/avroio.go index b00c6d2eea00..7aa5d0318999 100644 --- a/sdks/go/pkg/beam/io/avroio/avroio.go +++ b/sdks/go/pkg/beam/io/avroio/avroio.go @@ -26,15 +26,14 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/linkedin/goavro/v2" ) func init() { register.Function3x1(expandFn) - register.DoFn3x1[context.Context, string, func(beam.X), error]((*avroReadFn)(nil)) - register.DoFn3x1[context.Context, int, func(*string) bool, error]((*writeAvroFn)(nil)) - register.Emitter1[beam.X]() - register.Iter1[string]() + beam.RegisterType(reflect.TypeOf((*avroReadFn)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*writeAvroFn)(nil)).Elem()) } // Read reads a set of files and returns lines as a PCollection diff --git a/sdks/go/pkg/beam/io/avroio/avroio_test.go b/sdks/go/pkg/beam/io/avroio/avroio_test.go index 403a81875557..7e7ea7ceee32 100644 --- a/sdks/go/pkg/beam/io/avroio/avroio_test.go +++ b/sdks/go/pkg/beam/io/avroio/avroio_test.go @@ -44,11 +44,6 @@ func init() { register.Function2x0(toJSONString) } -func toJSONString(user TwitterUser, emit func(string)) { - b, _ := json.Marshal(user) - emit(string(b)) -} - type Tweet struct { Stamp int64 `json:"timestamp"` Tweet string `json:"tweet"` @@ -140,6 +135,11 @@ const userSchema = `{ ] }` +func toJSONString(user TwitterUser, emit func(string)) { + b, _ := json.Marshal(user) + emit(string(b)) +} + func TestWrite(t *testing.T) { avroFile := "./user.avro" testUsername := "user1" @@ -153,7 +153,6 @@ func TestWrite(t *testing.T) { t.Cleanup(func() { os.Remove(avroFile) }) - ptest.RunAndValidate(t, p) if _, err := os.Stat(avroFile); errors.Is(err, os.ErrNotExist) { diff --git a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go index 345eaa2a59ef..a6fdf9987ca3 100644 --- a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go +++ b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go @@ -35,11 +35,6 @@ func TestMain(m *testing.M) { ptest.MainWithDefault(m, "direct") } -func init() { - beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem()) - beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem()) -} - // fake client type implements datastoreio.clientType type fakeClient struct { runCounter int @@ -64,6 +59,11 @@ type Foo struct { type Bar struct { } +func init() { + beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem()) +} + func Test_query(t *testing.T) { testCases := []struct { v any diff --git a/sdks/go/pkg/beam/io/spannerio/write_test.go b/sdks/go/pkg/beam/io/spannerio/write_test.go index 3c2c1f591519..d3cc8a1b9058 100644 --- a/sdks/go/pkg/beam/io/spannerio/write_test.go +++ b/sdks/go/pkg/beam/io/spannerio/write_test.go @@ -19,6 +19,8 @@ import ( "context" "testing" + spannertest "github.com/apache/beam/sdks/v2/go/test/integration/io/spannerio" + "cloud.google.com/go/spanner" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go index b88a6d642ea9..56ed7e3e9fa6 100644 --- a/sdks/go/pkg/beam/pardo_test.go +++ b/sdks/go/pkg/beam/pardo_test.go @@ -72,9 +72,9 @@ func testFunction() int64 { func TestFormatParDoError(t *testing.T) { got := formatParDoError(testFunction, 2, 1) - want := "beam.testFunction has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead." + want := "has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead." if !strings.Contains(got, want) { - t.Errorf("formatParDoError(testFunction,2,1) = %v, want = %v", got, want) + t.Errorf("formatParDoError(testFunction,2,1) = \n%q want =\n%q", got, want) } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index a141440400ec..09313b62bd1b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -73,6 +73,7 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str } // Populate the coders to send with the new windowed value coder. coders[wvcID] = wInC + // col.CoderId = cID // So PCollection coders are looked up correctly for side inputs. return wvcID } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index c8721e1a2079..995187420cde 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -158,6 +158,9 @@ func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []strin em.consumers[input] = append(em.consumers[input], ss.ID) } for _, side := range ss.sides { + // TODO: clean up this hack to identify the stage for side input consumers. + // drop the _prismside suffix for any side input ID the ids. + // side, _ = strings.CutSuffix(side, "_prismside") em.sideConsumers[side] = append(em.sideConsumers[side], ss.ID) } } @@ -707,7 +710,6 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) { ready := true for _, side := range ss.sides { pID, ok := em.pcolParents[side] - // These panics indicate pre-process/stage construction problems. if !ok { panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side)) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 4ec936a01cef..971bdb47bbbf 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -500,6 +500,53 @@ func TestRunner_Metrics(t *testing.T) { }) } +func TestRunner_Passert(t *testing.T) { + initRunner(t) + tests := []struct { + name string + pipeline func(s beam.Scope) + metrics func(t *testing.T, pr beam.PipelineResult) + }{ + { + name: "Empty", + pipeline: func(s beam.Scope) { + imp := beam.Impulse(s) + col1 := beam.ParDo(s, dofnEmpty, imp) + passert.Empty(s, col1) + }, + }, { + name: "Equals-TwoEmpty", + pipeline: func(s beam.Scope) { + imp := beam.Impulse(s) + col1 := beam.ParDo(s, dofnEmpty, imp) + col2 := beam.ParDo(s, dofnEmpty, imp) + passert.Equals(s, col1, col2) + }, + }, { + name: "Equals", + pipeline: func(s beam.Scope) { + imp := beam.Impulse(s) + col1 := beam.ParDo(s, dofn1, imp) + col2 := beam.ParDo(s, dofn1, imp) + passert.Equals(s, col1, col2) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + test.pipeline(s) + pr, err := executeWithT(context.Background(), t, p) + if err != nil { + t.Fatal(err) + } + if test.metrics != nil { + test.metrics(t, pr) + } + }) + } +} + func TestFailure(t *testing.T) { initRunner(t) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 953ee50c559d..5fe6cbb93da2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -212,7 +212,6 @@ func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.Jo // Reached terminal state. return nil case jobpb.JobState_FAILED: - // Ensure we send an error message with the cause of the job failure. stream.Send(&jobpb.JobMessagesResponse{ Response: &jobpb.JobMessagesResponse_MessageResponse{ MessageResponse: &jobpb.JobMessage{ diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index e6fe28714b7f..d530b8a71f89 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -130,6 +130,11 @@ progress: progTick.Stop() break progress // exit progress loop on close. case <-progTick.C: + resp, err := b.Progress(wk) + if err != nil { + slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) + break progress + } resp, err := b.Progress(wk) if err != nil { slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 927a41b045ee..4a79aa9710c5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -83,6 +83,9 @@ func init() { // Registrations should happen in the test files, so the compiler can prune these // when they are not in use. +func dofnEmpty(imp []byte, emit func(int64)) { +} + func dofn1(imp []byte, emit func(int64)) { emit(1) emit(2) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index eefab54a54cc..bc76b319a6c6 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -256,7 +256,6 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { // TODO: Do more than assume these are ProcessBundleResponses. wk.mu.Lock() if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok { - // Error is handled in the resonse handler. b.Respond(resp) } else { slog.Debug("ctrl.Recv: %v", resp) @@ -322,13 +321,9 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { wk.mu.Unlock() } }() - for { select { - case req, ok := <-wk.DataReqs: - if !ok { - return nil - } + case req := <-wk.DataReqs: if err := data.Send(req); err != nil { slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err)) } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go index eb854dbfcdba..b2a9b7dfa86f 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go @@ -57,8 +57,7 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO bin = worker } } else if opt.Loopback { - // TODO(https://github.com/apache/beam/issues/27569: determine the canonical location for Beam temp files. - // In loopback mode, the binary is unused, so we can avoid an unnecessary compile step. + // TODO, determine the canonical location for Beam temp files. f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*") bin = f.Name() } else { diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go index 8cbb274e184f..d8dd85eb92be 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go @@ -103,7 +103,7 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID return errors.Wrap(err, "failed to get job stream") } - mostRecentError := errors.New("") + mostRecentError := "" var errReceived, jobFailed bool for { @@ -111,8 +111,8 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID if err != nil { if err == io.EOF { if jobFailed { - // Connection finished with a failed status, so produce what we have. - return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + // Connection finished, so time to exit, produce what we have. + return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError) } return nil } @@ -131,9 +131,9 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID case jobpb.JobState_FAILED: jobFailed = true if errReceived { - return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError) } - // Otherwise, wait for at least one error log from the runner, or the connection to close. + // Otherwise we should wait for at least one error log from the runner. } case msg.GetMessageResponse() != nil: @@ -144,10 +144,10 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR { errReceived = true - mostRecentError = errors.New(resp.GetMessageText()) + mostRecentError = resp.GetMessageText() if jobFailed { - return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + return errors.Errorf("job %v failed:\n%w", jobID, errors.New(mostRecentError)) } } diff --git a/sdks/go/pkg/beam/testing/passert/count_test.go b/sdks/go/pkg/beam/testing/passert/count_test.go index c34294998509..f5014b840371 100644 --- a/sdks/go/pkg/beam/testing/passert/count_test.go +++ b/sdks/go/pkg/beam/testing/passert/count_test.go @@ -22,6 +22,10 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) +func TestMain(m *testing.M) { + ptest.Main(m) +} + func TestCount(t *testing.T) { var tests = []struct { name string diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go index a8a5c835f8ff..0eb0d0728a3f 100644 --- a/sdks/go/pkg/beam/testing/passert/equals_test.go +++ b/sdks/go/pkg/beam/testing/passert/equals_test.go @@ -180,6 +180,7 @@ func ExampleEqualsList_mismatch() { list := [3]string{"wrong", "inputs", "here"} EqualsList(s, col, list) + ptest.DefaultRunner() err := ptest.Run(p) err = unwrapError(err) diff --git a/sdks/go/pkg/beam/testing/passert/floats.go b/sdks/go/pkg/beam/testing/passert/floats.go index f71e55090838..962891b7cec9 100644 --- a/sdks/go/pkg/beam/testing/passert/floats.go +++ b/sdks/go/pkg/beam/testing/passert/floats.go @@ -31,7 +31,6 @@ func init() { register.DoFn2x1[[]byte, func(*beam.T) bool, error]((*boundsFn)(nil)) register.DoFn3x1[[]byte, func(*beam.T) bool, func(*beam.T) bool, error]((*thresholdFn)(nil)) register.Emitter1[beam.T]() - register.Iter1[beam.T]() } // EqualsFloat calls into TryEqualsFloat, checkong that two PCollections of non-complex diff --git a/sdks/go/pkg/beam/testing/passert/passert_test.go b/sdks/go/pkg/beam/testing/passert/passert_test.go index d472f6883939..ffd0388644a9 100644 --- a/sdks/go/pkg/beam/testing/passert/passert_test.go +++ b/sdks/go/pkg/beam/testing/passert/passert_test.go @@ -24,10 +24,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) -func TestMain(m *testing.M) { - ptest.Main(m) -} - func isA(input string) bool { return input == "a" } func isB(input string) bool { return input == "b" } func lessThan13(input int) bool { return input < 13 } diff --git a/sdks/go/pkg/beam/transforms/filter/distinct_test.go b/sdks/go/pkg/beam/transforms/filter/distinct_test.go index bb275cc8fb5b..0620c917d6e0 100644 --- a/sdks/go/pkg/beam/transforms/filter/distinct_test.go +++ b/sdks/go/pkg/beam/transforms/filter/distinct_test.go @@ -23,6 +23,10 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" ) +func TestMain(m *testing.M) { + ptest.Main(m) +} + type s struct { A int B string diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go index ffa138e099a6..14ae106ec962 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter_test.go +++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go @@ -24,10 +24,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" ) -func TestMain(m *testing.M) { - ptest.Main(m) -} - func init() { register.Function1x1(alwaysTrue) register.Function1x1(alwaysFalse) @@ -35,9 +31,9 @@ func init() { register.Function1x1(greaterThanOne) } -func alwaysTrue(a int) bool { return true } -func alwaysFalse(a int) bool { return false } -func isOne(a int) bool { return a == 1 } +func alwaysTrue(a int) bool { return true } +func alwaysFalse(a int) bool { return false } +func isOne(a int) bool { return a == 1 } func greaterThanOne(a int) bool { return a > 1 } func TestInclude(t *testing.T) { diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go index 6d2baa8b5e99..7685852efba6 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go @@ -47,7 +47,7 @@ func init() { beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement) register.Function1x2(fixedKey) - register.Function2x1(makeWeightedElement) + register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out. } // Opts contains settings used to configure how approximate quantiles are computed. diff --git a/sdks/go/pkg/beam/transforms/top/top_test.go b/sdks/go/pkg/beam/transforms/top/top_test.go index 39d774a66300..f1a88fe0c6fa 100644 --- a/sdks/go/pkg/beam/transforms/top/top_test.go +++ b/sdks/go/pkg/beam/transforms/top/top_test.go @@ -31,12 +31,6 @@ func TestMain(m *testing.M) { ptest.Main(m) } -func init() { - register.Function2x2(addKeyFn) - register.Function2x1(lessInt) - register.Function2x1(shorterString) -} - func lessInt(a, b int) bool { return a < b } @@ -45,6 +39,12 @@ func shorterString(a, b string) bool { return len(a) < len(b) } +func init() { + register.Function2x2(addKeyFn) + register.Function2x1(lessInt) + register.Function2x1(shorterString) +} + // TestCombineFn3String verifies that the accumulator correctly // maintains the top 3 longest strings. func TestCombineFn3String(t *testing.T) { From 3ff77b162eb83bf2ef202c5622ea892e893f643b Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 18 Jul 2023 21:26:44 -0700 Subject: [PATCH 04/21] rm commented out code. --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 1 - .../pkg/beam/runners/prism/internal/engine/elementmanager.go | 3 --- 2 files changed, 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 09313b62bd1b..a141440400ec 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -73,7 +73,6 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str } // Populate the coders to send with the new windowed value coder. coders[wvcID] = wInC - // col.CoderId = cID // So PCollection coders are looked up correctly for side inputs. return wvcID } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 995187420cde..5e1585ffcd1f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -158,9 +158,6 @@ func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []strin em.consumers[input] = append(em.consumers[input], ss.ID) } for _, side := range ss.sides { - // TODO: clean up this hack to identify the stage for side input consumers. - // drop the _prismside suffix for any side input ID the ids. - // side, _ = strings.CutSuffix(side, "_prismside") em.sideConsumers[side] = append(em.sideConsumers[side], ss.ID) } } From 0bc3b4348f8ac6d0439bd81be40067ff59e1ae37 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 20 Jul 2023 11:15:21 -0700 Subject: [PATCH 05/21] Avoid unnecessary logs on normal path. --- sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 12 ++++++++++-- sdks/go/pkg/beam/core/runtime/harness/harness.go | 4 ++-- sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 9 ++++++++- sdks/go/pkg/beam/runners/universal/runnerlib/job.go | 2 +- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go index d8c0f4d1d852..9662ac07c9cd 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go @@ -27,6 +27,8 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -128,7 +130,12 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha return nil, err } ch.forceRecreate = func(id string, err error) { - log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err) + switch status.Code(err) { + case codes.Canceled: + // Don't log on context canceled path. + default: + log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err) + } m.mu.Lock() delete(m.ports, port.URL) m.mu.Unlock() @@ -371,7 +378,8 @@ func (c *DataChannel) read(ctx context.Context) { c.terminateStreamOnError(err) c.mu.Unlock() - if err == io.EOF { + st := status.Code(err) + if st == codes.Canceled || err == io.EOF { return } log.Errorf(ctx, "DataChannel.read %v bad: %v", c.id, err) diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 7e7c711e9d0e..5629071aa0c2 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -394,7 +394,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe if err != nil { c.failed[instID] = err - return fail(ctx, instID, "ProcessBundle failed: %v", err) + return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err) } tokens := msg.GetCacheTokens() @@ -708,6 +708,6 @@ func fail(ctx context.Context, id instructionID, format string, args ...any) *fn // dial to the specified endpoint. if timeout <=0, call blocks until // grpc.Dial succeeds. func dial(ctx context.Context, endpoint, purpose string, timeout time.Duration) (*grpc.ClientConn, error) { - log.Infof(ctx, "Connecting via grpc @ %s for %s ...", endpoint, purpose) + log.Output(ctx, log.SevDebug, 1, fmt.Sprintf("Connecting via grpc @ %s for %s ...", endpoint, purpose)) return grpcx.Dial(ctx, endpoint, timeout) } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go index f10f0d92e84e..76d4e1f32c23 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go @@ -29,6 +29,8 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type writeTypeEnum int32 @@ -525,7 +527,12 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC return nil, err } ch.forceRecreate = func(id string, err error) { - log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err) + switch status.Code(err) { + case codes.Canceled: + // Don't log on context canceled path. + default: + log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err) + } m.mu.Lock() delete(m.ports, port.URL) m.mu.Unlock() diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go index d8dd85eb92be..4e50661b3db8 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go @@ -123,7 +123,7 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID case msg.GetStateResponse() != nil: resp := msg.GetStateResponse() - log.Infof(ctx, "Job state: %v", resp.GetState().String()) + log.Infof(ctx, "Job[%v] state: %v", jobID, resp.GetState().String()) switch resp.State { case jobpb.JobState_DONE, jobpb.JobState_CANCELLED: From 35799b56d2ec90db9d143a2fae838d6740192126 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 20 Jul 2023 11:16:13 -0700 Subject: [PATCH 06/21] Fix top. --- sdks/go/pkg/beam/transforms/top/top_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/top/top_test.go b/sdks/go/pkg/beam/transforms/top/top_test.go index f1a88fe0c6fa..39d774a66300 100644 --- a/sdks/go/pkg/beam/transforms/top/top_test.go +++ b/sdks/go/pkg/beam/transforms/top/top_test.go @@ -31,6 +31,12 @@ func TestMain(m *testing.M) { ptest.Main(m) } +func init() { + register.Function2x2(addKeyFn) + register.Function2x1(lessInt) + register.Function2x1(shorterString) +} + func lessInt(a, b int) bool { return a < b } @@ -39,12 +45,6 @@ func shorterString(a, b string) bool { return len(a) < len(b) } -func init() { - register.Function2x2(addKeyFn) - register.Function2x1(lessInt) - register.Function2x1(shorterString) -} - // TestCombineFn3String verifies that the accumulator correctly // maintains the top 3 longest strings. func TestCombineFn3String(t *testing.T) { From 0112cc86b2fe7bd816a4051b1ecc99f3b6a09e53 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 20 Jul 2023 11:35:47 -0700 Subject: [PATCH 07/21] Adjust Go versions? --- dev-support/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-support/docker/Dockerfile b/dev-support/docker/Dockerfile index 9422ae5a8863..5b7262ef681f 100644 --- a/dev-support/docker/Dockerfile +++ b/dev-support/docker/Dockerfile @@ -78,7 +78,7 @@ RUN pip3 install distlib==0.3.1 yapf==0.29.0 pytest ### # Install Go ### -ENV DOWNLOAD_GO_VERSION=1.20.6 +ENV DOWNLOAD_GO_VERSION=1.20.5 RUN wget https://golang.org/dl/go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz && \ tar -C /usr/local -xzf go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz ENV GOROOT /usr/local/go From 9caa24030fd85d93f32c81eec3829bfa194f491e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 20 Jul 2023 17:18:14 -0700 Subject: [PATCH 08/21] [prism] Update symbol lookup to not be unit test specific. --- sdks/go/pkg/beam/core/runtime/symbols.go | 2 +- sdks/go/pkg/beam/runners/vet/vet.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/symbols.go b/sdks/go/pkg/beam/core/runtime/symbols.go index e8ff532e7637..84afe9b769af 100644 --- a/sdks/go/pkg/beam/core/runtime/symbols.go +++ b/sdks/go/pkg/beam/core/runtime/symbols.go @@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (any, error) { type failResolver bool func (p failResolver) Sym2Addr(name string) (uintptr, error) { - return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name) + return 0, errors.Errorf("%v not found. Register DoFns and functions with the the beam/register package.", name) } diff --git a/sdks/go/pkg/beam/runners/vet/vet.go b/sdks/go/pkg/beam/runners/vet/vet.go index 131fa0b1ec12..2b5238ddc608 100644 --- a/sdks/go/pkg/beam/runners/vet/vet.go +++ b/sdks/go/pkg/beam/runners/vet/vet.go @@ -54,7 +54,7 @@ func init() { type disabledResolver bool func (p disabledResolver) Sym2Addr(name string) (uintptr, error) { - return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name) + return 0, errors.Errorf("%v not found. Register DoFns and functions with the the beam/register package.", name) } // Execute evaluates the pipeline on whether it can run without reflection. From bcff8903b2b8c5f1e8e9f99449e8ae7c6121f031 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 20 Jul 2023 17:19:17 -0700 Subject: [PATCH 09/21] [prism] Support for reshuffles. --- .../runners/prism/internal/handlerunner.go | 12 ++++-------- .../prism/internal/jobservices/management.go | 19 ++++++++++--------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 27303f03b705..5660c9158189 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -43,7 +43,7 @@ import ( type RunnerCharacteristic struct { SDKFlatten bool // Sets whether we should force an SDK side flatten. SDKGBK bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK. - SDKReshuffle bool + SDKReshuffle bool // Sets whether we should use the SDK backup implementation to handle a Reshuffle. } func Runner(config any) *runner { @@ -75,10 +75,6 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep // TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle. // TODO: Implement a fusion break for reshuffles. - if h.config.SDKReshuffle { - panic("SDK side reshuffle not yet supported") - } - // A Reshuffle, in principle, is a no-op on the pipeline structure, WRT correctness. // It could however affect performance, so it exists to tell the runner that this // point in the pipeline needs a fusion break, to enable the pipeline to change it's @@ -91,11 +87,11 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep // since the input collection and output collection types match. // Get the input and output PCollections, there should only be 1 each. - if len(t.GetInputs()) != 1 { - panic("Expected single input PCollection in reshuffle: " + prototext.Format(t)) + if len(t.GetOutputs()) != 1 { + panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t)) } if len(t.GetOutputs()) != 1 { - panic("Expected single output PCollection in reshuffle: " + prototext.Format(t)) + panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t)) } inColID := getOnlyValue(t.GetInputs()) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 5fe6cbb93da2..3dee7a1b787d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -112,7 +112,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo urns.TransformGBK, urns.TransformFlatten, urns.TransformCombinePerKey, - urns.TransformAssignWindows: + urns.TransformAssignWindows, + urns.TransformReshuffle: // Very few expected transforms types for submitted pipelines. // Most URNs are for the runner to communicate back to the SDK for execution. case urns.TransformReshuffle: @@ -151,14 +152,14 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo if ws.GetWindowFn().GetUrn() != urns.WindowFnSession { check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING) } - if !bypassedWindowingStrategies[wsID] { - check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY) - check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) - // Non nil triggers should fail. - if ws.GetTrigger().GetDefault() == nil { - check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{}) - } - } + // These are used by reshuffle + // TODO have a more aware blocking for reshuffle specifically. + // check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY) + // check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) + // // Non nil triggers should fail. + // if ws.GetTrigger().GetDefault() == nil { + // check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{}) + // } } if len(errs) > 0 { jErr := &joinError{errs: errs} From bd7d1bc68866f22a691016730265f62cbb5c43a7 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jul 2023 09:26:48 -0700 Subject: [PATCH 10/21] [prism] move reshuffle test out of unimplemented. --- .../beam/runners/prism/internal/unimplemented_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index f738a299cfd2..f1655294ea50 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -69,6 +69,9 @@ func TestUnimplemented(t *testing.T) { {pipeline: primitives.TriggerOrFinally}, {pipeline: primitives.TriggerRepeat}, + // Reshuffle (Due to missing windowing strategy features) + {pipeline: primitives.ReshuffleKV}, + // State API {pipeline: primitives.BagStateParDo}, {pipeline: primitives.BagStateParDoClear}, @@ -106,12 +109,6 @@ func TestImplemented(t *testing.T) { pipeline func(s beam.Scope) }{ {pipeline: primitives.Reshuffle}, - {pipeline: primitives.Flatten}, - {pipeline: primitives.FlattenDup}, - {pipeline: primitives.Checkpoints}, - - {pipeline: primitives.CoGBK}, - {pipeline: primitives.ReshuffleKV}, } for _, test := range tests { From 423f45fea66801e3e58cb357df412e3c95b72f46 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jul 2023 09:40:27 -0700 Subject: [PATCH 11/21] [prism] Add CoGBK test to unimplemented set. --- sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index f1655294ea50..2521421f1106 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -43,6 +43,10 @@ func TestUnimplemented(t *testing.T) { }{ // These tests don't terminate, so can't be run. // {pipeline: primitives.Drain}, // Can't test drain automatically yet. + // {pipeline: primitives.Checkpoints}, // Doesn't self terminate? + // {pipeline: primitives.Flatten}, // Times out, should be quick. + // {pipeline: primitives.FlattenDup}, // Times out, should be quick. + {pipeline: primitives.CoGBK}, {pipeline: primitives.TestStreamBoolSequence}, {pipeline: primitives.TestStreamByteSliceSequence}, From f4c4fca3da43c233da1c24ba4b74a096bfecf48d Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jul 2023 10:00:27 -0700 Subject: [PATCH 12/21] [prism] graduate additional tests. --- .../runners/prism/internal/unimplemented_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 2521421f1106..6a94aad246e0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -41,12 +41,11 @@ func TestUnimplemented(t *testing.T) { tests := []struct { pipeline func(s beam.Scope) }{ - // These tests don't terminate, so can't be run. // {pipeline: primitives.Drain}, // Can't test drain automatically yet. - // {pipeline: primitives.Checkpoints}, // Doesn't self terminate? - // {pipeline: primitives.Flatten}, // Times out, should be quick. - // {pipeline: primitives.FlattenDup}, // Times out, should be quick. + + // Can't do Expand/CoGBK {pipeline: primitives.CoGBK}, + {pipeline: primitives.ReshuffleKV}, {pipeline: primitives.TestStreamBoolSequence}, {pipeline: primitives.TestStreamByteSliceSequence}, @@ -73,9 +72,6 @@ func TestUnimplemented(t *testing.T) { {pipeline: primitives.TriggerOrFinally}, {pipeline: primitives.TriggerRepeat}, - // Reshuffle (Due to missing windowing strategy features) - {pipeline: primitives.ReshuffleKV}, - // State API {pipeline: primitives.BagStateParDo}, {pipeline: primitives.BagStateParDoClear}, @@ -113,6 +109,9 @@ func TestImplemented(t *testing.T) { pipeline func(s beam.Scope) }{ {pipeline: primitives.Reshuffle}, + {pipeline: primitives.Flatten}, + {pipeline: primitives.FlattenDup}, + {pipeline: primitives.Checkpoints}, } for _, test := range tests { From fc62c3f54ea7317692f4c4b6122f3226d28e02ad Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jul 2023 12:53:40 -0700 Subject: [PATCH 13/21] delint --- sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 30515fa6f6e8..c931655f000b 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -145,6 +145,7 @@ func (b *B) Cleanup(wk *W) { wk.mu.Unlock() } +// Progress sends a progress request for the given bundle to the passed in worker, blocking on the response. func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) { resp := wk.sendInstruction(&fnpb.InstructionRequest{ Request: &fnpb.InstructionRequest_ProcessBundleProgress{ @@ -159,6 +160,7 @@ func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) { return resp.GetProcessBundleProgress(), nil } +// Split sends a split request for the given bundle to the passed in worker, blocking on the response. func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error) { resp := wk.sendInstruction(&fnpb.InstructionRequest{ Request: &fnpb.InstructionRequest_ProcessBundleSplit{ From 359f1dc75455ad430bfc43304e3ff1c547376fbb Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 21 Jul 2023 12:54:32 -0700 Subject: [PATCH 14/21] [prog] guide updates --- .../site/content/en/documentation/programming-guide.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 82ada91f26a4..b0118df39872 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -1124,6 +1124,9 @@ words = ... {{< /highlight >}} {{< highlight go >}} + +The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner. + // words is the input PCollection of strings var words beam.PCollection = ... @@ -1170,8 +1173,8 @@ words = ... {{< /highlight >}} {{< highlight go >}} -// words is the input PCollection of strings -var words beam.PCollection = ... + +The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner. {{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_apply_anon >}} {{< /highlight >}} @@ -1191,7 +1194,7 @@ words = ... -> **Note:** Anonymous function DoFns may not work on distributed runners. +> **Note:** Anonymous function DoFns do not work on distributed runners. > It's recommended to use named functions and register them with `register.FunctionXxY` in > an `init()` block. From 7112f72dc356f727ba6a04f29fe8e73c8a62f586 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 25 Jul 2023 16:48:43 -0700 Subject: [PATCH 15/21] [prism] Support CoGBKs and wafer thin fusion. --- .../runners/prism/internal/execute_test.go | 4 +- .../beam/runners/prism/internal/preprocess.go | 2 +- .../runners/prism/internal/preprocess_test.go | 2 +- .../pkg/beam/runners/prism/internal/stage.go | 38 ++++++++++++++++++- .../prism/internal/unimplemented_test.go | 7 ++-- 5 files changed, 44 insertions(+), 9 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 971bdb47bbbf..365c63a5ae47 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -326,6 +326,8 @@ func TestRunner_Pipelines(t *testing.T) { pipeline: func(s beam.Scope) { imp := beam.Impulse(s) col0 := beam.ParDo(s, dofn1, imp) + // col1 := beam.ParDo(s, dofn2, col0) + // Doesn't matter which of col1 or col2 is used. sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col0}) beam.ParDo(s, &int64Check{ Name: "sum sideinput check", @@ -338,7 +340,7 @@ func TestRunner_Pipelines(t *testing.T) { imp := beam.Impulse(s) col0 := beam.ParDo(s, dofn1, imp) col1 := beam.ParDo(s, dofn2, col0) - // Doesn't matter which of col0 or col1 is used. + // Doesn't matter which of col1 or col2 is used. sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col1}) beam.ParDo(s, &int64Check{ Name: "sum sideinput check", diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index 96c5f5549b02..ea32e7007e29 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -244,7 +244,7 @@ func defaultFusion(topological []string, comps *pipepb.Components) []*stage { continue } cs := pcolConsumers[pcolID] - + fmt.Printf("XXXXXX Fusing %v, with %v\n", tid, cs) for _, c := range cs { stg.transforms = append(stg.transforms, c.transform) consumed[c.transform] = true diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go index ba39d024e716..02776dd37705 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go @@ -128,7 +128,7 @@ func Test_preprocessor_preProcessGraph(t *testing.T) { pre := newPreprocessor([]transformPreparer{&testPreparer{}}) gotStages := pre.preProcessGraph(test.input) - if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}, link{}), cmpopts.EquateEmpty()); diff != "" { + if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}), cmp.AllowUnexported(link{}), cmpopts.EquateEmpty()); diff != "" { t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index d530b8a71f89..6e244905ceda 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -50,6 +50,10 @@ type link struct { // should in principle be able to connect two SDK environments directly // instead of going through the runner at all, which would be a small // efficiency gain, in runner memory use. +// +// That would also warrant an execution mode where fusion is taken into +// account, but all serialization boundaries remain since the pcollections +// would continue to get serialized. type stage struct { ID string transforms []string @@ -150,11 +154,11 @@ progress: if previousIndex == index && !splitsDone { sr, err := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) if err != nil { - slog.Debug("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) + slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) break progress } if sr.GetChannelSplits() == nil { - slog.Warn("split failed", "bundle", rb) + slog.Debug("SDK returned no splits", "bundle", rb) splitsDone = true continue progress } @@ -379,6 +383,36 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error { return nil } +// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data. +func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) { + sis, err := getSideInputs(t) + if err != nil { + return nil, err + } + var prepSides []func(b *worker.B, watermark mtime.Time) + + // Get WindowedValue Coders for the transform's input and output PCollections. + for local, global := range t.GetInputs() { + _, ok := sis[local] + if !ok { + continue // This is the main input. + } + if oldGlobal, ok := replacements[global]; ok { + global = oldGlobal + } + prepSide, err := handleSideInput(tid, local, global, comps, coders, wk) + if err != nil { + return nil, err + } + prepSides = append(prepSides, prepSide) + } + return func(b *worker.B, tid string, watermark mtime.Time) { + for _, prep := range prepSides { + prep(b, watermark) + } + }, nil +} + // handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark. func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) { t := comps.GetTransforms()[tid] diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 6a94aad246e0..5f8d38759998 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -43,10 +43,6 @@ func TestUnimplemented(t *testing.T) { }{ // {pipeline: primitives.Drain}, // Can't test drain automatically yet. - // Can't do Expand/CoGBK - {pipeline: primitives.CoGBK}, - {pipeline: primitives.ReshuffleKV}, - {pipeline: primitives.TestStreamBoolSequence}, {pipeline: primitives.TestStreamByteSliceSequence}, {pipeline: primitives.TestStreamFloat64Sequence}, @@ -112,6 +108,9 @@ func TestImplemented(t *testing.T) { {pipeline: primitives.Flatten}, {pipeline: primitives.FlattenDup}, {pipeline: primitives.Checkpoints}, + + {pipeline: primitives.CoGBK}, + {pipeline: primitives.ReshuffleKV}, } for _, test := range tests { From 51124cc78e2a6e38f9c8e07f2f9f6a584fba1e2c Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 26 Jul 2023 16:48:00 -0700 Subject: [PATCH 16/21] Make window close strict. --- .../beam/runners/prism/internal/execute.go | 2 +- .../runners/prism/internal/execute_test.go | 48 +------------------ .../runners/prism/internal/testdofns_test.go | 8 ++++ 3 files changed, 10 insertions(+), 48 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index ecff740ed86e..aeedf730a9b2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -300,7 +300,7 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) return makeWindowCoders(coders[wcID]) } - + func getOnlyValue[K comparable, V any](in map[K]V) V { if len(in) != 1 { panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in)) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 365c63a5ae47..59bb3aee2b15 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal" @@ -564,53 +565,6 @@ func TestFailure(t *testing.T) { } } -func TestRunner_Passert(t *testing.T) { - initRunner(t) - tests := []struct { - name string - pipeline func(s beam.Scope) - metrics func(t *testing.T, pr beam.PipelineResult) - }{ - { - name: "Empty", - pipeline: func(s beam.Scope) { - imp := beam.Impulse(s) - col1 := beam.ParDo(s, dofnEmpty, imp) - passert.Empty(s, col1) - }, - }, { - name: "Equals-TwoEmpty", - pipeline: func(s beam.Scope) { - imp := beam.Impulse(s) - col1 := beam.ParDo(s, dofnEmpty, imp) - col2 := beam.ParDo(s, dofnEmpty, imp) - passert.Equals(s, col1, col2) - }, - }, { - name: "Equals", - pipeline: func(s beam.Scope) { - imp := beam.Impulse(s) - col1 := beam.ParDo(s, dofn1, imp) - col2 := beam.ParDo(s, dofn1, imp) - passert.Equals(s, col1, col2) - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - p, s := beam.NewPipelineWithRoot() - test.pipeline(s) - pr, err := executeWithT(context.Background(), t, p) - if err != nil { - t.Fatal(err) - } - if test.metrics != nil { - test.metrics(t, pr) - } - }) - } -} - func toFoo(et beam.EventTime, id int, _ func(*int64) bool) (int, string) { return id, "ooo" } diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 4a79aa9710c5..3ebc9ed2b332 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -259,6 +259,14 @@ func dofnGBK2(k int64, vs func(*string) bool, emit func(string)) { emit(sum) } +func dofnGBKKV(k string, vs func(*int64) bool, emit func(string, int64)) { + var v, sum int64 + for vs(&v) { + sum += v + } + emit(k, sum) +} + type testRow struct { A string B int64 From 864c0429a0f7390cb6dc0d86710e4bb217399fa4 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 2 Aug 2023 14:32:08 -0700 Subject: [PATCH 17/21] quick first pass --- dev-support/docker/Dockerfile | 2 +- .../prism/internal/jobservices/management.go | 2 +- sdks/go/pkg/beam/runners/prism/internal/stage.go | 7 +------ sdks/go/pkg/beam/transforms/stats/quantiles.go | 2 +- sdks/go/test/integration/integration.go | 15 ++++++--------- 5 files changed, 10 insertions(+), 18 deletions(-) diff --git a/dev-support/docker/Dockerfile b/dev-support/docker/Dockerfile index 5b7262ef681f..9422ae5a8863 100644 --- a/dev-support/docker/Dockerfile +++ b/dev-support/docker/Dockerfile @@ -78,7 +78,7 @@ RUN pip3 install distlib==0.3.1 yapf==0.29.0 pytest ### # Install Go ### -ENV DOWNLOAD_GO_VERSION=1.20.5 +ENV DOWNLOAD_GO_VERSION=1.20.6 RUN wget https://golang.org/dl/go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz && \ tar -C /usr/local -xzf go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz ENV GOROOT /usr/local/go diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 3dee7a1b787d..b2495a499d90 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -145,7 +145,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo } // Inspect Windowing strategies for unsupported features. - for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { + for _, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0)) check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY) check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING) diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 6e244905ceda..ec2675ff36f9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -134,11 +134,6 @@ progress: progTick.Stop() break progress // exit progress loop on close. case <-progTick.C: - resp, err := b.Progress(wk) - if err != nil { - slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) - break progress - } resp, err := b.Progress(wk) if err != nil { slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) @@ -410,7 +405,7 @@ func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components for _, prep := range prepSides { prep(b, watermark) } - }, nil + }, nil } // handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark. diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go index 7685852efba6..2734bba3dc60 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go @@ -47,7 +47,7 @@ func init() { beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement) register.Function1x2(fixedKey) - register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out. + //register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out. } // Opts contains settings used to configure how approximate quantiles are computed. diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 7914a0525fef..1224ce2bb071 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -137,25 +137,24 @@ var portableFilters = []string{ } var prismFilters = []string{ - // The portable runner does not support the TestStream primitive + // The prism runner does not support the TestStream primitive "TestTestStream.*", // The trigger and pane tests uses TestStream "TestTrigger.*", "TestPanes", - // TODO(https://github.com/apache/beam/issues/21058): Python portable runner times out on Kafka reads. + + // TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism. "TestKafkaIO.*", - // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners. + // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow prism runners. "TestBigQueryIO.*", "TestSpannerIO.*", - // The portable runner does not support self-checkpointing - "TestCheckpointing", - // The portable runner does not support pipeline drain for SDF. + // The prism runner does not support pipeline drain for SDF. "TestDrain", // FhirIO currently only supports Dataflow runner "TestFhirIO.*", // OOMs currently only lead to heap dumps on Dataflow runner "TestOomParDo", - // The portable runner does not support user state. + // The prism runner does not support user state. "TestValueState", "TestValueStateWindowed", "TestValueStateClear", @@ -328,8 +327,6 @@ func CheckFilters(t *testing.T) { filters = prismFilters case "portable", "PortableRunner": filters = portableFilters - case "prism", "PrismRunner": - filters = prismFilters case "flink", "FlinkRunner": filters = flinkFilters case "samza", "SamzaRunner": From a9474db3e348a6fd8de24fd60a30979c8d515569 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 2 Aug 2023 16:17:58 -0700 Subject: [PATCH 18/21] chang cleanup --- .../pkg/beam/core/runtime/harness/harness.go | 4 +- .../beam/runners/prism/internal/execute.go | 2 +- .../runners/prism/internal/execute_test.go | 2 - .../runners/prism/internal/handlerunner.go | 10 ++-- .../prism/internal/jobservices/management.go | 18 +++---- .../beam/runners/prism/internal/preprocess.go | 1 - .../runners/prism/internal/preprocess_test.go | 2 +- .../pkg/beam/runners/prism/internal/stage.go | 30 ----------- .../runners/prism/internal/stateful_test.go | 51 ------------------- .../runners/prism/internal/testdofns_test.go | 16 ------ .../runners/prism/internal/web/web_test.go | 16 ------ .../runners/prism/internal/worker/worker.go | 10 +++- .../runners/universal/runnerlib/execute.go | 3 +- .../go/pkg/beam/testing/passert/count_test.go | 4 -- .../pkg/beam/testing/passert/equals_test.go | 2 +- sdks/go/pkg/beam/testing/passert/floats.go | 1 + .../pkg/beam/testing/passert/passert_test.go | 4 ++ sdks/go/pkg/beam/testing/ptest/ptest.go | 3 +- .../pkg/beam/transforms/filter/filter_test.go | 6 +-- sdks/go/test/integration/integration.go | 2 +- 20 files changed, 42 insertions(+), 145 deletions(-) delete mode 100644 sdks/go/pkg/beam/runners/prism/internal/stateful_test.go delete mode 100644 sdks/go/pkg/beam/runners/prism/internal/web/web_test.go diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 5629071aa0c2..c5db9a85f367 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -394,7 +394,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe if err != nil { c.failed[instID] = err - return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err) + return fail(ctx, instID, "ProcessBundle failed: %v", err) } tokens := msg.GetCacheTokens() @@ -426,7 +426,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe c.failed[instID] = err } else if dataError != io.EOF && dataError != nil { // If there was an error on the data channel reads, fail this bundle - // since we may have had a short read.' + // since we may have had a short read. c.failed[instID] = dataError err = dataError } else { diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index aeedf730a9b2..ecff740ed86e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -300,7 +300,7 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) return makeWindowCoders(coders[wcID]) } - + func getOnlyValue[K comparable, V any](in map[K]V) V { if len(in) != 1 { panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in)) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 59bb3aee2b15..f41f25ad0f71 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -13,7 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal_test package internal_test import ( @@ -41,7 +40,6 @@ import ( func initRunner(t *testing.T) { t.Helper() if *jobopts.Endpoint == "" { - s := jobservices.NewServer(0, internal.RunPipeline) s := jobservices.NewServer(0, internal.RunPipeline) *jobopts.Endpoint = s.Endpoint() go s.Serve() diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 5660c9158189..05b3d3bbaa0e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -75,6 +75,10 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep // TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle. // TODO: Implement a fusion break for reshuffles. + if h.config.SDKReshuffle { + panic("SDK side reshuffle not yet supported") + } + // A Reshuffle, in principle, is a no-op on the pipeline structure, WRT correctness. // It could however affect performance, so it exists to tell the runner that this // point in the pipeline needs a fusion break, to enable the pipeline to change it's @@ -87,11 +91,11 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep // since the input collection and output collection types match. // Get the input and output PCollections, there should only be 1 each. - if len(t.GetOutputs()) != 1 { - panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t)) + if len(t.GetInputs()) != 1 { + panic("Expected single input PCollection in reshuffle: " + prototext.Format(t)) } if len(t.GetOutputs()) != 1 { - panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t)) + panic("Expected single output PCollection in reshuffle: " + prototext.Format(t)) } inColID := getOnlyValue(t.GetInputs()) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index b2495a499d90..28d994fbfa5a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -145,21 +145,21 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo } // Inspect Windowing strategies for unsupported features. - for _, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { + for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0)) check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY) check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING) if ws.GetWindowFn().GetUrn() != urns.WindowFnSession { check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING) } - // These are used by reshuffle - // TODO have a more aware blocking for reshuffle specifically. - // check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY) - // check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) - // // Non nil triggers should fail. - // if ws.GetTrigger().GetDefault() == nil { - // check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{}) - // } + if !bypassedWindowingStrategies[wsID] { + check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY) + check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW) + // Non nil triggers should fail. + if ws.GetTrigger().GetDefault() == nil { + check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{}) + } + } } if len(errs) > 0 { jErr := &joinError{errs: errs} diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index ea32e7007e29..b2e0c0ed2195 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -244,7 +244,6 @@ func defaultFusion(topological []string, comps *pipepb.Components) []*stage { continue } cs := pcolConsumers[pcolID] - fmt.Printf("XXXXXX Fusing %v, with %v\n", tid, cs) for _, c := range cs { stg.transforms = append(stg.transforms, c.transform) consumed[c.transform] = true diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go index 02776dd37705..ba39d024e716 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go @@ -128,7 +128,7 @@ func Test_preprocessor_preProcessGraph(t *testing.T) { pre := newPreprocessor([]transformPreparer{&testPreparer{}}) gotStages := pre.preProcessGraph(test.input) - if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}), cmp.AllowUnexported(link{}), cmpopts.EquateEmpty()); diff != "" { + if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}, link{}), cmpopts.EquateEmpty()); diff != "" { t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index ec2675ff36f9..1a9c2548df83 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -378,36 +378,6 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error { return nil } -// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data. -func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) { - sis, err := getSideInputs(t) - if err != nil { - return nil, err - } - var prepSides []func(b *worker.B, watermark mtime.Time) - - // Get WindowedValue Coders for the transform's input and output PCollections. - for local, global := range t.GetInputs() { - _, ok := sis[local] - if !ok { - continue // This is the main input. - } - if oldGlobal, ok := replacements[global]; ok { - global = oldGlobal - } - prepSide, err := handleSideInput(tid, local, global, comps, coders, wk) - if err != nil { - return nil, err - } - prepSides = append(prepSides, prepSide) - } - return func(b *worker.B, tid string, watermark mtime.Time) { - for _, prep := range prepSides { - prep(b, watermark) - } - }, nil -} - // handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark. func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) { t := comps.GetTransforms()[tid] diff --git a/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go b/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go deleted file mode 100644 index 687f7e4f0db4..000000000000 --- a/sdks/go/pkg/beam/runners/prism/internal/stateful_test.go +++ /dev/null @@ -1,51 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package internal_test - -import ( - "context" - "testing" - - "github.com/apache/beam/sdks/v2/go/pkg/beam" -) - -// This file covers pipelines with stateful DoFns, in particular, that they -// use the state and timers APIs. - -func TestStateful(t *testing.T) { - initRunner(t) - - tests := []struct { - pipeline func(s beam.Scope) - metrics func(t *testing.T, pr beam.PipelineResult) - }{ - //{pipeline: primitives.BagStateParDo}, - } - - for _, test := range tests { - t.Run(intTestName(test.pipeline), func(t *testing.T) { - p, s := beam.NewPipelineWithRoot() - test.pipeline(s) - pr, err := executeWithT(context.Background(), t, p) - if err != nil { - t.Fatal(err) - } - if test.metrics != nil { - test.metrics(t, pr) - } - }) - } -} diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 3ebc9ed2b332..334d74fcae1d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -13,7 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal_test package internal_test import ( @@ -22,11 +21,6 @@ import ( "sort" "time" - "context" - "fmt" - "sort" - "time" - "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" @@ -73,16 +67,6 @@ func init() { register.Emitter2[int64, int64]() } -// The Test DoFns live outside of the test files to get coverage information on DoFn -// Lifecycle method execution. This inflates binary size, but ensures the runner is -// exercising the expected feature set. -// -// Once there's enough confidence in the runner, we can move these into a dedicated testing -// package along with the pipelines that use them. - -// Registrations should happen in the test files, so the compiler can prune these -// when they are not in use. - func dofnEmpty(imp []byte, emit func(int64)) { } diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go b/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go deleted file mode 100644 index cc8e979f1917..000000000000 --- a/sdks/go/pkg/beam/runners/prism/internal/web/web_test.go +++ /dev/null @@ -1,16 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package web diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index bc76b319a6c6..a1d0ff79baf1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -267,7 +267,10 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { for { select { case req := <-wk.InstReqs: - ctrl.Send(req) + err := ctrl.Send(req) + if err != nil { + return err + } case <-ctrl.Context().Done(): slog.Debug("Control context canceled") return ctrl.Context().Err() @@ -323,7 +326,10 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { }() for { select { - case req := <-wk.DataReqs: + case req, ok := <-wk.DataReqs: + if !ok { + return nil + } if err := data.Send(req); err != nil { slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err)) } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go index b2a9b7dfa86f..eb854dbfcdba 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go @@ -57,7 +57,8 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO bin = worker } } else if opt.Loopback { - // TODO, determine the canonical location for Beam temp files. + // TODO(https://github.com/apache/beam/issues/27569: determine the canonical location for Beam temp files. + // In loopback mode, the binary is unused, so we can avoid an unnecessary compile step. f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*") bin = f.Name() } else { diff --git a/sdks/go/pkg/beam/testing/passert/count_test.go b/sdks/go/pkg/beam/testing/passert/count_test.go index f5014b840371..c34294998509 100644 --- a/sdks/go/pkg/beam/testing/passert/count_test.go +++ b/sdks/go/pkg/beam/testing/passert/count_test.go @@ -22,10 +22,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) -func TestMain(m *testing.M) { - ptest.Main(m) -} - func TestCount(t *testing.T) { var tests = []struct { name string diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go index 0eb0d0728a3f..27419a5c13c8 100644 --- a/sdks/go/pkg/beam/testing/passert/equals_test.go +++ b/sdks/go/pkg/beam/testing/passert/equals_test.go @@ -180,7 +180,7 @@ func ExampleEqualsList_mismatch() { list := [3]string{"wrong", "inputs", "here"} EqualsList(s, col, list) - ptest.DefaultRunner() + err := ptest.Run(p) err = unwrapError(err) diff --git a/sdks/go/pkg/beam/testing/passert/floats.go b/sdks/go/pkg/beam/testing/passert/floats.go index 962891b7cec9..f71e55090838 100644 --- a/sdks/go/pkg/beam/testing/passert/floats.go +++ b/sdks/go/pkg/beam/testing/passert/floats.go @@ -31,6 +31,7 @@ func init() { register.DoFn2x1[[]byte, func(*beam.T) bool, error]((*boundsFn)(nil)) register.DoFn3x1[[]byte, func(*beam.T) bool, func(*beam.T) bool, error]((*thresholdFn)(nil)) register.Emitter1[beam.T]() + register.Iter1[beam.T]() } // EqualsFloat calls into TryEqualsFloat, checkong that two PCollections of non-complex diff --git a/sdks/go/pkg/beam/testing/passert/passert_test.go b/sdks/go/pkg/beam/testing/passert/passert_test.go index ffd0388644a9..d472f6883939 100644 --- a/sdks/go/pkg/beam/testing/passert/passert_test.go +++ b/sdks/go/pkg/beam/testing/passert/passert_test.go @@ -24,6 +24,10 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) +func TestMain(m *testing.M) { + ptest.Main(m) +} + func isA(input string) bool { return input == "a" } func isB(input string) bool { return input == "b" } func lessThan13(input int) bool { return input < 13 } diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go b/sdks/go/pkg/beam/testing/ptest/ptest.go index 4ac16fd9f97d..0bca8c48ceb6 100644 --- a/sdks/go/pkg/beam/testing/ptest/ptest.go +++ b/sdks/go/pkg/beam/testing/ptest/ptest.go @@ -26,7 +26,8 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/runners" // common runner flag. // ptest uses the prism runner to execute pipelines by default. - // but includes the direct runner for legacy fallback reasons. + // but includes the direct runner for legacy fallback reasons to + // support users overriding the default back to the direct runner. _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" ) diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go index 14ae106ec962..4e9eb03f9d84 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter_test.go +++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go @@ -31,9 +31,9 @@ func init() { register.Function1x1(greaterThanOne) } -func alwaysTrue(a int) bool { return true } -func alwaysFalse(a int) bool { return false } -func isOne(a int) bool { return a == 1 } +func alwaysTrue(a int) bool { return true } +func alwaysFalse(a int) bool { return false } +func isOne(a int) bool { return a == 1 } func greaterThanOne(a int) bool { return a > 1 } func TestInclude(t *testing.T) { diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 1224ce2bb071..f66cc1f53bfc 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -145,7 +145,7 @@ var prismFilters = []string{ // TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism. "TestKafkaIO.*", - // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow prism runners. + // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners. "TestBigQueryIO.*", "TestSpannerIO.*", // The prism runner does not support pipeline drain for SDF. From f48fb14db3c8dacdfaf65cc8761d4a15f36b6c4d Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 4 Aug 2023 14:04:01 -0700 Subject: [PATCH 19/21] remove unnecessary churn changes --- sdks/go/pkg/beam/beam.shims.go | 10 ++++++ .../pkg/beam/core/runtime/exec/translate.go | 1 + sdks/go/pkg/beam/create_test.go | 14 ++++---- sdks/go/pkg/beam/io/avroio/avroio.go | 7 ++-- sdks/go/pkg/beam/io/avroio/avroio_test.go | 11 +++--- .../pkg/beam/io/datastoreio/datastore_test.go | 5 +++ sdks/go/pkg/beam/io/spannerio/write_test.go | 2 -- sdks/go/pkg/beam/runners/direct/direct.go | 1 + .../runners/prism/internal/execute_test.go | 35 +++++++++---------- .../prism/internal/jobservices/management.go | 4 +-- .../beam/runners/prism/internal/preprocess.go | 1 + .../pkg/beam/testing/passert/equals_test.go | 1 - .../beam/transforms/filter/distinct_test.go | 4 --- .../pkg/beam/transforms/filter/filter_test.go | 4 +++ .../go/pkg/beam/transforms/stats/quantiles.go | 2 +- 15 files changed, 59 insertions(+), 43 deletions(-) diff --git a/sdks/go/pkg/beam/beam.shims.go b/sdks/go/pkg/beam/beam.shims.go index 17ec42d85173..6653fb0129f7 100644 --- a/sdks/go/pkg/beam/beam.shims.go +++ b/sdks/go/pkg/beam/beam.shims.go @@ -44,10 +44,13 @@ func init() { runtime.RegisterFunction(schemaDec) runtime.RegisterFunction(schemaEnc) runtime.RegisterFunction(swapKVFn) + runtime.RegisterType(reflect.TypeOf((*createFn)(nil)).Elem()) + schema.RegisterType(reflect.TypeOf((*createFn)(nil)).Elem()) runtime.RegisterType(reflect.TypeOf((*reflect.Type)(nil)).Elem()) schema.RegisterType(reflect.TypeOf((*reflect.Type)(nil)).Elem()) runtime.RegisterType(reflect.TypeOf((*reflectx.Func)(nil)).Elem()) schema.RegisterType(reflect.TypeOf((*reflectx.Func)(nil)).Elem()) + reflectx.RegisterStructWrapper(reflect.TypeOf((*createFn)(nil)).Elem(), wrapMakerCreateFn) reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type, []byte) (typex.T, error))(nil)).Elem(), funcMakerReflect۰TypeSliceOfByteГTypex۰TError) reflectx.RegisterFunc(reflect.TypeOf((*func(reflect.Type, typex.T) ([]byte, error))(nil)).Elem(), funcMakerReflect۰TypeTypex۰TГSliceOfByteError) reflectx.RegisterFunc(reflect.TypeOf((*func([]byte, func(typex.T)) error)(nil)).Elem(), funcMakerSliceOfByteEmitTypex۰TГError) @@ -61,6 +64,13 @@ func init() { exec.RegisterEmitter(reflect.TypeOf((*func(typex.T))(nil)).Elem(), emitMakerTypex۰T) } +func wrapMakerCreateFn(fn any) map[string]reflectx.Func { + dfn := fn.(*createFn) + return map[string]reflectx.Func{ + "ProcessElement": reflectx.MakeFunc(func(a0 []byte, a1 func(typex.T)) error { return dfn.ProcessElement(a0, a1) }), + } +} + type callerReflect۰TypeSliceOfByteГTypex۰TError struct { fn func(reflect.Type, []byte) (typex.T, error) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index 0e99dfc847e6..02a1418880e5 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -612,6 +612,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { for i := 1; i < len(input); i++ { // TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs + ec, wc, err := b.makeCoderForPCollection(input[i]) if err != nil { return nil, err diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go index 39fc484be1da..785c3b33db62 100644 --- a/sdks/go/pkg/beam/create_test.go +++ b/sdks/go/pkg/beam/create_test.go @@ -69,14 +69,14 @@ func TestCreateList(t *testing.T) { tests := []struct { values any }{ - //{[]int{1, 2, 3}}, - // {[]string{"1", "2", "3"}}, - // {[]float32{float32(0.1), float32(0.2), float32(0.3)}}, - // {[]float64{float64(0.1), float64(0.2), float64(0.3)}}, - // {[]uint{uint(1), uint(2), uint(3)}}, + {[]int{1, 2, 3}}, + {[]string{"1", "2", "3"}}, + {[]float32{float32(0.1), float32(0.2), float32(0.3)}}, + {[]float64{float64(0.1), float64(0.2), float64(0.3)}}, + {[]uint{uint(1), uint(2), uint(3)}}, {[]bool{false, true, true, false, true}}, - {[]wc{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}}, - {[]*testProto{&testProto{}, &testProto{stringValue("test")}}}, // Test for BEAM-4401 + {[]wc{{"a", 23}, {"b", 42}, {"c", 5}}}, + {[]*testProto{{}, {stringValue("test")}}}, // Test for BEAM-4401 } for _, test := range tests { diff --git a/sdks/go/pkg/beam/io/avroio/avroio.go b/sdks/go/pkg/beam/io/avroio/avroio.go index 7aa5d0318999..b00c6d2eea00 100644 --- a/sdks/go/pkg/beam/io/avroio/avroio.go +++ b/sdks/go/pkg/beam/io/avroio/avroio.go @@ -26,14 +26,15 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" - "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/linkedin/goavro/v2" ) func init() { register.Function3x1(expandFn) - beam.RegisterType(reflect.TypeOf((*avroReadFn)(nil)).Elem()) - beam.RegisterType(reflect.TypeOf((*writeAvroFn)(nil)).Elem()) + register.DoFn3x1[context.Context, string, func(beam.X), error]((*avroReadFn)(nil)) + register.DoFn3x1[context.Context, int, func(*string) bool, error]((*writeAvroFn)(nil)) + register.Emitter1[beam.X]() + register.Iter1[string]() } // Read reads a set of files and returns lines as a PCollection diff --git a/sdks/go/pkg/beam/io/avroio/avroio_test.go b/sdks/go/pkg/beam/io/avroio/avroio_test.go index 7e7ea7ceee32..403a81875557 100644 --- a/sdks/go/pkg/beam/io/avroio/avroio_test.go +++ b/sdks/go/pkg/beam/io/avroio/avroio_test.go @@ -44,6 +44,11 @@ func init() { register.Function2x0(toJSONString) } +func toJSONString(user TwitterUser, emit func(string)) { + b, _ := json.Marshal(user) + emit(string(b)) +} + type Tweet struct { Stamp int64 `json:"timestamp"` Tweet string `json:"tweet"` @@ -135,11 +140,6 @@ const userSchema = `{ ] }` -func toJSONString(user TwitterUser, emit func(string)) { - b, _ := json.Marshal(user) - emit(string(b)) -} - func TestWrite(t *testing.T) { avroFile := "./user.avro" testUsername := "user1" @@ -153,6 +153,7 @@ func TestWrite(t *testing.T) { t.Cleanup(func() { os.Remove(avroFile) }) + ptest.RunAndValidate(t, p) if _, err := os.Stat(avroFile); errors.Is(err, os.ErrNotExist) { diff --git a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go index a6fdf9987ca3..b95439e2d56d 100644 --- a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go +++ b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go @@ -35,6 +35,11 @@ func TestMain(m *testing.M) { ptest.MainWithDefault(m, "direct") } +func init() { + beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem()) +} + // fake client type implements datastoreio.clientType type fakeClient struct { runCounter int diff --git a/sdks/go/pkg/beam/io/spannerio/write_test.go b/sdks/go/pkg/beam/io/spannerio/write_test.go index d3cc8a1b9058..3c2c1f591519 100644 --- a/sdks/go/pkg/beam/io/spannerio/write_test.go +++ b/sdks/go/pkg/beam/io/spannerio/write_test.go @@ -19,8 +19,6 @@ import ( "context" "testing" - spannertest "github.com/apache/beam/sdks/v2/go/test/integration/io/spannerio" - "cloud.google.com/go/spanner" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go index d1f8937f7840..13288306066f 100644 --- a/sdks/go/pkg/beam/runners/direct/direct.go +++ b/sdks/go/pkg/beam/runners/direct/direct.go @@ -17,6 +17,7 @@ // pipelines in the current process. Useful for testing. // // Deprecated: Use prism as a local runner instead. +// Reliance on the direct runner leads to non-portable pipelines. package direct import ( diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index f41f25ad0f71..b8492adc574c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -325,8 +325,7 @@ func TestRunner_Pipelines(t *testing.T) { pipeline: func(s beam.Scope) { imp := beam.Impulse(s) col0 := beam.ParDo(s, dofn1, imp) - // col1 := beam.ParDo(s, dofn2, col0) - // Doesn't matter which of col1 or col2 is used. + // Doesn't matter which of col0 or col1 is used. sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col0}) beam.ParDo(s, &int64Check{ Name: "sum sideinput check", @@ -339,7 +338,7 @@ func TestRunner_Pipelines(t *testing.T) { imp := beam.Impulse(s) col0 := beam.ParDo(s, dofn1, imp) col1 := beam.ParDo(s, dofn2, col0) - // Doesn't matter which of col1 or col2 is used. + // Doesn't matter which of col0 or col1 is used. sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col1}) beam.ParDo(s, &int64Check{ Name: "sum sideinput check", @@ -501,6 +500,21 @@ func TestRunner_Metrics(t *testing.T) { }) } +func TestFailure(t *testing.T) { + initRunner(t) + + p, s := beam.NewPipelineWithRoot() + imp := beam.Impulse(s) + beam.ParDo(s, doFnFail, imp) + _, err := executeWithT(context.Background(), t, p) + if err == nil { + t.Fatalf("expected pipeline failure, but got a success") + } + if want := "doFnFail: failing as intended"; !strings.Contains(err.Error(), want) { + t.Fatalf("expected pipeline failure with %q, but was %v", want, err) + } +} + func TestRunner_Passert(t *testing.T) { initRunner(t) tests := []struct { @@ -548,21 +562,6 @@ func TestRunner_Passert(t *testing.T) { } } -func TestFailure(t *testing.T) { - initRunner(t) - - p, s := beam.NewPipelineWithRoot() - imp := beam.Impulse(s) - beam.ParDo(s, doFnFail, imp) - _, err := executeWithT(context.Background(), t, p) - if err == nil { - t.Fatalf("expected pipeline failure, but got a success") - } - if want := "doFnFail: failing as intended"; !strings.Contains(err.Error(), want) { - t.Fatalf("expected pipeline failure with %q, but was %v", want, err) - } -} - func toFoo(et beam.EventTime, id int, _ func(*int64) bool) (int, string) { return id, "ooo" } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 28d994fbfa5a..953ee50c559d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -112,8 +112,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo urns.TransformGBK, urns.TransformFlatten, urns.TransformCombinePerKey, - urns.TransformAssignWindows, - urns.TransformReshuffle: + urns.TransformAssignWindows: // Very few expected transforms types for submitted pipelines. // Most URNs are for the runner to communicate back to the SDK for execution. case urns.TransformReshuffle: @@ -213,6 +212,7 @@ func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.Jo // Reached terminal state. return nil case jobpb.JobState_FAILED: + // Ensure we send an error message with the cause of the job failure. stream.Send(&jobpb.JobMessagesResponse{ Response: &jobpb.JobMessagesResponse_MessageResponse{ MessageResponse: &jobpb.JobMessage{ diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index b2e0c0ed2195..96c5f5549b02 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -244,6 +244,7 @@ func defaultFusion(topological []string, comps *pipepb.Components) []*stage { continue } cs := pcolConsumers[pcolID] + for _, c := range cs { stg.transforms = append(stg.transforms, c.transform) consumed[c.transform] = true diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go index 27419a5c13c8..a8a5c835f8ff 100644 --- a/sdks/go/pkg/beam/testing/passert/equals_test.go +++ b/sdks/go/pkg/beam/testing/passert/equals_test.go @@ -180,7 +180,6 @@ func ExampleEqualsList_mismatch() { list := [3]string{"wrong", "inputs", "here"} EqualsList(s, col, list) - err := ptest.Run(p) err = unwrapError(err) diff --git a/sdks/go/pkg/beam/transforms/filter/distinct_test.go b/sdks/go/pkg/beam/transforms/filter/distinct_test.go index 0620c917d6e0..bb275cc8fb5b 100644 --- a/sdks/go/pkg/beam/transforms/filter/distinct_test.go +++ b/sdks/go/pkg/beam/transforms/filter/distinct_test.go @@ -23,10 +23,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" ) -func TestMain(m *testing.M) { - ptest.Main(m) -} - type s struct { A int B string diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go index 4e9eb03f9d84..ffa138e099a6 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter_test.go +++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go @@ -24,6 +24,10 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" ) +func TestMain(m *testing.M) { + ptest.Main(m) +} + func init() { register.Function1x1(alwaysTrue) register.Function1x1(alwaysFalse) diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go index 2734bba3dc60..6d2baa8b5e99 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go @@ -47,7 +47,7 @@ func init() { beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement) register.Function1x2(fixedKey) - //register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out. + register.Function2x1(makeWeightedElement) } // Opts contains settings used to configure how approximate quantiles are computed. From cd16c24f4423f60ee88dda9d83eb5bef71f1b1bb Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 4 Aug 2023 14:19:41 -0700 Subject: [PATCH 20/21] rm execute line --- sdks/go/pkg/beam/runners/prism/internal/execute_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index b8492adc574c..1a5ae7989a06 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -325,7 +325,6 @@ func TestRunner_Pipelines(t *testing.T) { pipeline: func(s beam.Scope) { imp := beam.Impulse(s) col0 := beam.ParDo(s, dofn1, imp) - // Doesn't matter which of col0 or col1 is used. sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col0}) beam.ParDo(s, &int64Check{ Name: "sum sideinput check", From 2ea91e504d27122e66c92d07d1edf3ac0e032794 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Mon, 7 Aug 2023 09:54:20 -0700 Subject: [PATCH 21/21] Update sdks/go/pkg/beam/runners/vet/vet.go Co-authored-by: Ritesh Ghorse --- sdks/go/pkg/beam/runners/vet/vet.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/vet/vet.go b/sdks/go/pkg/beam/runners/vet/vet.go index 2b5238ddc608..739f5db61c5b 100644 --- a/sdks/go/pkg/beam/runners/vet/vet.go +++ b/sdks/go/pkg/beam/runners/vet/vet.go @@ -54,7 +54,7 @@ func init() { type disabledResolver bool func (p disabledResolver) Sym2Addr(name string) (uintptr, error) { - return 0, errors.Errorf("%v not found. Register DoFns and functions with the the beam/register package.", name) + return 0, errors.Errorf("%v not found. Register DoFns and functions with the beam/register package.", name) } // Execute evaluates the pipeline on whether it can run without reflection.