From e4a3bdd26d3b3bfc57e795559ae4d20e7c12f3cd Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Mon, 23 May 2022 18:24:01 -0400 Subject: [PATCH] [BEAM-14484] Step back unexpected primary handling to warnings (#17724) --- .../pkg/beam/core/runtime/exec/datasource.go | 43 +------ .../beam/core/runtime/exec/datasource_test.go | 75 +---------- sdks/go/pkg/beam/core/runtime/exec/sdf.go | 27 ++++ .../go/pkg/beam/core/runtime/exec/sdf_test.go | 119 ++++++++++++++++++ sdks/go/pkg/beam/core/sdf/sdf.go | 12 +- 5 files changed, 163 insertions(+), 113 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 8c39aeaed417..473c93596ba3 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -29,7 +29,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" - "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) // DataSource is a Root execution unit. @@ -349,27 +348,6 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) { return encodeElms } -func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) { - tElm := root.Elm.(*FullValue).Elm2.(*FullValue).Elm - tracker, ok := tElm.(sdf.RTracker) - if !ok { - log.Warnf(context.Background(), "expected type sdf.RTracker, got type %T", tElm) - return nil, -1.0, false - } - boundTracker, ok := tracker.(sdf.BoundableRTracker) - if !ok { - log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()") - // Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used. 
- boundTracker = sdf.NewWrappedTracker(tracker) - } - size, ok := root.Elm2.(float64) - if !ok { - log.Warnf(context.Background(), "expected size to be type float64, got type %T", root.Elm2) - return nil, -1.0, false - } - return boundTracker, size, true -} - // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a // ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not // splittable or has not returned a resuming continuation, the function returns an empty @@ -388,31 +366,14 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { ow := su.GetOutputWatermark() - // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries - // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions - // that are bounded and of size 0 as they represent no remaining work. - ps, rs, err := su.Split(0.0) + // Checkpointing is functionally a split at fraction 0.0 + rs, err := su.Checkpoint() if err != nil { return SplitResult{}, -1 * time.Minute, false, err } if len(rs) == 0 { return SplitResult{}, -1 * time.Minute, false, nil } - if len(ps) != 0 { - // Expected structure of the root FullValue is KV>, Size> - for _, root := range ps { - tracker, size, ok := getBoundedRTrackerFromRoot(root) - // If type assertion didn't return a BoundableRTracker, we move on. - if !ok { - log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root) - continue - } - if !tracker.IsBounded() || size > 0.00001 { - return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. 
Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps) - } - } - - } encodeElms := n.makeEncodeElms() diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go index 365cf52062ff..848cf279970d 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go @@ -28,7 +28,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -612,6 +611,12 @@ func (n *TestSplittableUnit) Split(f float64) ([]*FullValue, []*FullValue, error return []*FullValue{{Elm: n.elm}}, []*FullValue{{Elm: n.elm}}, nil } +// Checkpoint routes through the Split() function to satisfy the interface. +func (n *TestSplittableUnit) Checkpoint() ([]*FullValue, error) { + _, r, err := n.Split(0.0) + return r, err +} + // GetProgress always returns 0, to keep tests consistent. 
func (n *TestSplittableUnit) GetProgress() float64 { return 0 @@ -915,71 +920,3 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...)) } } - -func constructRootFullValue(rt, size interface{}) *FullValue { - return &FullValue{ - Elm: &FullValue{ - Elm2: &FullValue{ - Elm: rt, - }, - }, - Elm2: size, - } -} - -func TestGetRTrackerFromRoot(t *testing.T) { - var tests = []struct { - name string - inRt interface{} - inSize interface{} - valid bool - expSize float64 - }{ - { - "valid", - offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}), - 1.0, - true, - 1.0, - }, - { - "not a bounded rtracker", - int64(42), - 1.0, - false, - -1.0, - }, - { - "non-float size", - offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}), - int64(1), - false, - -1.0, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - root := constructRootFullValue(test.inRt, test.inSize) - tracker, size, ok := getBoundedRTrackerFromRoot(root) - - if test.valid { - if !ok { - t.Fatalf("failed to get tracker and size from root") - } - if tracker == nil { - t.Errorf("got nil tracker, expected %#v", test.inRt) - } - } else { - if ok { - t.Errorf("invalid root returned ok") - } - if tracker != nil { - t.Errorf("got tracker %#v, want nil", tracker) - } - } - if !floatEquals(test.expSize, size, 0.001) { - t.Errorf("got size %f, want %f", size, test.inSize) - } - }) - } -} diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index 8ca57625573f..86040a9f89ff 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -578,6 +578,12 @@ type SplittableUnit interface { // fully represented in just one. 
Split(fraction float64) (primaries, residuals []*FullValue, err error) + // Checkpoint performs a split at fraction 0.0 of an element that has stopped + // processing and has work that needs to be resumed later. This function will + // check that the produced primary restriction from the split represents + // completed work to avoid data loss and will error if work remains. + Checkpoint() (residuals []*FullValue, err error) + // GetProgress returns the fraction of progress the current element has // made in processing. (ex. 0.0 means no progress, and 1.0 means fully // processed.) @@ -647,6 +653,27 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, [] return p, r, nil } +// Checkpoint splits the remaining work in a restriction into residuals to be resumed +// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming +// ProcessContinuation. If the split occurs and the primary restriction is not marked as done +// by the RTracker, the Checkpoint fails as this is a potential data-loss case. +func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) { + addContext := func(err error) error { + return errors.WithContext(err, "Attempting checkpoint in ProcessSizedElementsAndRestrictions") + } + _, r, err := n.Split(0.0) + + if err != nil { + return nil, addContext(err) + } + + if !n.rt.IsDone() { + return nil, addContext(errors.Errorf("Primary restriction %#v is not done. Check that the RTracker's TrySplit() at fraction 0.0 returns a completed primary restriction", n.rt)) + } + + return r, nil +} + // singleWindowSplit is intended for splitting elements in non window-observing // DoFns (or single-window elements in window-observing DoFns, since the // behavior is identical). 
A single restriction split will occur and all windows diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go b/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go index 8955d9707079..85cccea270ef 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go @@ -689,6 +689,7 @@ func TestAsSplittableUnit(t *testing.T) { if err != nil { t.Fatalf("invalid function: %v", err) } + multiWindows := []typex.Window{ window.IntervalWindow{Start: 10, End: 20}, window.IntervalWindow{Start: 11, End: 21}, @@ -1197,6 +1198,103 @@ func TestAsSplittableUnit(t *testing.T) { } }) + t.Run("Checkpoint", func(t *testing.T) { + var tests = []struct { + name string + fn *graph.DoFn + in FullValue + finishPrimary bool + expErr bool + wantResiduals []*FullValue + }{ + { + name: "base case", + fn: dfn, + in: FullValue{ + Elm: &FullValue{ + Elm: 1, + Elm2: &FullValue{ + Elm: &VetRestriction{ID: "Sdf"}, + Elm2: false, + }, + }, + Elm2: 1.0, + Timestamp: testTimestamp, + Windows: testWindows, + }, + finishPrimary: true, + expErr: false, + wantResiduals: []*FullValue{{ + Elm: &FullValue{ + Elm: 1, + Elm2: &FullValue{ + Elm: &VetRestriction{ID: "Sdf.2", RestSize: true, Val: 1}, + Elm2: false, + }, + }, + Elm2: 1.0, + Timestamp: testTimestamp, + Windows: testWindows, + }}, + }, + { + name: "unfinished primary", + fn: dfn, + in: FullValue{ + Elm: &FullValue{ + Elm: 1, + Elm2: &FullValue{ + Elm: &VetRestriction{ID: "Sdf"}, + Elm2: false, + }, + }, + Elm2: 1.0, + Timestamp: testTimestamp, + Windows: testWindows, + }, + finishPrimary: false, + expErr: true, + wantResiduals: []*FullValue{}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Setup, create transforms, inputs, and desired outputs. 
+ n := &ParDo{UID: 1, Fn: test.fn, Out: []Node{}} + node := &ProcessSizedElementsAndRestrictions{PDo: n} + node.rt = &SplittableUnitCheckpointingRTracker{ + VetRTracker: VetRTracker{Rest: test.in.Elm.(*FullValue).Elm2.(*FullValue).Elm.(*VetRestriction)}, + primaryDone: test.finishPrimary, + isDone: false, + } + node.elm = &test.in + node.numW = len(test.in.Windows) + node.currW = 0 + // Call from SplittableUnit and check results. + su := SplittableUnit(node) + if err := node.Up(context.Background()); err != nil { + t.Fatalf("ProcessSizedElementsAndRestrictions.Up() failed: %v", err) + } + gotResiduals, err := su.Checkpoint() + if test.expErr { + if err == nil { + t.Errorf("SplittableUnit.Checkpoint() succeeded when it should have failed") + } + if len(gotResiduals) != 0 { + t.Errorf("SplittableUnit.Checkpoint() got residuals %v, want none", gotResiduals) + } + } else { + if err != nil { + t.Fatalf("SplittableUnit.Checkpoint() returned error, got %v", err) + } + if diff := cmp.Diff(gotResiduals, test.wantResiduals); diff != "" { + t.Errorf("SplittableUnit.Checkpoint() has incorrect residual (-got, +want)\n%v", diff) + } + } + }) + } + }) + t.Run("WatermarkEstimation", func(t *testing.T) { tests := []struct { name string @@ -1478,3 +1576,24 @@ func (rt *SplittableUnitRTracker) TrySplit(_ float64) (interface{}, interface{}, func (rt *SplittableUnitRTracker) GetProgress() (float64, float64) { return rt.Done, rt.Remaining } + +// SplittableUnitCheckpointingRTracker adds support to the VetRTracker to enable +// happy path testing of checkpointing. 
+type SplittableUnitCheckpointingRTracker struct { + VetRTracker + primaryDone bool + isDone bool +} + +func (rt *SplittableUnitCheckpointingRTracker) IsDone() bool { + return rt.isDone +} + +func (rt *SplittableUnitCheckpointingRTracker) TrySplit(_ float64) (interface{}, interface{}, error) { + rest1 := rt.Rest.copy() + rest1.ID += ".1" + rest2 := rt.Rest.copy() + rest2.ID += ".2" + rt.isDone = rt.primaryDone + return &rest1, &rest2, nil +} diff --git a/sdks/go/pkg/beam/core/sdf/sdf.go b/sdks/go/pkg/beam/core/sdf/sdf.go index 9812d300e5b1..d78af2714fb9 100644 --- a/sdks/go/pkg/beam/core/sdf/sdf.go +++ b/sdks/go/pkg/beam/core/sdf/sdf.go @@ -72,9 +72,12 @@ type RTracker interface { // the only split point is the end of the restriction, or the split failed for some recoverable // reason), then this function returns nil as the residual. // - // If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either - // a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is - // no data that is lost by not being rescheduled for execution later. + // If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return + // a primary restriction that represents no remaining work, and the residual should + // contain all remaining work. The RTracker should be marked as done + // (and return true when IsDone() is called) after that split. + // This will ensure that there is no data loss, which would result in + // the pipeline failing during the checkpoint. // // If an error is returned, some catastrophic failure occurred and the entire bundle will fail. TrySplit(fraction float64) (primary, residual interface{}, err error) @@ -88,6 +91,9 @@ type RTracker interface { // claimed. This method is called by the SDK Harness to validate that a splittable DoFn has // correctly processed all work in a restriction before finishing. 
If this method still returns // false after processing, then GetError is expected to return a non-nil error. + // + // When called immediately following a checkpointing TrySplit() call (with value 0.0), this + // should return true. IsDone() bool // GetRestriction returns the restriction this tracker is tracking, or nil if the restriction