Skip to content

Commit

Permalink
[BEAM-14484] Step back unexpected primary handling to warnings (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmccluskey authored May 23, 2022
1 parent 3f05429 commit e4a3bdd
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 113 deletions.
43 changes: 2 additions & 41 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

// DataSource is a Root execution unit.
Expand Down Expand Up @@ -349,27 +348,6 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
return encodeElms
}

func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
tElm := root.Elm.(*FullValue).Elm2.(*FullValue).Elm
tracker, ok := tElm.(sdf.RTracker)
if !ok {
log.Warnf(context.Background(), "expected type sdf.RTracker, got type %T", tElm)
return nil, -1.0, false
}
boundTracker, ok := tracker.(sdf.BoundableRTracker)
if !ok {
log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()")
// Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used.
boundTracker = sdf.NewWrappedTracker(tracker)
}
size, ok := root.Elm2.(float64)
if !ok {
log.Warnf(context.Background(), "expected size to be type float64, got type %T", root.Elm2)
return nil, -1.0, false
}
return boundTracker, size, true
}

// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
// splittable or has not returned a resuming continuation, the function returns an empty
Expand All @@ -388,31 +366,14 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {

ow := su.GetOutputWatermark()

// Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
// will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
// that are bounded and of size 0 as they represent no remaining work.
ps, rs, err := su.Split(0.0)
// Checkpointing is functionally a split at fraction 0.0
rs, err := su.Checkpoint()
if err != nil {
return SplitResult{}, -1 * time.Minute, false, err
}
if len(rs) == 0 {
return SplitResult{}, -1 * time.Minute, false, nil
}
if len(ps) != 0 {
// Expected structure of the root FullValue is KV<KV<Elm, KV<BoundedRTracker, watermarkEstimatorState>>, Size>
for _, root := range ps {
tracker, size, ok := getBoundedRTrackerFromRoot(root)
// If type assertion didn't return a BoundableRTracker, we move on.
if !ok {
log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root)
continue
}
if !tracker.IsBounded() || size > 0.00001 {
return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps)
}
}

}

encodeElms := n.makeEncodeElms()

Expand Down
75 changes: 6 additions & 69 deletions sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -612,6 +611,12 @@ func (n *TestSplittableUnit) Split(f float64) ([]*FullValue, []*FullValue, error
return []*FullValue{{Elm: n.elm}}, []*FullValue{{Elm: n.elm}}, nil
}

// Checkpoint routes through the Split() function to satisfy the interface.
func (n *TestSplittableUnit) Checkpoint() ([]*FullValue, error) {
_, r, err := n.Split(0.0)
return r, err
}

// GetProgress always returns 0, to keep tests consistent.
func (n *TestSplittableUnit) GetProgress() float64 {
return 0
Expand Down Expand Up @@ -915,71 +920,3 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected
t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
}
}

func constructRootFullValue(rt, size interface{}) *FullValue {
return &FullValue{
Elm: &FullValue{
Elm2: &FullValue{
Elm: rt,
},
},
Elm2: size,
}
}

func TestGetRTrackerFromRoot(t *testing.T) {
var tests = []struct {
name string
inRt interface{}
inSize interface{}
valid bool
expSize float64
}{
{
"valid",
offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
1.0,
true,
1.0,
},
{
"not a bounded rtracker",
int64(42),
1.0,
false,
-1.0,
},
{
"non-float size",
offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
int64(1),
false,
-1.0,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
root := constructRootFullValue(test.inRt, test.inSize)
tracker, size, ok := getBoundedRTrackerFromRoot(root)

if test.valid {
if !ok {
t.Fatalf("failed to get tracker and size from root")
}
if tracker == nil {
t.Errorf("got nil tracker, expected %#v", test.inRt)
}
} else {
if ok {
t.Errorf("invalid root returned ok")
}
if tracker != nil {
t.Errorf("got tracker %#v, want nil", tracker)
}
}
if !floatEquals(test.expSize, size, 0.001) {
t.Errorf("got size %f, want %f", size, test.inSize)
}
})
}
}
27 changes: 27 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/sdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,12 @@ type SplittableUnit interface {
// fully represented in just one.
Split(fraction float64) (primaries, residuals []*FullValue, err error)

// Checkpoint performs a split at fraction 0.0 of an element that has stopped
// processing and has work that needs to be resumed later. This function will
// check that the produced primary restriction from the split represents
// completed work to avoid data loss and will error if work remains.
Checkpoint() (residuals []*FullValue, err error)

// GetProgress returns the fraction of progress the current element has
// made in processing. (ex. 0.0 means no progress, and 1.0 means fully
// processed.)
Expand Down Expand Up @@ -647,6 +653,27 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []
return p, r, nil
}

// Checkpoint splits the remaining work in a restriction into residuals to be resumed
// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming
// ProcessContinuation. If the split occurs and the primary restriction is marked as done
// my the RTracker, the Checkpoint fails as this is a potential data-loss case.
func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {
addContext := func(err error) error {
return errors.WithContext(err, "Attempting checkpoint in ProcessSizedElementsAndRestrictions")
}
_, r, err := n.Split(0.0)

if err != nil {
return nil, addContext(err)
}

if !n.rt.IsDone() {
return nil, addContext(errors.Errorf("Primary restriction %#v is not done. Check that the RTracker's TrySplit() at fraction 0.0 returns a completed primary restriction", n.rt))
}

return r, nil
}

