From a058f3832a31481a1c0b1fe0a245b144b134adec Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 09:23:47 -0400 Subject: [PATCH 01/16] Beef up error message in DataSource --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index e47fee00c96..67c45c83916 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -366,13 +366,15 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { ow := su.GetOutputWatermark() - // Always split at fraction 0.0, should have no primaries left. + // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries + // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions + // that are bounded and of size 0 as they represent no remaining work. ps, rs, err := su.Split(0.0) if err != nil { return SplitResult{}, -1 * time.Minute, false, err } if len(ps) != 0 { - return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps) + return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. 
Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps) } encodeElms := n.makeEncodeElms() From 81d0c0ce6a13f92be1807f9f858cc974628aaa44 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 09:27:04 -0400 Subject: [PATCH 02/16] Update TrySplit() comment --- sdks/go/pkg/beam/core/sdf/sdf.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/go/pkg/beam/core/sdf/sdf.go b/sdks/go/pkg/beam/core/sdf/sdf.go index 2876d5985a2..9812d300e5b 100644 --- a/sdks/go/pkg/beam/core/sdf/sdf.go +++ b/sdks/go/pkg/beam/core/sdf/sdf.go @@ -72,6 +72,10 @@ type RTracker interface { // the only split point is the end of the restriction, or the split failed for some recoverable // reason), then this function returns nil as the residual. // + // If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either + // a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is + // no data that is lost by not being rescheduled for execution later. + // // If an error is returned, some catastrophic failure occurred and the entire bundle will fail. 
TrySplit(fraction float64) (primary, residual interface{}, err error) From b7569000a4e49f2a72ee7164bdab92ed99dd0e5d Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 10:08:43 -0400 Subject: [PATCH 03/16] Add extra check in case primaries are returned --- .../pkg/beam/core/runtime/exec/datasource.go | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 67c45c83916..30b435fbb7a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) { return encodeElms } +func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) { + tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker) + if !ok { + return nil, -1.0, false + } + size, ok := root.Elm2.(float64) + if !ok { + return nil, -1.0, false + } + return tracker, size, true +} + // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a // ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not // splittable or has not returned a resuming continuation, the function returns an empty @@ -374,7 +386,17 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { return SplitResult{}, -1 * time.Minute, false, err } if len(ps) != 0 { - return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps) + for _, root := range ps { + tracker, size, ok := getRTrackerFromRoot(root) + // If type assertion didn't return a BoundableRTracker, we move on. 
+ if !ok { + continue + } + if !tracker.IsBounded() || size > 0.00001 { + return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps) + } + } + } encodeElms := n.makeEncodeElms() From dda46a39039315df84aaf1cf02bb4d930d744464 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 10:31:37 -0400 Subject: [PATCH 04/16] Add unit test for helper --- .../beam/core/runtime/exec/datasource_test.go | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go index bf367ae1cb4..c9933f8d0e1 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go @@ -28,6 +28,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -914,3 +915,71 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...)) } } + +func constructRootFullValue(rt, size interface{}) *FullValue { + return &FullValue{ + Elm: &FullValue{ + Elm2: &FullValue{ + Elm: rt, + }, + }, + Elm2: size, + } +} + +func TestGetRTrackerFromRoot(t *testing.T) { + var tests = []struct { + name string + inRt interface{} + inSize interface{} + valid bool + expSize float64 + }{ + { + "valid", + offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}), + 1.0, + true, + 1.0, + }, + { + "not a bounded rtracker", + int64(42), + 1.0, + false, + -1.0, + }, + { + "non-float size", + 
offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}), + int64(1), + false, + -1.0, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + root := constructRootFullValue(test.inRt, test.inSize) + tracker, size, ok := getRTrackerFromRoot(root) + + if test.valid { + if !ok { + t.Fatalf("failed to get tracker and size from root") + } + if tracker == nil { + t.Errorf("got nil tracker, expected %#v", test.inRt) + } + } else { + if ok { + t.Errorf("invalid root returned ok") + } + if tracker != nil { + t.Errorf("got tracker %#v, want nil", tracker) + } + } + if !floatEquals(test.expSize, size, 0.001) { + t.Errorf("got size %f, want %f", size, test.inSize) + } + }) + } +} From c9d9366f93792b586fb356448e92773cae8bacf2 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 11:18:29 -0400 Subject: [PATCH 05/16] Rename helper to getBoundedRTrackerFromRoot() --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 4 ++-- sdks/go/pkg/beam/core/runtime/exec/datasource_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 30b435fbb7a..8e50a2627ca 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -348,7 +348,7 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) { return encodeElms } -func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) { +func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) { tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker) if !ok { return nil, -1.0, false @@ -387,7 +387,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { } if len(ps) != 0 { for _, root := range ps { - tracker, size, ok := getRTrackerFromRoot(root) + tracker, size, ok := 
getBoundedRTrackerFromRoot(root) // If type assertion didn't return a BoundableRTracker, we move on. if !ok { continue diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go index c9933f8d0e1..365cf52062f 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go @@ -960,7 +960,7 @@ func TestGetRTrackerFromRoot(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { root := constructRootFullValue(test.inRt, test.inSize) - tracker, size, ok := getRTrackerFromRoot(root) + tracker, size, ok := getBoundedRTrackerFromRoot(root) if test.valid { if !ok { From a76e5426d13aa6d6ef6a3b3ef15d46a26f1ab9db Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 11:30:46 -0400 Subject: [PATCH 06/16] Warn on a failed type assertion --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 8e50a2627ca..fa03196c3b6 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -29,6 +29,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) // DataSource is a Root execution unit. @@ -390,6 +391,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { tracker, size, ok := getBoundedRTrackerFromRoot(root) // If type assertion didn't return a BoundableRTracker, we move on. 
if !ok { + log.Warn(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root) continue } if !tracker.IsBounded() || size > 0.00001 { From 8b733e3194c661fc609f74f4c5fa85e214ff40e0 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 11:46:21 -0400 Subject: [PATCH 07/16] Comment expected structure of root FV --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index fa03196c3b6..5c54753e02a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -387,6 +387,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { return SplitResult{}, -1 * time.Minute, false, err } if len(ps) != 0 { + // Expected structure of the root FullValue is KV>, Size, (Timestamp?, Windows?)> for _, root := range ps { tracker, size, ok := getBoundedRTrackerFromRoot(root) // If type assertion didn't return a BoundableRTracker, we move on. From 5d7c71835063a52bcdd90a58b9efc1f87e578237 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 11:50:07 -0400 Subject: [PATCH 08/16] Move to Warnf() --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 5c54753e02a..1506550771f 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -392,7 +392,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { tracker, size, ok := getBoundedRTrackerFromRoot(root) // If type assertion didn't return a BoundableRTracker, we move on. 
if !ok { - log.Warn(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root) + log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root) continue } if !tracker.IsBounded() || size > 0.00001 { From 79919688a392cd472fe229952bf0fcc0c94608aa Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 15:08:59 -0400 Subject: [PATCH 09/16] Add truncate URN, improve test, add short path for no split on checkpoint --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 3 +++ sdks/go/pkg/beam/core/runtime/graphx/translate.go | 1 + sdks/go/test/integration/primitives/checkpointing.go | 11 ++++++++--- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 1506550771f..5d0705e57fe 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -386,6 +386,9 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { if err != nil { return SplitResult{}, -1 * time.Minute, false, err } + if len(rs) == 0 { + return SplitResult{}, -1 * time.Minute, false, nil + } if len(ps) != 0 { // Expected structure of the root FullValue is KV>, Size, (Timestamp?, Windows?)> for _, root := range ps { diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index d5c7a31b3e4..1e91637d611 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -43,6 +43,7 @@ const ( URNReshuffle = "beam:transform:reshuffle:v1" URNCombinePerKey = "beam:transform:combine_per_key:v1" URNWindow = "beam:transform:window_into:v1" + URNTruncate = "beam:transform:sdf_truncate_sized_restrictions_v1" URNIterableSideInput = 
"beam:side_input:iterable:v1" URNMultimapSideInput = "beam:side_input:multimap:v1" diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go index f26b9f392ac..330ca8aeb0b 100644 --- a/sdks/go/test/integration/primitives/checkpointing.go +++ b/sdks/go/test/integration/primitives/checkpointing.go @@ -53,7 +53,7 @@ func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, rest offsetrange.Rest // SplitRestriction modifies the offsetrange.Restriction's sized restriction function to produce a size-zero restriction // at the end of execution. func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction { - size := int64(1) + size := int64(10) s := rest.Start var splits []offsetrange.Restriction for e := s + size; e <= rest.End; s, e = e, e+size { @@ -68,19 +68,24 @@ func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Res func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation { position := rt.GetRestriction().(offsetrange.Restriction).Start + counter := 0 for { if rt.TryClaim(position) { // Successful claim, emit the value and move on. emit(position) position++ - return sdf.ResumeProcessingIn(1 * time.Second) + counter++ } else if rt.GetError() != nil || rt.IsDone() { // Stop processing on error or completion return sdf.StopProcessing() } else { - // Failed to claim but no error, resume later. + // Resume later. 
return sdf.ResumeProcessingIn(5 * time.Second) } + + if counter >= 10 { + return sdf.ResumeProcessingIn(1 * time.Second) + } } } From 0f91109f52f4b7adf4620f491f435197a57cf45c Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 15:25:37 -0400 Subject: [PATCH 10/16] Add new warnings for type assertion failures --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 5d0705e57fe..73dd317d4e4 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -350,15 +350,23 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) { } func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) { - tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker) + tElm := root.Elm.(*FullValue).Elm2.(*FullValue).Elm + tracker, ok := tElm.(sdf.RTracker) if !ok { + log.Warnf(context.Background(), "expected type sdf.RTracker, got type %T", tElm) + return nil, -1.0, false + } + boundTracker, ok := tracker.(sdf.BoundableRTracker) + if !ok { + log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()") return nil, -1.0, false } size, ok := root.Elm2.(float64) if !ok { + log.Warnf(context.Background(), "expected size to be type float64, got type %T", root.Elm2) return nil, -1.0, false } - return tracker, size, true + return boundTracker, size, true } // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. 
returned a From b8af766e92e6031d3dc32dde26cbecd27313d7c2 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 15:43:47 -0400 Subject: [PATCH 11/16] Create wrapper, wrap RTracker in new BoundedRTracker impl --- .../pkg/beam/core/runtime/exec/datasource.go | 4 +- .../wrappedbounded/wrappedbounded.go | 76 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 73dd317d4e4..9e1e1bc56bd 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -29,6 +29,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/wrappedbounded" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) @@ -359,7 +360,8 @@ func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64 boundTracker, ok := tracker.(sdf.BoundableRTracker) if !ok { log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()") - return nil, -1.0, false + // Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used. + boundTracker = wrappedbounded.NewTracker(tracker) } size, ok := root.Elm2.(float64) if !ok { diff --git a/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go b/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go new file mode 100644 index 00000000000..545a7f11825 --- /dev/null +++ b/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package offsetrange defines a restriction and restriction tracker for offset +// ranges. An offset range is just a range, with a start and end, that can +// begin at an offset, and is commonly used to represent byte ranges for files +// or indices for iterable containers. + +package wrappedbounded + +import "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + +// Tracker wraps an implementation of an RTracker and adds an IsBounded() function +// that returns true in order to allow RTrackers to be handled as bounded BoundableRTrackers +// if necessary (like in self-checkpointing evaluation.) +type Tracker struct { + baseTracker sdf.RTracker +} + +// TryClaim attempts to claim a block of work from the underlying RTracker's restriction. +func (t *Tracker) TryClaim(pos interface{}) (ok bool) { + return t.baseTracker.TryClaim(pos) +} + +// GetError returns an error from the underlying RTracker if it has stopped executing. Returns nil +// if none has occurred. +func (t *Tracker) GetError() error { + return t.baseTracker.GetError() +} + +// TrySplit splits the underlying RTracker's restriction into a primary (work that is currently executing) +// and a residual (work that will be split off and resumed later.) 
+func (t *Tracker) TrySplit(fraction float64) (primary, residual interface{}, err error) { + return t.baseTracker.TrySplit(fraction) +} + +// GetProgress returns two abstract scalars representing the amount of work done and the remaining work +// left in the underlying RTracker. These are unitless values, only used to estimate work in relation to +// each other. +func (t *Tracker) GetProgress() (done float64, remaining float64) { + return t.baseTracker.GetProgress() +} + +// IsDone() returns a boolean indicating if the work represented by the underlying RTracker has +// been completed. +func (t *Tracker) IsDone() bool { + return t.baseTracker.IsDone() +} + +// GetRestriction returns the restriction maintained by the underlying RTracker. +func (t *Tracker) GetRestriction() interface{} { + return t.baseTracker.GetRestriction() +} + +// IsBounded returns true, indicating that the underlying RTracker represents a bounded +// amount of work. +func (t *Tracker) IsBounded() bool { + return true +} + +// NewTracker is a constructor for an RTracker that wraps another RTracker into a BoundedRTracker. 
+func NewTracker(underlying sdf.RTracker) *Tracker { + return &Tracker{baseTracker: underlying} +} From 93157ba11087a69dc699c112c0d355e9a7e670f0 Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Thu, 19 May 2022 16:56:37 -0400 Subject: [PATCH 12/16] Update sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go Co-authored-by: Danny McCormick --- sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go b/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go index 545a7f11825..0baff38f2c7 100644 --- a/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go +++ b/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go @@ -53,7 +53,7 @@ func (t *Tracker) GetProgress() (done float64, remaining float64) { return t.baseTracker.GetProgress() } -// IsDone() returns a boolean indicating if the work represented by the underlying RTracker has +// IsDone returns a boolean indicating if the work represented by the underlying RTracker has // been completed. 
func (t *Tracker) IsDone() bool { return t.baseTracker.IsDone() From 45c09c7f9e2f82a718a3ba18ecd30e6a15b35f34 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 17:08:30 -0400 Subject: [PATCH 13/16] Move RTracker wrapper to core/sdf --- .../pkg/beam/core/runtime/exec/datasource.go | 5 ++- sdks/go/pkg/beam/core/sdf/wrappedbounded.go | 34 +++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 sdks/go/pkg/beam/core/sdf/wrappedbounded.go diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 9e1e1bc56bd..8c39aeaed41 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -29,7 +29,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/wrappedbounded" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" ) @@ -361,7 +360,7 @@ func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64 if !ok { log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()") // Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used.
- boundTracker = wrappedbounded.NewTracker(tracker) + boundTracker = sdf.NewWrappedTracker(tracker) } size, ok := root.Elm2.(float64) if !ok { @@ -400,7 +399,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { return SplitResult{}, -1 * time.Minute, false, nil } if len(ps) != 0 { - // Expected structure of the root FullValue is KV>, Size, (Timestamp?, Windows?)> + // Expected structure of the root FullValue is KV>, Size> for _, root := range ps { tracker, size, ok := getBoundedRTrackerFromRoot(root) // If type assertion didn't return a BoundableRTracker, we move on. diff --git a/sdks/go/pkg/beam/core/sdf/wrappedbounded.go b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go new file mode 100644 index 00000000000..36f44817a83 --- /dev/null +++ b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdf + +// WrappedTracker wraps an implementation of an RTracker and adds an IsBounded() function +// that returns true in order to allow RTrackers to be handled as bounded BoundableRTrackers +// if necessary (like in self-checkpointing evaluation.) 
+type WrappedTracker struct { + RTracker +} + +// IsBounded returns true, indicating that the underlying RTracker represents a bounded +// amount of work. +func (t *WrappedTracker) IsBounded() bool { + return true +} + +// NewWrappedTracker is a constructor for an RTracker that wraps another RTracker into a BoundedRTracker. +func NewWrappedTracker(underlying RTracker) *WrappedTracker { + return &WrappedTracker{RTracker: underlying} +} From 41544d7a39a0ca1f1d2735aadf7d00ef9637f8cb Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 17:08:57 -0400 Subject: [PATCH 14/16] Delete old wrapper --- .../wrappedbounded/wrappedbounded.go | 76 ------------------- 1 file changed, 76 deletions(-) delete mode 100644 sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go diff --git a/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go b/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go deleted file mode 100644 index 0baff38f2c7..00000000000 --- a/sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package offsetrange defines a restriction and restriction tracker for offset -// ranges. 
An offset range is just a range, with a start and end, that can -// begin at an offset, and is commonly used to represent byte ranges for files -// or indices for iterable containers. - -package wrappedbounded - -import "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" - -// Tracker wraps an implementation of an RTracker and adds an IsBounded() function -// that returns true in order to allow RTrackers to be handled as bounded BoundableRTrackers -// if necessary (like in self-checkpointing evaluation.) -type Tracker struct { - baseTracker sdf.RTracker -} - -// TryClaim attempts to claim a block of work from the underlying RTracker's restriction. -func (t *Tracker) TryClaim(pos interface{}) (ok bool) { - return t.baseTracker.TryClaim(pos) -} - -// GetError returns an error from the underlying RTracker if it has stopped executing. Returns nil -// if none has occurred. -func (t *Tracker) GetError() error { - return t.baseTracker.GetError() -} - -// TrySplit splits the underlying RTracker's restriction into a primary (work that is currently executing) -// and a residual (work that will be split off and resumed later.) -func (t *Tracker) TrySplit(fraction float64) (primary, residual interface{}, err error) { - return t.baseTracker.TrySplit(fraction) -} - -// GetProgress returns two abstract scalars representing the amount of work done and the remaining work -// left in the underlying RTracker. These are unitless values, only used to estimate work in relation to -// each other. -func (t *Tracker) GetProgress() (done float64, remaining float64) { - return t.baseTracker.GetProgress() -} - -// IsDone returns a boolean indicating if the work represented by the underlying RTracker has -// been completed. -func (t *Tracker) IsDone() bool { - return t.baseTracker.IsDone() -} - -// GetRestriction returns the restriction maintained by the underlying RTracker. 
-func (t *Tracker) GetRestriction() interface{} { - return t.baseTracker.GetRestriction() -} - -// IsBounded returns true, indicating that the underlying RTracker represents a bounded -// amount of work. -func (t *Tracker) IsBounded() bool { - return true -} - -// NewTracker is a constructor for an RTracker that wraps another RTracker into a BoundedRTracker. -func NewTracker(underlying sdf.RTracker) *Tracker { - return &Tracker{baseTracker: underlying} -} From 52b9f54cd7da7e06b837eb286c39dd79a97533a8 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 17:09:49 -0400 Subject: [PATCH 15/16] Move URNTruncate --- sdks/go/pkg/beam/core/runtime/graphx/translate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 1e91637d611..7b777771a8c 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -43,7 +43,6 @@ const ( URNReshuffle = "beam:transform:reshuffle:v1" URNCombinePerKey = "beam:transform:combine_per_key:v1" URNWindow = "beam:transform:window_into:v1" - URNTruncate = "beam:transform:sdf_truncate_sized_restrictions_v1" URNIterableSideInput = "beam:side_input:iterable:v1" URNMultimapSideInput = "beam:side_input:multimap:v1" @@ -69,6 +68,7 @@ const ( URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1" URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1" + URNTruncate = "beam:transform:sdf_truncate_sized_restrictions_v1" // Deprecated: Determine worker binary based on GoWorkerBinary Role instead. 
URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1" From 9ecb375a19639d105619988a7947e863e821f261 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 17:13:33 -0400 Subject: [PATCH 16/16] Log RTracker error on TryClaim in test --- sdks/go/test/integration/primitives/checkpointing.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go index 330ca8aeb0b..5b1079ad4ef 100644 --- a/sdks/go/test/integration/primitives/checkpointing.go +++ b/sdks/go/test/integration/primitives/checkpointing.go @@ -16,12 +16,14 @@ package primitives import ( + "context" "reflect" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" ) @@ -77,6 +79,9 @@ func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, counter++ } else if rt.GetError() != nil || rt.IsDone() { // Stop processing on error or completion + if err := rt.GetError(); err != nil { + log.Errorf(context.Background(), "error in restriction tracker, got %v", err) + } return sdf.StopProcessing() } else { // Resume later.