Skip to content

Commit

Permalink
[BEAM-14473] Throw error if using globally windowed, unbounded side i…
Browse files Browse the repository at this point in the history
…nput (apache#17681)
  • Loading branch information
jrmccluskey authored May 16, 2022
1 parent e6aab06 commit 9eb8644
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
5 changes: 4 additions & 1 deletion sdks/go/pkg/beam/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
return nil, fmt.Errorf("error with side input %d in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", i, fn)
}
if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCOllection before use", fn, i)
return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCollection before use", fn, i)
}
if (sideWfn.Kind == window.GlobalWindows) && !sideNode.Bounded() {
return nil, fmt.Errorf("side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use", i, fn)
}
in = append(in, s.Input.n)
}
Expand Down
47 changes: 47 additions & 0 deletions sdks/go/pkg/beam/pardo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ import (
"reflect"
"strings"
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
)

Expand Down Expand Up @@ -132,3 +138,44 @@ type AnnotationsFn struct {
func (fn *AnnotationsFn) ProcessElement(v int) int {
return v
}

func doNothing(_ []byte, _ int) {}
func TestParDoSideInputValdiation(t *testing.T) {
var tests = []struct {
name string
wFn *window.Fn
isBounded bool
}{
{
"global window unbounded",
window.NewGlobalWindows(),
false,
},
{
"side input session windowed",
window.NewSessions(1 * time.Minute),
true,
},
{
"global main, interval side",
window.NewFixedWindows(10 * time.Second),
true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := NewPipeline()
s := p.Root()

strat := &window.WindowingStrategy{Fn: test.wFn, Trigger: trigger.Default(), AccumulationMode: window.Discarding, AllowedLateness: 0}
sideCol := PCollection{n: graph.New().NewNode(typex.New(reflectx.Int), strat, test.isBounded)}
outCol, err := TryParDo(s, doNothing, Impulse(s), SideInput{Input: sideCol})
if outCol != nil {
t.Errorf("TryParDo() produced an output PCollection when it should have failed, got %v", outCol)
}
if err == nil {
t.Errorf("TryParDo() did not return an error when it should have")
}
})
}
}

0 comments on commit 9eb8644

Please sign in to comment.