Skip to content

Commit

Permalink
Make window close strict.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Aug 3, 2023
1 parent 7112f72 commit 51124cc
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 48 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
return makeWindowCoders(coders[wcID])
}

func getOnlyValue[K comparable, V any](in map[K]V) V {
if len(in) != 1 {
panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
Expand Down
48 changes: 1 addition & 47 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
Expand Down Expand Up @@ -564,53 +565,6 @@ func TestFailure(t *testing.T) {
}
}

func TestRunner_Passert(t *testing.T) {
initRunner(t)
tests := []struct {
name string
pipeline func(s beam.Scope)
metrics func(t *testing.T, pr beam.PipelineResult)
}{
{
name: "Empty",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col1 := beam.ParDo(s, dofnEmpty, imp)
passert.Empty(s, col1)
},
}, {
name: "Equals-TwoEmpty",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col1 := beam.ParDo(s, dofnEmpty, imp)
col2 := beam.ParDo(s, dofnEmpty, imp)
passert.Equals(s, col1, col2)
},
}, {
name: "Equals",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col1 := beam.ParDo(s, dofn1, imp)
col2 := beam.ParDo(s, dofn1, imp)
passert.Equals(s, col1, col2)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
test.pipeline(s)
pr, err := executeWithT(context.Background(), t, p)
if err != nil {
t.Fatal(err)
}
if test.metrics != nil {
test.metrics(t, pr)
}
})
}
}

func toFoo(et beam.EventTime, id int, _ func(*int64) bool) (int, string) {
return id, "ooo"
}
Expand Down
8 changes: 8 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ func dofnGBK2(k int64, vs func(*string) bool, emit func(string)) {
emit(sum)
}

func dofnGBKKV(k string, vs func(*int64) bool, emit func(string, int64)) {
var v, sum int64
for vs(&v) {
sum += v
}
emit(k, sum)
}

type testRow struct {
A string
B int64
Expand Down

0 comments on commit 51124cc

Please sign in to comment.