From 9ecb375a19639d105619988a7947e863e821f261 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Thu, 19 May 2022 17:13:33 -0400 Subject: [PATCH] Log RTracker error on TryClaim in test --- sdks/go/test/integration/primitives/checkpointing.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go index 330ca8aeb0be..5b1079ad4ef5 100644 --- a/sdks/go/test/integration/primitives/checkpointing.go +++ b/sdks/go/test/integration/primitives/checkpointing.go @@ -16,12 +16,14 @@ package primitives import ( + "context" "reflect" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" ) @@ -77,6 +79,9 @@ func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, counter++ } else if rt.GetError() != nil || rt.IsDone() { // Stop processing on error or completion + if err := rt.GetError(); err != nil { + log.Errorf(context.Background(), "error in restriction tracker, got %v", err) + } return sdf.StopProcessing() } else { // Resume later.