Skip to content

Commit

Permalink
[#24339] Make Slices use iterable coder instead of custom coder. (#24346
Browse files Browse the repository at this point in the history
)

* Improve debugging strings.

* Switch slices to use iterable coder.

* Handle coderrefs

* tests for new translate code

* fulltype testing

* Add Statebacked iterable case.

* Add unlifted combines.

* Comment catches.

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Nov 29, 2022
1 parent 22dcacb commit 166b881
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 81 deletions.
21 changes: 17 additions & 4 deletions sdks/go/pkg/beam/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,23 @@ func NewCoder(t FullType) Coder {

func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
case typex.Container:
switch t.Type() {
case reflectx.ByteSlice:
return &coder.Coder{Kind: coder.Bytes, T: t}, nil
}
switch t.Type().Kind() {
case reflect.Slice:
c, err := inferCoder(t.Components()[0])
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Iterable, T: t, Components: []*coder.Coder{c}}, nil

default:
panic(fmt.Sprintf("inferCoder: unknown container kind %v", t))
}
case typex.Concrete:
switch t.Type() {
case reflectx.Int64:
// use the beam varint coder.
Expand Down Expand Up @@ -183,9 +199,6 @@ func inferCoder(t FullType) (*coder.Coder, error) {
case reflectx.String:
return &coder.Coder{Kind: coder.String, T: t}, nil

case reflectx.ByteSlice:
return &coder.Coder{Kind: coder.Bytes, T: t}, nil

case reflectx.Bool:
return &coder.Coder{Kind: coder.Bool, T: t}, nil

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (n *DataSource) Down(ctx context.Context) error {
}

func (n *DataSource) String() string {
return fmt.Sprintf("DataSource[%v, %v] Coder:%v Out:%v", n.SID, n.Name, n.Coder, n.Out.ID())
return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Out.ID(), n.Coder)
}

// incrementIndexAndCheckSplit increments DataSource.index by one and checks if
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,5 +403,5 @@ func (n *ParDo) fail(err error) error {
}

func (n *ParDo) String() string {
return fmt.Sprintf("ParDo[%v] Out:%v", path.Base(n.Fn.Name()), IDs(n.Out...))
return fmt.Sprintf("ParDo[%v] Out:%v Sig: %v", path.Base(n.Fn.Name()), IDs(n.Out...), n.Fn.ProcessElementFn().Fn.Type())
}
57 changes: 57 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
Expand Down Expand Up @@ -95,11 +96,67 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
b.units = b.units[:len(b.units)-1]
}

mayFixDataSourceCoder(u)
b.units = append(b.units, u)
}
return b.build()
}

// mayFixDataSourceCoder checks the node downstream of the DataSource and if applicable, changes
// a KV<k, Iter<V>> coder to a CoGBK<k, v>. This requires knowledge of the downstream node because
// coder interpretation is ambiguous to received types in DoFns, and we can only interpret it right
// at execution time with knowledge of both.
func mayFixDataSourceCoder(u *DataSource) {
if !coder.IsKV(coder.SkipW(u.Coder)) {
return // If it's not a KV, there's nothing to do here.
}
if coder.SkipW(u.Coder).Components[1].Kind != coder.Iterable {
return // If the V is not an iterable, we don't care.
}
out := u.Out
if mp, ok := out.(*Multiplex); ok {
// Here we trust that the Multiplex Outs are all the same signature, since we've validated
// that at construction time.
out = mp.Out[0]
}

switch n := out.(type) {
// These nodes always expect CoGBK behavior.
case *Expand, *MergeAccumulators, *ReshuffleOutput, *Combine:
u.Coder = convertToCoGBK(u.Coder)
return
case *ParDo:
// So we now know we have a KV<k, Iter<V>>. So we need to validate whether the DoFn has an
// iter function in the value slot. If it does, we need to use a CoGBK coder.
sig := n.Fn.ProcessElementFn()
// Get all valid inputs and side inputs.
in := sig.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter)

if len(in) < 2 {
return // Somehow there's only a single value, so we're done. (Defense against generic KVs)
}
// It's an iterator, so we can assume it's a GBK, due to previous pre-conditions.
if sig.Param[in[1]].Kind == funcx.FnIter {
u.Coder = convertToCoGBK(u.Coder)
return
}
}
}

func convertToCoGBK(oc *coder.Coder) *coder.Coder {
ocnw := coder.SkipW(oc)
// Validate that all values from the coder are iterables.
comps := make([]*coder.Coder, 0, len(ocnw.Components))
comps = append(comps, ocnw.Components[0])
for _, c := range ocnw.Components[1:] {
if c.Kind != coder.Iterable {
panic(fmt.Sprintf("want all values to be iterables: %v", oc))
}
comps = append(comps, c.Components[0])
}
return coder.NewW(coder.NewCoGBK(comps), oc.Window)
}

type builder struct {
desc *fnpb.ProcessBundleDescriptor
coders *graphx.CoderUnmarshaller
Expand Down
94 changes: 94 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
Expand Down Expand Up @@ -90,6 +91,99 @@ func TestUnmarshalReshuffleCoders(t *testing.T) {
}
}

