Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14484] Improve behavior surrounding primary roots in self-checkpointing #17716

Merged
merged 16 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

// DataSource is a Root execution unit.
Expand Down Expand Up @@ -348,6 +349,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
return encodeElms
}

func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker)
if !ok {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
return nil, -1.0, false
}
size, ok := root.Elm2.(float64)
if !ok {
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
return nil, -1.0, false
}
return tracker, size, true
}

// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
// splittable or has not returned a resuming continuation, the function returns an empty
Expand All @@ -366,13 +379,27 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {

ow := su.GetOutputWatermark()

// Always split at fraction 0.0, should have no primaries left.
// Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
// will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
// that are bounded and of size 0 as they represent no remaining work.
ps, rs, err := su.Split(0.0)
if err != nil {
return SplitResult{}, -1 * time.Minute, false, err
}
if len(ps) != 0 {
return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
// Expected structure of the root FullValue is KV<KV<Elm, KV<BoundedRTracker, watermarkEstimatorState>>, Size, (Timestamp?, Windows?)>
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
for _, root := range ps {
tracker, size, ok := getBoundedRTrackerFromRoot(root)
// If type assertion didn't return a BoundableRTracker, we move on.
if !ok {
log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root)
continue
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
}
if !tracker.IsBounded() || size > 0.00001 {
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps)
jrmccluskey marked this conversation as resolved.
Show resolved Hide resolved
}
}

}

encodeElms := n.makeEncodeElms()
Expand Down
69 changes: 69 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -914,3 +915,71 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected
t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
}
}

func constructRootFullValue(rt, size interface{}) *FullValue {
return &FullValue{
Elm: &FullValue{
Elm2: &FullValue{
Elm: rt,
},
},
Elm2: size,
}
}

func TestGetRTrackerFromRoot(t *testing.T) {
var tests = []struct {
name string
inRt interface{}
inSize interface{}
valid bool
expSize float64
}{
{
"valid",
offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
1.0,
true,
1.0,
},
{
"not a bounded rtracker",
int64(42),
1.0,
false,
-1.0,
},
{
"non-float size",
offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
int64(1),
false,
-1.0,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
root := constructRootFullValue(test.inRt, test.inSize)
tracker, size, ok := getBoundedRTrackerFromRoot(root)

if test.valid {
if !ok {
t.Fatalf("failed to get tracker and size from root")
}
if tracker == nil {
t.Errorf("got nil tracker, expected %#v", test.inRt)
}
} else {
if ok {
t.Errorf("invalid root returned ok")
}
if tracker != nil {
t.Errorf("got tracker %#v, want nil", tracker)
}
}
if !floatEquals(test.expSize, size, 0.001) {
t.Errorf("got size %f, want %f", size, test.inSize)
}
})
}
}
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/sdf/sdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type RTracker interface {
// the only split point is the end of the restriction, or the split failed for some recoverable
// reason), then this function returns nil as the residual.
//
// If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either
// a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is
// no data that is lost by not being rescheduled for execution later.
//
// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
TrySplit(fraction float64) (primary, residual interface{}, err error)

Expand Down