Skip to content

Commit

Permalink
[BEAM-14484] Improve behavior surrounding primary roots in self-check…
Browse files Browse the repository at this point in the history
…pointing (apache#17716)
  • Loading branch information
jrmccluskey authored May 19, 2022
1 parent e9cfd8e commit f1980dc
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 5 deletions.
43 changes: 41 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

// DataSource is a Root execution unit.
Expand Down Expand Up @@ -348,6 +349,27 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
return encodeElms
}

func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
tElm := root.Elm.(*FullValue).Elm2.(*FullValue).Elm
tracker, ok := tElm.(sdf.RTracker)
if !ok {
log.Warnf(context.Background(), "expected type sdf.RTracker, got type %T", tElm)
return nil, -1.0, false
}
boundTracker, ok := tracker.(sdf.BoundableRTracker)
if !ok {
log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()")
// Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used.
boundTracker = sdf.NewWrappedTracker(tracker)
}
size, ok := root.Elm2.(float64)
if !ok {
log.Warnf(context.Background(), "expected size to be type float64, got type %T", root.Elm2)
return nil, -1.0, false
}
return boundTracker, size, true
}

// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
// splittable or has not returned a resuming continuation, the function returns an empty
Expand All @@ -366,13 +388,30 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {

ow := su.GetOutputWatermark()

// Always split at fraction 0.0, should have no primaries left.
// Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
// will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
// that are bounded and of size 0 as they represent no remaining work.
ps, rs, err := su.Split(0.0)
if err != nil {
return SplitResult{}, -1 * time.Minute, false, err
}
if len(rs) == 0 {
return SplitResult{}, -1 * time.Minute, false, nil
}
if len(ps) != 0 {
return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
// Expected structure of the root FullValue is KV<KV<Elm, KV<BoundedRTracker, watermarkEstimatorState>>, Size>
for _, root := range ps {
tracker, size, ok := getBoundedRTrackerFromRoot(root)
// If type assertion didn't return a BoundableRTracker, we move on.
if !ok {
log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root)
continue
}
if !tracker.IsBounded() || size > 0.00001 {
return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps)
}
}

}

encodeElms := n.makeEncodeElms()
Expand Down
69 changes: 69 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -914,3 +915,71 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected
t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
}
}

func constructRootFullValue(rt, size interface{}) *FullValue {
return &FullValue{
Elm: &FullValue{
Elm2: &FullValue{
Elm: rt,
},
},
Elm2: size,
}
}

func TestGetRTrackerFromRoot(t *testing.T) {
var tests = []struct {
name string
inRt interface{}
inSize interface{}
valid bool
expSize float64
}{
{
"valid",
offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
1.0,
true,
1.0,
},
{
"not a bounded rtracker",
int64(42),
1.0,
false,
-1.0,
},
{
"non-float size",
offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
int64(1),
false,
-1.0,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
root := constructRootFullValue(test.inRt, test.inSize)
tracker, size, ok := getBoundedRTrackerFromRoot(root)

if test.valid {
if !ok {
t.Fatalf("failed to get tracker and size from root")
}
if tracker == nil {
t.Errorf("got nil tracker, expected %#v", test.inRt)
}
} else {
if ok {
t.Errorf("invalid root returned ok")
}
if tracker != nil {
t.Errorf("got tracker %#v, want nil", tracker)
}
}
if !floatEquals(test.expSize, size, 0.001) {
t.Errorf("got size %f, want %f", size, test.inSize)
}
})
}
}
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (

URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"
URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
URNTruncate = "beam:transform:sdf_truncate_sized_restrictions_v1"

// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/sdf/sdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type RTracker interface {
// the only split point is the end of the restriction, or the split failed for some recoverable
// reason), then this function returns nil as the residual.
//
// If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either
// a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is
// no data that is lost by not being rescheduled for execution later.
//
// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
TrySplit(fraction float64) (primary, residual interface{}, err error)

Expand Down
34 changes: 34 additions & 0 deletions sdks/go/pkg/beam/core/sdf/wrappedbounded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sdf

// WrappedTracker wraps an implementation of an RTracker and adds an IsBounded() function
// that returns true in order to allow RTrackers to be handled as bounded BoundableRTrackers
// if necessary (like in self-checkpointing evaluation.)
type WrappedTracker struct {
RTracker
}

// IsBounded returns true, indicating that the underlying RTracker represents a bounded
// amount of work.
func (t *WrappedTracker) IsBounded() bool {
return true
}

// NewWrappedTracker is a constructor for an RTracker that wraps another RTracker into a BoundedRTracker.
func NewWrappedTracker(underlying RTracker) *WrappedTracker {
return &WrappedTracker{RTracker: underlying}
}
16 changes: 13 additions & 3 deletions sdks/go/test/integration/primitives/checkpointing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package primitives

import (
"context"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)

Expand Down Expand Up @@ -53,7 +55,7 @@ func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, rest offsetrange.Rest
// SplitRestriction modifies the offsetrange.Restriction's sized restriction function to produce a size-zero restriction
// at the end of execution.
func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction {
size := int64(1)
size := int64(10)
s := rest.Start
var splits []offsetrange.Restriction
for e := s + size; e <= rest.End; s, e = e, e+size {
Expand All @@ -68,19 +70,27 @@ func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Res
func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
position := rt.GetRestriction().(offsetrange.Restriction).Start

counter := 0
for {
if rt.TryClaim(position) {
// Successful claim, emit the value and move on.
emit(position)
position++
return sdf.ResumeProcessingIn(1 * time.Second)
counter++
} else if rt.GetError() != nil || rt.IsDone() {
// Stop processing on error or completion
if err := rt.GetError(); err != nil {
log.Errorf(context.Background(), "error in restriction tracker, got %v", err)
}
return sdf.StopProcessing()
} else {
// Failed to claim but no error, resume later.
// Resume later.
return sdf.ResumeProcessingIn(5 * time.Second)
}

if counter >= 10 {
return sdf.ResumeProcessingIn(1 * time.Second)
}
}
}

Expand Down

0 comments on commit f1980dc

Please sign in to comment.