Skip to content

Commit

Permalink
[BEAM-13757] adds pane observation in DoFn (#16629)
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse authored Feb 3, 2022
1 parent 9794fb4 commit 32a0f09
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 112 deletions.
39 changes: 37 additions & 2 deletions sdks/go/pkg/beam/core/funcx/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package funcx

import (
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
Expand Down Expand Up @@ -76,6 +76,8 @@ const (
// Example:
// "func(string) func (*int) bool"
FnMultiMap FnParamKind = 0x200
// FnPane indicates a function input parameter that is a PaneInfo
FnPane FnParamKind = 0x400
)

func (k FnParamKind) String() string {
Expand All @@ -100,6 +102,8 @@ func (k FnParamKind) String() string {
return "RTracker"
case FnMultiMap:
return "MultiMap"
case FnPane:
return "Pane"
default:
return fmt.Sprintf("%v", int(k))
}
Expand Down Expand Up @@ -243,6 +247,16 @@ func (u *Fn) Window() (pos int, exists bool) {
return -1, false
}

// Pane returns (index, true) iff the function expects a PaneInfo.
func (u *Fn) Pane() (pos int, exists bool) {
for i, p := range u.Param {
if p.Kind == FnPane {
return i, true
}
}
return -1, false
}

// RTracker returns (index, true) iff the function expects an sdf.RTracker.
func (u *Fn) RTracker() (pos int, exists bool) {
for i, p := range u.Param {
Expand Down Expand Up @@ -329,6 +343,8 @@ func New(fn reflectx.Func) (*Fn, error) {
kind = FnReIter
case IsMultiMap(t):
kind = FnMultiMap
case t == typex.PaneInfoType:
kind = FnPane
default:
return nil, errors.Errorf("bad parameter type for %s: %v", fn.Name(), t)
}
Expand Down Expand Up @@ -386,7 +402,7 @@ func SubReturns(list []ReturnParam, indices ...int) []ReturnParam {
}

// The order of present parameters and return values must be as follows:
// func(FnContext?, FnWindow?, FnEventTime?, FnType?, FnRTracker?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnType?, FnRTracker?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
// where ? indicates 0 or 1, and * indicates any number.
// and a SideInput is one of FnValue or FnIter or FnReIter
// Note: Fns with inputs must have at least one FnValue as the main input.
Expand All @@ -411,6 +427,7 @@ func validateOrder(u *Fn) error {

var (
errContextParam = errors.New("may only have a single context.Context parameter and it must be the first parameter")
errPaneParamPrecedence = errors.New("may only have a single PaneInfo parameter and it must precede the WindowParam, EventTime and main input parameter")
errWindowParamPrecedence = errors.New("may only have a single Window parameter and it must precede the EventTime and main input parameter")
errEventTimeParamPrecedence = errors.New("may only have a single beam.EventTime parameter and it must precede the main input parameter")
errReflectTypePrecedence = errors.New("may only have a single reflect.Type parameter and it must precede the main input parameter")
Expand All @@ -423,6 +440,7 @@ type paramState int
const (
psStart paramState = iota
psContext
psPane
psWindow
psEventTime
psType
Expand All @@ -437,6 +455,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
switch transition {
case FnContext:
return psContext, nil
case FnPane:
return psPane, nil
case FnWindow:
return psWindow, nil
case FnEventTime:
Expand All @@ -447,6 +467,19 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psRTracker, nil
}
case psContext:
switch transition {
case FnPane:
return psPane, nil
case FnWindow:
return psWindow, nil
case FnEventTime:
return psEventTime, nil
case FnType:
return psType, nil
case FnRTracker:
return psRTracker, nil
}
case psPane:
switch transition {
case FnWindow:
return psWindow, nil
Expand Down Expand Up @@ -495,6 +528,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
switch transition {
case FnContext:
return -1, errContextParam
case FnPane:
return -1, errPaneParamPrecedence
case FnWindow:
return -1, errWindowParamPrecedence
case FnEventTime:
Expand Down
104 changes: 100 additions & 4 deletions sdks/go/pkg/beam/core/funcx/fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func TestNew(t *testing.T) {
Fn: func(int, func(string) func(*int) bool) {},
Param: []FnParamKind{FnValue, FnMultiMap},
},
{
Name: "good7",
Fn: func(typex.PaneInfo, typex.Window, typex.EventTime, reflect.Type, []byte) {},
Param: []FnParamKind{FnPane, FnWindow, FnEventTime, FnType, FnValue},
},
{
Name: "good-method",
Fn: foo{1}.Do,
Expand Down Expand Up @@ -122,6 +127,11 @@ func TestNew(t *testing.T) {
Fn: func(context.Context, context.Context, int) {},
Err: errContextParam,
},
{
Name: "errPaneParam: after Window",
Fn: func(typex.Window, typex.PaneInfo, int) {},
Err: errPaneParamPrecedence,
},
{
Name: "errWindowParamPrecedence: after EventTime",
Fn: func(typex.EventTime, typex.Window, int) {
Expand Down Expand Up @@ -273,8 +283,7 @@ func TestEmits(t *testing.T) {
params[i].Kind = kind
params[i].T = nil
}
fn := new(Fn)
fn.Param = params
fn := &Fn{Param: params}

// Validate we get expected results for Emits function.
pos, num, exists := fn.Emits()
Expand All @@ -291,6 +300,94 @@ func TestEmits(t *testing.T) {
}
}

func TestPane(t *testing.T) {
tests := []struct {
Name string
Params []FnParamKind
Pos int
Exists bool
}{
{
Name: "pane input",
Params: []FnParamKind{FnContext, FnPane},
Pos: 1,
Exists: true,
},
{
Name: "no pane input",
Params: []FnParamKind{FnContext, FnEventTime},
Pos: -1,
Exists: false,
},
}

for _, test := range tests {
test := test
t.Run(test.Name, func(t *testing.T) {
// Create a Fn with a filled params list.
params := make([]FnParam, len(test.Params))
for i, kind := range test.Params {
params[i].Kind = kind
params[i].T = nil
}
fn := &Fn{Param: params}

// Validate we get expected results for pane function.
pos, exists := fn.Pane()
if exists != test.Exists {
t.Errorf("Pane(%v) - exists: got %v, want %v", params, exists, test.Exists)
}
if pos != test.Pos {
t.Errorf("Pane(%v) - pos: got %v, want %v", params, pos, test.Pos)
}
})
}
}

func TestWindow(t *testing.T) {
tests := []struct {
Name string
Params []FnParamKind
Pos int
Exists bool
}{
{
Name: "window input",
Params: []FnParamKind{FnContext, FnWindow},
Pos: 1,
Exists: true,
},
{
Name: "no window input",
Params: []FnParamKind{FnContext, FnEventTime},
Pos: -1,
Exists: false,
},
}

for _, test := range tests {
test := test
t.Run(test.Name, func(t *testing.T) {
// Create a Fn with a filled params list.
params := make([]FnParam, len(test.Params))
for i, kind := range test.Params {
params[i].Kind = kind
params[i].T = nil
}
fn := &Fn{Param: params}

// Validate we get expected results for pane function.
pos, exists := fn.Window()
if exists != test.Exists {
t.Errorf("Window(%v) - exists: got %v, want %v", params, exists, test.Exists)
}
if pos != test.Pos {
t.Errorf("Window(%v) - pos: got %v, want %v", params, pos, test.Pos)
}
})
}
}

func TestInputs(t *testing.T) {
tests := []struct {
Name string
Expand Down Expand Up @@ -352,8 +449,7 @@ func TestInputs(t *testing.T) {
params[i].Kind = kind
params[i].T = nil
}
fn := new(Fn)
fn.Param = params
fn := &Fn{Param: params}

// Validate we get expected results for Inputs function.
pos, num, exists := fn.Inputs()
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)

// Decoder is a uniform custom encoder interface. It wraps various
// Decoder is a uniform custom decoder interface. It wraps various
// forms of reflectx.Funcs.
type Decoder interface {
// Decode decodes the []byte in to a value of the given type.
Expand Down
Loading

0 comments on commit 32a0f09

Please sign in to comment.