Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT SUBMIT] Umbrella PR for setting prism as the default Go SDK runner instead of the direct runner. #27550

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-support/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ RUN pip3 install distlib==0.3.1 yapf==0.29.0 pytest
###
# Install Go
###
ENV DOWNLOAD_GO_VERSION=1.19.6
ENV DOWNLOAD_GO_VERSION=1.20.5
RUN wget https://golang.org/dl/go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz
ENV GOROOT /usr/local/go
Expand Down
2 changes: 1 addition & 1 deletion playground/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ build, test, and deploy the frontend and backend services.
> - buf
> - sbt

1. Install Go 1.18+
1. Install Go 1.20+

**Ubuntu 22.04 and newer:**
```shell
Expand Down
2 changes: 1 addition & 1 deletion playground/backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

module beam.apache.org/playground/backend

go 1.18
go 1.20

require (
cloud.google.com/go/datastore v1.10.0
Expand Down
2 changes: 1 addition & 1 deletion sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// directory.
module github.com/apache/beam/sdks/v2

go 1.19
go 1.20

require (
cloud.google.com/go/bigquery v1.52.0
Expand Down
1 change: 1 addition & 0 deletions sdks/go/examples/large_wordcount/large_wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dot"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/examples/minimal_wordcount/minimal_wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
Expand Down Expand Up @@ -119,6 +119,6 @@ func main() {
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

// Run the pipeline on the direct runner.
direct.Execute(context.Background(), p)
// Run the pipeline on the prism runner.
prism.Execute(context.Background(), p)
}
40 changes: 22 additions & 18 deletions sdks/go/examples/snippets/04transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ func applyWordLen(s beam.Scope, words beam.PCollection) beam.PCollection {
return wordLengths
}

// [START model_pardo_apply_anon]

func wordLengths(word string) int { return len(word) }
func init() { register.Function1x1(wordLengths) }

func applyWordLenAnon(s beam.Scope, words beam.PCollection) beam.PCollection {
// [START model_pardo_apply_anon]
// Apply an anonymous function as a DoFn PCollection words.
// Save the result as the PCollection wordLengths.
wordLengths := beam.ParDo(s, func(word string) int {
return len(word)
}, words)
// [END model_pardo_apply_anon]
return wordLengths
return beam.ParDo(s, wordLengths, words)
}

// [END model_pardo_apply_anon]

func applyGbk(s beam.Scope, input []stringPair) beam.PCollection {
// [START groupbykey]
// CreateAndSplit creates and returns a PCollection with <K,V>
Expand Down Expand Up @@ -345,22 +345,26 @@ func globallyAverage(s beam.Scope, ints beam.PCollection) beam.PCollection {
return average
}

// [START combine_global_with_default]

func returnSideOrDefault(d float64, iter func(*float64) bool) float64 {
var c float64
if iter(&c) {
// Side input has a value, so return it.
return c
}
// Otherwise, return the default
return d
}
func init() { register.Function2x1(returnSideOrDefault) }

func globallyAverageWithDefault(s beam.Scope, ints beam.PCollection) beam.PCollection {
// [START combine_global_with_default]
// Setting combine defaults has requires no helper function in the Go SDK.
average := beam.Combine(s, &averageFn{}, ints)

// To add a default value:
defaultValue := beam.Create(s, float64(0))
avgWithDefault := beam.ParDo(s, func(d float64, iter func(*float64) bool) float64 {
var c float64
if iter(&c) {
// Side input has a value, so return it.
return c
}
// Otherwise, return the default
return d
}, defaultValue, beam.SideInput{Input: average})
avgWithDefault := beam.ParDo(s, returnSideOrDefault, defaultValue, beam.SideInput{Input: average})
// [END combine_global_with_default]
return avgWithDefault
}
Expand Down
15 changes: 10 additions & 5 deletions sdks/go/examples/snippets/04transforms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
Expand Down Expand Up @@ -205,6 +206,14 @@ func TestSideInputs(t *testing.T) {
ptest.RunAndValidate(t, p)
}

func emitOnTestKey(k string, v int, emit func(int)) {
if k == "test" {
emit(v)
}
}

func init() { register.Function3x0(emitOnTestKey) }

func TestComposite(t *testing.T) {
p, s, lines := ptest.CreateList([]string{
"this test dataset has the word test",
Expand All @@ -215,11 +224,7 @@ func TestComposite(t *testing.T) {
// A Composite PTransform function is called like any other function.
wordCounts := CountWords(s, lines) // returns a PCollection<KV<string,int>>
// [END countwords_composite_call]
testCount := beam.ParDo(s, func(k string, v int, emit func(int)) {
if k == "test" {
emit(v)
}
}, wordCounts)
testCount := beam.ParDo(s, emitOnTestKey, wordCounts)
passert.Equals(s, testCount, 4)
ptest.RunAndValidate(t, p)
}
2 changes: 1 addition & 1 deletion sdks/go/examples/snippets/10metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func queryMetrics(pr beam.PipelineResult, ns, n string) metrics.QueryResults {

// [END metrics_query]

var runner = "direct"
var runner = "prism"

// [START metrics_pipeline]

Expand Down
10 changes: 0 additions & 10 deletions sdks/go/pkg/beam/beam.shims.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,5 +552,5 @@ func (n *ParDo) fail(err error) error {
}

func (n *ParDo) String() string {
return fmt.Sprintf("ParDo[%v] Out:%v Sig: %v", path.Base(n.Fn.Name()), IDs(n.Out...), n.Fn.ProcessElementFn().Fn.Type())
return fmt.Sprintf("ParDo[%v] Out:%v Sig: %v, SideInputs: %v", path.Base(n.Fn.Name()), IDs(n.Out...), n.Fn.ProcessElementFn().Fn.Type(), n.Side)
}
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/core/runtime/exec/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func (p *Plan) Down(ctx context.Context) error {

func (p *Plan) String() string {
var units []string
for _, u := range p.units {
for i := len(p.units) - 1; i >= 0; i-- {
u := p.units[i]
units = append(units, fmt.Sprintf("%v: %v", u.ID(), u))
}
return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/sideinput.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *sideInputAdapter) NewKeyedIterable(ctx context.Context, reader StateRea
}

func (s *sideInputAdapter) String() string {
return fmt.Sprintf("SideInputAdapter[%v, %v]", s.sid, s.sideInputID)
return fmt.Sprintf("SideInputAdapter[%v, %v] - Coder %v", s.sid, s.sideInputID, s.c)
}

// proxyReStream is a simple wrapper of an open function.
Expand Down
12 changes: 9 additions & 3 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor) (*builder, error) {

input := unmarshalKeyedValues(transform.GetInputs())
for i, from := range input {
succ[from] = append(succ[from], linkID{id, i})
// We don't need to multiplex successors for pardo side inputs.
// so we only do so for SDK side Flattens.
if i == 0 || transform.GetSpec().GetUrn() == graphx.URNFlatten {
succ[from] = append(succ[from], linkID{id, i})
}
}
output := unmarshalKeyedValues(transform.GetOutputs())
for _, to := range output {
Expand Down Expand Up @@ -608,7 +612,6 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {

for i := 1; i < len(input); i++ {
// TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs

ec, wc, err := b.makeCoderForPCollection(input[i])
if err != nil {
return nil, err
Expand Down Expand Up @@ -731,7 +734,10 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
}
// Strip PCollections from Expand nodes, as CoGBK metrics are handled by
// the DataSource that preceeds them.
trueOut := out[0].(*PCollection).Out
trueOut := out[0]
if pcol, ok := trueOut.(*PCollection); ok {
trueOut = pcol.Out
}
b.units = b.units[:len(b.units)-1]
u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: trueOut}

Expand Down
12 changes: 10 additions & 2 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -128,7 +130,12 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
return nil, err
}
ch.forceRecreate = func(id string, err error) {
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
switch status.Code(err) {
case codes.Canceled:
// Don't log on context canceled path.
default:
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
Expand Down Expand Up @@ -371,7 +378,8 @@ func (c *DataChannel) read(ctx context.Context) {
c.terminateStreamOnError(err)
c.mu.Unlock()

if err == io.EOF {
st := status.Code(err)
if st == codes.Canceled || err == io.EOF {
return
}
log.Errorf(ctx, "DataChannel.read %v bad: %v", c.id, err)
Expand Down
8 changes: 5 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
c.mu.Unlock()

if err != nil {
return fail(ctx, instID, "Failed: %v", err)
c.failed[instID] = err
return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
}

tokens := msg.GetCacheTokens()
Expand Down Expand Up @@ -425,8 +426,9 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
c.failed[instID] = err
} else if dataError != io.EOF && dataError != nil {
// If there was an error on the data channel reads, fail this bundle
// since we may have had a short read.
// since we may have had a short read.'
c.failed[instID] = dataError
err = dataError
} else {
// Non failure plans should either be moved to the finalized state
// or to plans so they can be re-used.
Expand Down Expand Up @@ -706,6 +708,6 @@ func fail(ctx context.Context, id instructionID, format string, args ...any) *fn
// dial to the specified endpoint. if timeout <=0, call blocks until
// grpc.Dial succeeds.
func dial(ctx context.Context, endpoint, purpose string, timeout time.Duration) (*grpc.ClientConn, error) {
log.Infof(ctx, "Connecting via grpc @ %s for %s ...", endpoint, purpose)
log.Output(ctx, log.SevDebug, 1, fmt.Sprintf("Connecting via grpc @ %s for %s ...", endpoint, purpose))
return grpcx.Dial(ctx, endpoint, timeout)
}
9 changes: 8 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type writeTypeEnum int32
Expand Down Expand Up @@ -525,7 +527,12 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
return nil, err
}
ch.forceRecreate = func(id string, err error) {
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
switch status.Code(err) {
case codes.Canceled:
// Don't log on context canceled path.
default:
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (any, error) {
type failResolver bool

func (p failResolver) Sym2Addr(name string) (uintptr, error) {
return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
return 0, errors.Errorf("%v not found. Register DoFns and functions with the the beam/register package.", name)
}
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// Create inserts a fixed non-empty set of values into the pipeline. The values must
Expand Down Expand Up @@ -106,6 +107,11 @@ func createList(s Scope, values []any, t reflect.Type) (PCollection, error) {

// TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421.

func init() {
register.DoFn2x1[[]byte, func(T), error]((*createFn)(nil))
register.Emitter1[T]()
}

type createFn struct {
Values [][]byte `json:"values"`
Type EncodedType `json:"type"`
Expand Down
Loading