diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index e47fee00c962..8c39aeaed417 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -29,6 +29,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) // DataSource is a Root execution unit. @@ -348,6 +349,27 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) { return encodeElms } +func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) { + tElm := root.Elm.(*FullValue).Elm2.(*FullValue).Elm + tracker, ok := tElm.(sdf.RTracker) + if !ok { + log.Warnf(context.Background(), "expected type sdf.RTracker, got type %T", tElm) + return nil, -1.0, false + } + boundTracker, ok := tracker.(sdf.BoundableRTracker) + if !ok { + log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()") + // Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used. + boundTracker = sdf.NewWrappedTracker(tracker) + } + size, ok := root.Elm2.(float64) + if !ok { + log.Warnf(context.Background(), "expected size to be type float64, got type %T", root.Elm2) + return nil, -1.0, false + } + return boundTracker, size, true +} + // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a // ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not // splittable or has not returned a resuming continuation, the function returns an empty @@ -366,13 +388,30 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { ow := su.GetOutputWatermark() - // Always split at fraction 0.0, should have no primaries left. + // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries + // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions + // that are bounded and of size 0 as they represent no remaining work. ps, rs, err := su.Split(0.0) if err != nil { return SplitResult{}, -1 * time.Minute, false, err } + if len(rs) == 0 { + return SplitResult{}, -1 * time.Minute, false, nil + } if len(ps) != 0 { - return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps) + // Expected structure of the root FullValue is KV>, Size> + for _, root := range ps { + tracker, size, ok := getBoundedRTrackerFromRoot(root) + // If type assertion didn't return a BoundableRTracker, we move on. + if !ok { + log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root) + continue + } + if !tracker.IsBounded() || size > 0.00001 { + return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps) + } + } + } encodeElms := n.makeEncodeElms() diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go index bf367ae1cb43..365cf52062ff 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -914,3 +915,71 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...)) } } + +func constructRootFullValue(rt, size interface{}) *FullValue { + return &FullValue{ + Elm: &FullValue{ + Elm2: &FullValue{ + Elm: rt, + }, + }, + Elm2: size, + } +} + +func TestGetRTrackerFromRoot(t *testing.T) { + var tests = []struct { + name string + inRt interface{} + inSize interface{} + valid bool + expSize float64 + }{ + { + "valid", + offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}), + 1.0, + true, + 1.0, + }, + { + "not a bounded rtracker", + int64(42), + 1.0, + false, + -1.0, + }, + { + "non-float size", + offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}), + int64(1), + false, + -1.0, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + root := constructRootFullValue(test.inRt, test.inSize) + tracker, size, ok := getBoundedRTrackerFromRoot(root) + + if test.valid { + if !ok { + t.Fatalf("failed to get tracker and size from root") + } + if tracker == nil { + t.Errorf("got nil tracker, expected %#v", test.inRt) + } + } else { + if ok { + t.Errorf("invalid root returned ok") + } + if tracker != nil { + t.Errorf("got tracker %#v, want nil", tracker) + } + } + if !floatEquals(test.expSize, size, 0.001) { + t.Errorf("got size %f, want %f", size, test.inSize) + } + }) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index d5c7a31b3e4c..7b777771a8c3 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -68,6 +68,7 @@ const ( URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1" URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1" + URNTruncate = "beam:transform:sdf_truncate_sized_restrictions_v1" // Deprecated: Determine worker binary based on GoWorkerBinary Role instead. URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1" diff --git a/sdks/go/pkg/beam/core/sdf/sdf.go b/sdks/go/pkg/beam/core/sdf/sdf.go index 2876d5985a23..9812d300e5b1 100644 --- a/sdks/go/pkg/beam/core/sdf/sdf.go +++ b/sdks/go/pkg/beam/core/sdf/sdf.go @@ -72,6 +72,10 @@ type RTracker interface { // the only split point is the end of the restriction, or the split failed for some recoverable // reason), then this function returns nil as the residual. // + // If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either + // a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is + // no data that is lost by not being rescheduled for execution later. + // // If an error is returned, some catastrophic failure occurred and the entire bundle will fail. TrySplit(fraction float64) (primary, residual interface{}, err error) diff --git a/sdks/go/pkg/beam/core/sdf/wrappedbounded.go b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go new file mode 100644 index 000000000000..36f44817a83c --- /dev/null +++ b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdf + +// WrappedTracker wraps an implementation of an RTracker and adds an IsBounded() function +// that returns true in order to allow RTrackers to be handled as bounded BoundableRTrackers +// if necessary (like in self-checkpointing evaluation.) +type WrappedTracker struct { + RTracker +} + +// IsBounded returns true, indicating that the underlying RTracker represents a bounded +// amount of work. +func (t *WrappedTracker) IsBounded() bool { + return true +} + +// NewWrappedTracker is a constructor for an RTracker that wraps another RTracker into a BoundedRTracker. +func NewWrappedTracker(underlying RTracker) *WrappedTracker { + return &WrappedTracker{RTracker: underlying} +} diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go index f26b9f392acd..5b1079ad4ef5 100644 --- a/sdks/go/test/integration/primitives/checkpointing.go +++ b/sdks/go/test/integration/primitives/checkpointing.go @@ -16,12 +16,14 @@ package primitives import ( + "context" "reflect" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" ) @@ -53,7 +55,7 @@ func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, rest offsetrange.Rest // SplitRestriction modifies the offsetrange.Restriction's sized restriction function to produce a size-zero restriction // at the end of execution. func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction { - size := int64(1) + size := int64(10) s := rest.Start var splits []offsetrange.Restriction for e := s + size; e <= rest.End; s, e = e, e+size { @@ -68,19 +70,27 @@ func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Res func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation { position := rt.GetRestriction().(offsetrange.Restriction).Start + counter := 0 for { if rt.TryClaim(position) { // Successful claim, emit the value and move on. emit(position) position++ - return sdf.ResumeProcessingIn(1 * time.Second) + counter++ } else if rt.GetError() != nil || rt.IsDone() { // Stop processing on error or completion + if err := rt.GetError(); err != nil { + log.Errorf(context.Background(), "error in restriction tracker, got %v", err) + } return sdf.StopProcessing() } else { - // Failed to claim but no error, resume later. + // Resume later. return sdf.ResumeProcessingIn(5 * time.Second) } + + if counter >= 10 { + return sdf.ResumeProcessingIn(1 * time.Second) + } } }