diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go index e2d536cb4f02..86fcdaf1ea04 100644 --- a/sdks/go/pkg/beam/pardo.go +++ b/sdks/go/pkg/beam/pardo.go @@ -16,6 +16,7 @@ package beam import ( + "context" "fmt" "reflect" @@ -24,6 +25,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) func addParDoCtx(err error, s Scope) error { @@ -63,7 +65,8 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCollection before use", fn, i) } if (sideWfn.Kind == window.GlobalWindows) && !sideNode.Bounded() { - return nil, fmt.Errorf("side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use", i, fn) + // TODO(BEAM-14501): Replace this warning with an error return when proper streaming test functions have been added. + log.Warnf(context.Background(), "side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use. This will cause your pipeline to fail in a future release, see BEAM-14501 for details", i, fn) } in = append(in, s.Input.n) } diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go index 48c2b2adbecf..e124b7f2af92 100644 --- a/sdks/go/pkg/beam/pardo_test.go +++ b/sdks/go/pkg/beam/pardo_test.go @@ -146,11 +146,12 @@ func TestParDoSideInputValidation(t *testing.T) { wFn *window.Fn isBounded bool }{ - { - "global window unbounded", - window.NewGlobalWindows(), - false, - }, + // TODO(BEAM-14501): Re-enable this test case once proper streaming testing support is finished. + // { + // "global window unbounded", + // window.NewGlobalWindows(), + // false, + // }, { "side input session windowed", window.NewSessions(1 * time.Minute),