diff --git a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go index f5f22cae1d65..83cb390a6093 100644 --- a/sdks/go/examples/minimal_wordcount/minimal_wordcount.go +++ b/sdks/go/examples/minimal_wordcount/minimal_wordcount.go @@ -27,6 +27,7 @@ // // Concepts: // +// 0. Registering transforms with Beam. // 1. Reading data from text files // 2. Specifying 'inline' transforms // 3. Counting items in a PCollection @@ -62,6 +63,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" @@ -71,6 +73,26 @@ import ( var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) +func splitWords(line string, emit func(string)) { + for _, word := range wordRE.FindAllString(line, -1) { + emit(word) + } +} + +func formatCounts(w string, c int) string { + return fmt.Sprintf("%s: %v", w, c) +} + +// Concept #0: Transform functions executed by Beam need to be registered +// so they can be executed by portable runners. We use the register package +// in an init block to inform Beam of the functions we will be using, so +// it can access them on workers. +func init() { + register.Function2x0(splitWords) + register.Function2x1(formatCounts) + register.Emitter1[string]() +} + func main() { // beam.Init() is an initialization hook that must be called on startup. beam.Init() @@ -91,15 +113,11 @@ func main() { lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") // Concept #2: Invoke a ParDo transform on our PCollection of text lines. - // This ParDo invokes a DoFn (defined in-line) on each element that + // This ParDo invokes a DoFn (registered earlier) on each element that // tokenizes the text line into individual words. The ParDo returns a // PCollection of type string, where each element is an individual word in // Shakespeare's collected texts. - words := beam.ParDo(s, func(line string, emit func(string)) { - for _, word := range wordRE.FindAllString(line, -1) { - emit(word) - } - }, lines) + words := beam.ParDo(s, splitWords, lines) // Concept #3: Invoke the stats.Count transform on our PCollection of // individual words. The Count transform returns a new PCollection of @@ -110,9 +128,7 @@ func main() { // Use a ParDo to format our PCollection of word counts into a printable // string, suitable for writing to an output file. When each element // produces exactly one element, the DoFn can simply return it. - formatted := beam.ParDo(s, func(w string, c int) string { - return fmt.Sprintf("%s: %v", w, c) - }, counted) + formatted := beam.ParDo(s, formatCounts, counted) // Concept #4: Invoke textio.Write at the end of the pipeline to write // the contents of a PCollection (in this case, our PCollection of