func TestMayFixDataSourceCoder(t *testing.T) {
knownStart := coder.NewW(
coder.NewKV([]*coder.Coder{coder.NewBytes(), coder.NewI(coder.NewString())}),
coder.NewGlobalWindow())
knownWant := coder.NewW(
coder.NewCoGBK([]*coder.Coder{coder.NewBytes(), coder.NewString()}),
coder.NewGlobalWindow())

makeParDo := func(t *testing.T, fn any) *ParDo {
t.Helper()
dfn, err := graph.NewDoFn(fn)
if err != nil {
t.Fatalf("couldn't construct ParDo with Sig: %T %v", fn, err)
}
return &ParDo{Fn: dfn}
}

tests := []struct {
name string
start, want *coder.Coder
out Node
}{
{
name: "bytes",
start: coder.NewBytes(),
}, {
name: "W<bytes>",
start: coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()),
}, {
name: "W<KV<bytes,bool>",
start: coder.NewW(
coder.NewKV([]*coder.Coder{coder.NewBytes(), coder.NewBool()}),
coder.NewGlobalWindow()),
}, {
name: "W<KV<bytes,Iterable<string>>_nil",
start: knownStart,
}, {
name: "W<KV<bytes,Iterable<string>>_Expand",
out: &Expand{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_Combine",
out: &Combine{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_ReshuffleOutput",
out: &ReshuffleOutput{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_MergeAccumulators",
out: &MergeAccumulators{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_Multiplex_Expand",
out: &Multiplex{Out: []Node{&Expand{}}},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_Multiplex_ParDo_KV",
out: &Multiplex{Out: []Node{makeParDo(t, func([]byte, []string) {})}},
start: knownStart,
}, {
name: "W<KV<bytes,Iterable<string>>_Multiplex_ParDo_GBK",
out: &Multiplex{Out: []Node{makeParDo(t, func([]byte, func(*string) bool) {})}},
start: knownStart,
want: knownWant,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// If want is nil, we expect no changes.
if test.want == nil {
test.want = test.start
}

u := &DataSource{
Coder: test.start,
Out: test.out,
}
mayFixDataSourceCoder(u)
if !test.want.Equals(u.Coder) {
t.Errorf("mayFixDataSourceCoder(Datasource[Coder: %v, Out: %T]), got %v, want %v", test.start, test.out, u.Coder, test.want)
}

})
}
}

func TestUnmarshallWindowFn(t *testing.T) {
tests := []struct {
name string
Expand Down
30 changes: 18 additions & 12 deletions sdks/go/pkg/beam/core/runtime/graphx/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,44 +216,43 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}

id := components[1]
kind := coder.KV
root := typex.KVType

elm, err := b.peek(id)
if err != nil {
return nil, err
}

switch elm.GetSpec().GetUrn() {
case urnIterableCoder, urnStateBackedIterableCoder:
id = elm.GetComponentCoderIds()[0]
kind = coder.CoGBK
root = typex.CoGBKType
iterElmID := elm.GetComponentCoderIds()[0]

// TODO(https://github.com/apache/beam/issues/18032): If CoGBK with > 1 input, handle as special GBK. We expect
// it to be encoded as CoGBK<K,LP<CoGBKList<V,W,..>>>. Remove this handling once
// CoGBK has a first-class representation.

if ids, ok := b.isCoGBKList(id); ok {
// If the value is an iterable, and a special CoGBK type, then expand it to the real
// CoGBK signature, instead of the special type.
if ids, ok := b.isCoGBKList(iterElmID); ok {
// CoGBK<K,V,W,..>

values, err := b.Coders(ids)
if err != nil {
return nil, err
}

t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...)
return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
t := typex.New(typex.CoGBKType, append([]typex.FullType{key.T}, coder.Types(values)...)...)
return &coder.Coder{Kind: coder.CoGBK, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
}
// It's valid to have a KV<k,Iter<v>> without being a CoGBK, and validating if we need to change to
// a CoGBK is done at the DataSource, since that's when we can check against the downstream nodes.
}

value, err := b.Coder(id)
if err != nil {
return nil, err
}

t := typex.New(root, key.T, value.T)
return &coder.Coder{Kind: kind, T: t, Components: []*coder.Coder{key, value}}, nil
t := typex.New(typex.KVType, key.T, value.T)
return &coder.Coder{Kind: coder.KV, T: t, Components: []*coder.Coder{key, value}}, nil

case urnLengthPrefixCoder:
if len(components) != 1 {
Expand Down Expand Up @@ -338,7 +337,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}
return c, nil

case urnIterableCoder:
case urnIterableCoder, urnStateBackedIterableCoder:
if len(components) != 1 {
return nil, errors.Errorf("could not unmarshal iterable coder from %v, expected one component but got %d", c, len(components))
}
Expand Down Expand Up @@ -553,6 +552,13 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {

return b.internBuiltInCoder(urnTimerCoder, comp...), nil

case coder.Iterable:
comp, err := b.AddMulti(c.Components)
if err != nil {
return "", errors.Wrapf(err, "failed to marshal iterable coder %v", c)
}
return b.internBuiltInCoder(urnIterableCoder, comp...), nil

default:
err := errors.Errorf("unexpected coder kind: %v", c.Kind)
return "", errors.WithContextf(err, "failed to marshal coder %v", c)
Expand Down
Loading

0 comments on commit 166b881

Please sign in to comment.