Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14347] Add some benchmarks for generic registration #17613

Merged
merged 6 commits into from
May 11, 2022
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions sdks/go/pkg/beam/register/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"reflect"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
Expand Down Expand Up @@ -672,3 +675,133 @@ func (fn *PartialCombiner2) AddInput(i int, c CustomType) int {
func (fn *PartialCombiner2) MergeAccumulators(i1 int, i2 int) int {
return i1 + i2
}

// Foo is a struct with a method for measuring method invocation
// overhead for StructuralDoFns.
type Foo struct {
A int
}

// ProcessElement is a method for measuring a baseline of structural dofn overhead.
func (f *Foo) ProcessElement(b CustomType) int {
return f.A + b.val
}

func MakeMultiEdge(f *graph.DoFn) graph.MultiEdge {
return graph.MultiEdge{
DoFn: f,
}
}

type callerCustomTypeГint struct {
Copy link
Contributor

@riteshghorse riteshghorse May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type callerCustomTypeГint struct {
type callerCustomTypeГint struct {

is this character Г intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - surprisingly yes it is. This is intentionally mirroring the registration functions generated by the code generator, the naming helps point in that direction even more

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, as a rule, one can but shouldn't use such characters in one's Go identifiers.
So I picked symbols that were hard to type, to emphasize that one shouldn't be using any of these directly.

The generic approach avoids this entirely, since anything generated is already inaccessible to the user, since we don't expose those types to them at all. This allows us to changes things fairly freely, as long as the user interface doesn't change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay.. interesting!

fn func(CustomType) int
}

func funcMakerCustomTypeГint(fn interface{}) reflectx.Func {
f := fn.(func(CustomType) int)
return &callerCustomTypeГint{fn: f}
}

func (c *callerCustomTypeГint) Name() string {
return reflectx.FunctionName(c.fn)
}

func (c *callerCustomTypeГint) Type() reflect.Type {
return reflect.TypeOf(c.fn)
}

func (c *callerCustomTypeГint) Call(args []interface{}) []interface{} {
out0 := c.fn(args[0].(CustomType))
return []interface{}{out0}
}

func (c *callerCustomTypeГint) Call1x1(arg0 interface{}) interface{} {
return c.fn(arg0.(CustomType))
}

func wrapMakerFoo(fn interface{}) map[string]reflectx.Func {
dfn := fn.(*Foo)
return map[string]reflectx.Func{
"ProcessElement": reflectx.MakeFunc(func(a0 CustomType) int { return dfn.ProcessElement(a0) }),
}
}

func GeneratedOptimizationCalls() {
runtime.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem())
schema.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem())
runtime.RegisterType(reflect.TypeOf((*CustomType)(nil)).Elem())
schema.RegisterType(reflect.TypeOf((*CustomType)(nil)).Elem())
reflectx.RegisterFunc(reflect.TypeOf((*func(CustomType) int)(nil)).Elem(), funcMakerCustomTypeГint)
reflectx.RegisterStructWrapper(reflect.TypeOf((*Foo)(nil)).Elem(), wrapMakerFoo)
}

// BenchmarkMethodCalls measures the overhead of invoking several different methods after performing
// different types of registration. The unoptimized calls don't perform any optimization. The
// GenericRegistration calls first register the DoFn being used with this package's generic registration
// functions. This is the preferred path for users. The GeneratedShims calls call various registration
// functions, mirroring the behavior of the shims generated by the code generator. This is not the
// recommended path for most users - if these are materially better than the generic benchmarks,
// this package requires further optimization.
//
// BenchmarkMethodCalls/MakeFunc_Unoptimized-16 11480814 88.35 ns/op
// BenchmarkMethodCalls/NewFn_Unoptimized-16 875199 1385 ns/op
// BenchmarkMethodCalls/EncodeMultiEdge_Unoptimized-16 1000000 1063 ns/op
//
// BenchmarkMethodCalls/MakeFunc_GenericRegistration-16 16266259 72.07 ns/op
// BenchmarkMethodCalls/NewFn_GenericRegistration-16 1000000 1108 ns/op
// BenchmarkMethodCalls/EncodeMultiEdge_GenericRegistration-16 1000000 1052 ns/op
//
// BenchmarkMethodCalls/MakeFunc_GeneratedShims#01-16 16400914 69.17 ns/op
// BenchmarkMethodCalls/NewFn_GeneratedShims#01-16 1000000 1099 ns/op
// BenchmarkMethodCalls/EncodeMultiEdge_GeneratedShims#01-16 1000000 1071 ns/op
func BenchmarkMethodCalls(b *testing.B) {
damccorm marked this conversation as resolved.
Show resolved Hide resolved
f := &Foo{A: 3}
g, err := graph.NewFn(&Foo{A: 5})
if err != nil {
panic(err)
}
gDoFn, err := graph.AsDoFn(g, 1)
if err != nil {
panic(err)
}
me := MakeMultiEdge(gDoFn)

var aFunc reflectx.Func
var aFn *graph.Fn
var aME interface{}

// We need to do this registration just to get it to not panic when encoding the multi-edge with no additional optimization.
// This is currently required of users anyways
runtime.RegisterType(reflect.TypeOf((*Foo)(nil)))
tests := []struct {
name string
fn func()
registration func()
}{
// No optimization performed at all
{"MakeFunc_Unoptimized", func() { aFunc = reflectx.MakeFunc(f.ProcessElement) }, func() { /*No op*/ }}, // Used in graph deserialization
{"NewFn_Unoptimized", func() { aFn, _ = graph.NewFn(f) }, func() { /*No op*/ }}, // Used in graph construction (less valuable)
{"EncodeMultiEdge_Unoptimized", func() { aME, _ = graphx.EncodeMultiEdge(&me) }, func() { /*No op*/ }}, // Used in graph serialization at execution time

// Perform some generic registration to optimize execution
{"MakeFunc_GenericRegistration", func() { aFunc = reflectx.MakeFunc(f.ProcessElement) }, func() { DoFn1x1[CustomType, int](f) }}, // Used in graph deserialization
{"NewFn_GenericRegistration", func() { aFn, _ = graph.NewFn(f) }, func() { DoFn1x1[CustomType, int](f) }}, // Used in graph construction (less valuable)
{"EncodeMultiEdge_GenericRegistration", func() { aME, _ = graphx.EncodeMultiEdge(&me) }, func() { DoFn1x1[CustomType, int](f) }}, // Used in graph serialization at execution time

// Perform some registration via copies of the code generator's shims
{"MakeFunc_GeneratedShims", func() { aFunc = reflectx.MakeFunc(f.ProcessElement) }, func() { GeneratedOptimizationCalls() }}, // Used in graph deserialization
{"NewFn_GeneratedShims", func() { aFn, _ = graph.NewFn(f) }, func() { GeneratedOptimizationCalls() }}, // Used in graph construction (less valuable)
{"EncodeMultiEdge_GeneratedShims", func() { aME, err = graphx.EncodeMultiEdge(&me) }, func() { GeneratedOptimizationCalls() }}, // Used in graph serialization at execution time
}
for _, test := range tests {
test.registration()
b.Run(test.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
test.fn()
}
})
}
b.Log(aFunc)
b.Log(aFn)
b.Log(aME)
}