Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
defaults and programming guide
Browse files Browse the repository at this point in the history
lostluck committed Aug 3, 2023
1 parent 1b76101 commit 8bb1053
Showing 4 changed files with 54 additions and 42 deletions.
43 changes: 23 additions & 20 deletions sdks/go/examples/snippets/04transforms.go
Original file line number Diff line number Diff line change
@@ -65,17 +65,17 @@ func applyWordLen(s beam.Scope, words beam.PCollection) beam.PCollection {
return wordLengths
}

// [START model_pardo_apply_anon]

func wordLengths(word string) int { return len(word) }
func init() { register.Function1x1(wordLengths) }

func applyWordLenAnon(s beam.Scope, words beam.PCollection) beam.PCollection {
// [START model_pardo_apply_anon]
// Apply an anonymous function as a DoFn PCollection words.
// Save the result as the PCollection wordLengths.
wordLengths := beam.ParDo(s, func(word string) int {
return len(word)
}, words)
// [END model_pardo_apply_anon]
return wordLengths
return beam.ParDo(s, wordLengths, words)
}

// [END model_pardo_apply_anon]

func applyGbk(s beam.Scope, input []stringPair) beam.PCollection {
// [START groupbykey]
// CreateAndSplit creates and returns a PCollection with <K,V>
@@ -345,26 +345,29 @@ func globallyAverage(s beam.Scope, ints beam.PCollection) beam.PCollection {
return average
}

// [START combine_global_with_default]

func returnSideOrDefault(d float64, iter func(*float64) bool) float64 {
var c float64
if iter(&c) {
// Side input has a value, so return it.
return c
}
// Otherwise, return the default
return d
}
func init() { register.Function2x1(returnSideOrDefault) }

func globallyAverageWithDefault(s beam.Scope, ints beam.PCollection) beam.PCollection {
// [START combine_global_with_default]
// Setting combine defaults has requires no helper function in the Go SDK.
average := beam.Combine(s, &averageFn{}, ints)

// To add a default value:
defaultValue := beam.Create(s, float64(0))
avgWithDefault := beam.ParDo(s, func(d float64, iter func(*float64) bool) float64 {
var c float64
if iter(&c) {
// Side input has a value, so return it.
return c
}
// Otherwise, return the default
return d
}, defaultValue, beam.SideInput{Input: average})
// [END combine_global_with_default]
return avgWithDefault
return beam.ParDo(s, returnSideOrDefault, defaultValue, beam.SideInput{Input: average})
}

// [END combine_global_with_default]
func perKeyAverage(s beam.Scope, playerAccuracies beam.PCollection) beam.PCollection {
// [START combine_per_key]
avgAccuracyPerPlayer := stats.MeanPerKey(s, playerAccuracies)
15 changes: 10 additions & 5 deletions sdks/go/examples/snippets/04transforms_test.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
@@ -205,6 +206,14 @@ func TestSideInputs(t *testing.T) {
ptest.RunAndValidate(t, p)
}

func emitOnTestKey(k string, v int, emit func(int)) {
if k == "test" {
emit(v)
}
}

func init() { register.Function3x0(emitOnTestKey) }

func TestComposite(t *testing.T) {
p, s, lines := ptest.CreateList([]string{
"this test dataset has the word test",
@@ -215,11 +224,7 @@ func TestComposite(t *testing.T) {
// A Composite PTransform function is called like any other function.
wordCounts := CountWords(s, lines) // returns a PCollection<KV<string,int>>
// [END countwords_composite_call]
testCount := beam.ParDo(s, func(k string, v int, emit func(int)) {
if k == "test" {
emit(v)
}
}, wordCounts)
testCount := beam.ParDo(s, emitOnTestKey, wordCounts)
passert.Equals(s, testCount, 4)
ptest.RunAndValidate(t, p)
}
26 changes: 15 additions & 11 deletions sdks/go/pkg/beam/testing/ptest/ptest.go
Original file line number Diff line number Diff line change
@@ -60,27 +60,31 @@ func CreateList2(a, b any) (*beam.Pipeline, beam.Scope, beam.PCollection, beam.P
return p, s, beam.CreateList(s, a), beam.CreateList(s, b)
}

const (
defaultRunner = "prism"
)

// Runner is a flag that sets which runner pipelines under test will use.
//
// The test file must have a TestMain that calls Main or MainWithDefault
// to function.
var (
Runner = runners.Runner
defaultRunner = "prism"
mainCalled = false
Runner = runners.Runner
defaultRunnerOverride = defaultRunner
mainCalled = false
)

func getRunner() string {
r := *Runner
if r == "" {
r = defaultRunner
r = defaultRunnerOverride
}
return r
}

// DefaultRunner returns the default runner name for the test file.
func DefaultRunner() string {
return defaultRunner
return defaultRunnerOverride
}

// MainCalled returns true iff Main or MainRet has been called.
@@ -133,38 +137,38 @@ func BuildAndRun(t *testing.T, build func(s beam.Scope)) beam.PipelineResult {
// ptest.Main(m)
// }
func Main(m *testing.M) {
MainWithDefault(m, "direct")
MainWithDefault(m, defaultRunner)
}

// MainWithDefault is an implementation of testing's TestMain to permit testing
// pipelines on runners other than the direct runner, while setting the default
// runner to use.
func MainWithDefault(m *testing.M, runner string) {
mainCalled = true
defaultRunner = runner
defaultRunnerOverride = runner
if !flag.Parsed() {
flag.Parse()
}
beam.Init()
os.Exit(m.Run())
}

// MainRet is equivelant to Main, but returns an exit code to pass to os.Exit().
// MainRet is equivalent to Main, but returns an exit code to pass to os.Exit().
//
// Example:
//
// func TestMain(m *testing.M) {
// os.Exit(ptest.Main(m))
// }
func MainRet(m *testing.M) int {
return MainRetWithDefault(m, "direct")
return MainRetWithDefault(m, defaultRunner)
}

// MainRetWithDefault is equivelant to MainWithDefault but returns an exit code
// MainRetWithDefault is equivalent to MainWithDefault but returns an exit code
// to pass to os.Exit().
func MainRetWithDefault(m *testing.M, runner string) int {
mainCalled = true
defaultRunner = runner
defaultRunnerOverride = runner
if !flag.Parsed() {
flag.Parse()
}
12 changes: 6 additions & 6 deletions sdks/go/pkg/beam/transforms/filter/filter_test.go
Original file line number Diff line number Diff line change
@@ -48,17 +48,17 @@ func TestInclude(t *testing.T) {
}{
{
[]int{1, 2, 3},
func(a int) bool { return true },
alwaysTrue,
[]int{1, 2, 3},
},
{
[]int{1, 2, 3},
func(a int) bool { return a == 1 },
isOne,
[]int{1},
},
{
[]int{1, 2, 3},
func(a int) bool { return a > 1 },
greaterThanOne,
[]int{2, 3},
},
}
@@ -81,17 +81,17 @@ func TestExclude(t *testing.T) {
}{
{
[]int{1, 2, 3},
func(a int) bool { return false },
alwaysFalse,
[]int{1, 2, 3},
},
{
[]int{1, 2, 3},
func(a int) bool { return a == 1 },
isOne,
[]int{2, 3},
},
{
[]int{1, 2, 3},
func(a int) bool { return a > 1 },
greaterThanOne,
[]int{1},
},
}

0 comments on commit 8bb1053

Please sign in to comment.