Skip to content

Commit

Permalink
[BEAM-14499] Step global, unbounded side input case back to warning (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmccluskey authored May 23, 2022
1 parent ac24771 commit 3f05429
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
5 changes: 4 additions & 1 deletion sdks/go/pkg/beam/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package beam

import (
"context"
"fmt"
"reflect"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

func addParDoCtx(err error, s Scope) error {
Expand Down Expand Up @@ -63,7 +65,8 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCollection before use", fn, i)
}
if (sideWfn.Kind == window.GlobalWindows) && !sideNode.Bounded() {
return nil, fmt.Errorf("side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use", i, fn)
// TODO(BEAM-14501): Replace this warning with an error return when proper streaming test functions have been added.
log.Warnf(context.Background(), "side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use. This will cause your pipeline to fail in a future release, see BEAM-14501 for details", i, fn)
}
in = append(in, s.Input.n)
}
Expand Down
11 changes: 6 additions & 5 deletions sdks/go/pkg/beam/pardo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,12 @@ func TestParDoSideInputValidation(t *testing.T) {
wFn *window.Fn
isBounded bool
}{
{
"global window unbounded",
window.NewGlobalWindows(),
false,
},
// TODO(BEAM-14501): Re-enable this test case once proper streaming testing support is finished.
// {
// "global window unbounded",
// window.NewGlobalWindows(),
// false,
// },
{
"side input session windowed",
window.NewSessions(1 * time.Minute),
Expand Down

0 comments on commit 3f05429

Please sign in to comment.