// singleWindowSplit is intended for splitting elements in non window-observing
// DoFns (or single-window elements in window-observing DoFns, since the
// behavior is identical). A single restriction split will occur and all windows
Expand Down
119 changes: 119 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/sdf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ func TestAsSplittableUnit(t *testing.T) {
if err != nil {
t.Fatalf("invalid function: %v", err)
}

multiWindows := []typex.Window{
window.IntervalWindow{Start: 10, End: 20},
window.IntervalWindow{Start: 11, End: 21},
Expand Down Expand Up @@ -1197,6 +1198,103 @@ func TestAsSplittableUnit(t *testing.T) {
}
})

t.Run("Checkpoint", func(t *testing.T) {
var tests = []struct {
name string
fn *graph.DoFn
in FullValue
finishPrimary bool
expErr bool
wantResiduals []*FullValue
}{
{
name: "base case",
fn: dfn,
in: FullValue{
Elm: &FullValue{
Elm: 1,
Elm2: &FullValue{
Elm: &VetRestriction{ID: "Sdf"},
Elm2: false,
},
},
Elm2: 1.0,
Timestamp: testTimestamp,
Windows: testWindows,
},
finishPrimary: true,
expErr: false,
wantResiduals: []*FullValue{{
Elm: &FullValue{
Elm: 1,
Elm2: &FullValue{
Elm: &VetRestriction{ID: "Sdf.2", RestSize: true, Val: 1},
Elm2: false,
},
},
Elm2: 1.0,
Timestamp: testTimestamp,
Windows: testWindows,
}},
},
{
name: "unfinished primary",
fn: dfn,
in: FullValue{
Elm: &FullValue{
Elm: 1,
Elm2: &FullValue{
Elm: &VetRestriction{ID: "Sdf"},
Elm2: false,
},
},
Elm2: 1.0,
Timestamp: testTimestamp,
Windows: testWindows,
},
finishPrimary: false,
expErr: true,
wantResiduals: []*FullValue{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Setup, create transforms, inputs, and desired outputs.
n := &ParDo{UID: 1, Fn: test.fn, Out: []Node{}}
node := &ProcessSizedElementsAndRestrictions{PDo: n}
node.rt = &SplittableUnitCheckpointingRTracker{
VetRTracker: VetRTracker{Rest: test.in.Elm.(*FullValue).Elm2.(*FullValue).Elm.(*VetRestriction)},
primaryDone: test.finishPrimary,
isDone: false,
}
node.elm = &test.in
node.numW = len(test.in.Windows)
node.currW = 0
// Call from SplittableUnit and check results.
su := SplittableUnit(node)
if err := node.Up(context.Background()); err != nil {
t.Fatalf("ProcessSizedElementsAndRestrictions.Up() failed: %v", err)
}
gotResiduals, err := su.Checkpoint()
if test.expErr {
if err == nil {
t.Errorf("SplittableUnit.Checkpoint() succeeded when it should have failed")
}
if len(gotResiduals) != 0 {
t.Errorf("SplittableUnit.Checkpoint() got residuals %v, want none", gotResiduals)
}
} else {
if err != nil {
t.Fatalf("SplittableUnit.Checkpoint() returned error, got %v", err)
}
if diff := cmp.Diff(gotResiduals, test.wantResiduals); diff != "" {
t.Errorf("SplittableUnit.Checkpoint() has incorrect residual (-got, +want)\n%v", diff)
}
}
})
}
})

t.Run("WatermarkEstimation", func(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -1478,3 +1576,24 @@ func (rt *SplittableUnitRTracker) TrySplit(_ float64) (interface{}, interface{},
func (rt *SplittableUnitRTracker) GetProgress() (float64, float64) {
return rt.Done, rt.Remaining
}

// SplittableUnitCheckpointingRTracker adds support to the VetRTracker to enable
// happy path testing of checkpointing.
type SplittableUnitCheckpointingRTracker struct {
VetRTracker
primaryDone bool
isDone bool
}

func (rt *SplittableUnitCheckpointingRTracker) IsDone() bool {
return rt.isDone
}

func (rt *SplittableUnitCheckpointingRTracker) TrySplit(_ float64) (interface{}, interface{}, error) {
rest1 := rt.Rest.copy()
rest1.ID += ".1"
rest2 := rt.Rest.copy()
rest2.ID += ".2"
rt.isDone = rt.primaryDone
return &rest1, &rest2, nil
}
12 changes: 9 additions & 3 deletions sdks/go/pkg/beam/core/sdf/sdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ type RTracker interface {
// the only split point is the end of the restriction, or the split failed for some recoverable
// reason), then this function returns nil as the residual.
//
// If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either
// a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is
// no data that is lost by not being rescheduled for execution later.
// If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return
// a primary restriction that represents no remaining work, and the residual should
// contain all remaining work. The RTracker should be marked as done
// (and return true when IsDone() is called) after that split.
// This will ensure that there is no data loss, which would result in
// the pipeline failing during the checkpoint.
//
// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
TrySplit(fraction float64) (primary, residual interface{}, err error)
Expand All @@ -88,6 +91,9 @@ type RTracker interface {
// claimed. This method is called by the SDK Harness to validate that a splittable DoFn has
// correctly processed all work in a restriction before finishing. If this method still returns
// false after processing, then GetError is expected to return a non-nil error.
//
// When called immediately following a checkpointing TrySplit() call (with value 0.0), this
// should return true.
IsDone() bool

// GetRestriction returns the restriction this tracker is tracking, or nil if the restriction
Expand Down

0 comments on commit e4a3bdd

Please sign in to comment.