From ba35d7e13ae86ad3cdac95d70cbb2c918f615aac Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 22 Feb 2022 13:23:49 -0800 Subject: [PATCH] Allow customizing retry behavior for timeout failure (#2524) --- common/constants.go | 8 ++++ common/util.go | 12 ++++++ common/util_test.go | 37 +++++++++++++++++++ .../history/timerQueueActiveTaskExecutor.go | 3 +- service/history/workflow/retry.go | 35 ++++++++++++++---- service/history/workflow/retry_test.go | 7 ++++ 6 files changed, 93 insertions(+), 9 deletions(-) diff --git a/common/constants.go b/common/constants.go index a4af323538e..48c446542ab 100644 --- a/common/constants.go +++ b/common/constants.go @@ -111,3 +111,11 @@ const ( // DefaultTransactionSizeLimit is the largest allowed transaction size to persistence DefaultTransactionSizeLimit = 4 * 1024 * 1024 ) + +const ( + // TimeoutFailureTypePrefix is the prefix for timeout failure types + // used in retry policy + // the actual failure type will be prefix + enums.TimeoutType.String() + // e.g. "TemporalTimeout:StartToClose" or "TemporalTimeout:Heartbeat" + TimeoutFailureTypePrefix = "TemporalTimeout:" +) diff --git a/common/util.go b/common/util.go index 196b71e2ace..21bc461ed41 100644 --- a/common/util.go +++ b/common/util.go @@ -30,6 +30,7 @@ import ( "fmt" "math/rand" "sort" + "strings" "sync" "time" @@ -471,6 +472,17 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error { if policy.GetMaximumAttempts() < 0 { return serviceerror.NewInvalidArgument("MaximumAttempts cannot be negative on retry policy.") } + + for _, nrt := range policy.NonRetryableErrorTypes { + if strings.HasPrefix(nrt, TimeoutFailureTypePrefix) { + timeoutTypeValue := nrt[len(TimeoutFailureTypePrefix):] + timeoutType, ok := enumspb.TimeoutType_value[timeoutTypeValue] + if !ok || enumspb.TimeoutType(timeoutType) == enumspb.TIMEOUT_TYPE_UNSPECIFIED { + return serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid timeout type value: %v.", timeoutTypeValue)) + } + } + } + return nil } diff --git a/common/util_test.go b/common/util_test.go index fe5a524b4a6..01ecfd31e12 100644 --- a/common/util_test.go +++ b/common/util_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/dynamicconfig" @@ -105,6 +106,42 @@ func TestValidateRetryPolicy(t *testing.T) { wantErr: true, wantErrString: "MaximumAttempts cannot be negative on retry policy.", }, + { + name: "timeout nonretryable error - valid type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String(), + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String(), + }, + }, + wantErr: false, + wantErrString: "", + }, + { + name: "timeout nonretryable error - unspecified type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_UNSPECIFIED.String(), + }, + }, + wantErr: true, + wantErrString: "Invalid timeout type value: Unspecified.", + }, + { + name: "timeout nonretryable error - unknown type", + input: &commonpb.RetryPolicy{ + BackoffCoefficient: 1, + NonRetryableErrorTypes: []string{ + TimeoutFailureTypePrefix + "unknown", + }, + }, + wantErr: true, + wantErrString: "Invalid timeout type value: unknown.", + }, } for _, tt := range testCases { diff --git a/service/history/timerQueueActiveTaskExecutor.go b/service/history/timerQueueActiveTaskExecutor.go index 2bf7d2d55b6..fab78a6f3fe 100644 --- a/service/history/timerQueueActiveTaskExecutor.go +++ b/service/history/timerQueueActiveTaskExecutor.go @@ -241,7 +241,8 @@ Loop: break Loop } - timeoutFailure := failure.NewTimeoutFailure("activity timeout", timerSequenceID.TimerType) + failureMsg := fmt.Sprintf("activity %v timeout", timerSequenceID.TimerType.String()) + timeoutFailure := failure.NewTimeoutFailure(failureMsg, timerSequenceID.TimerType) var retryState enumspb.RetryState if retryState, err = mutableState.RetryActivity( activityInfo, diff --git a/service/history/workflow/retry.go b/service/history/workflow/retry.go index 59c4866bc98..3355f2e160c 100644 --- a/service/history/workflow/retry.go +++ b/service/history/workflow/retry.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" workflowspb "go.temporal.io/server/api/workflow/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" @@ -118,8 +119,16 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool { } if failure.GetTimeoutFailureInfo() != nil { - return failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_START_TO_CLOSE || - failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_HEARTBEAT + timeoutType := failure.GetTimeoutFailureInfo().GetTimeoutType() + if timeoutType == enumspb.TIMEOUT_TYPE_START_TO_CLOSE || + timeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT { + return !matchNonRetryableTypes( + common.TimeoutFailureTypePrefix+timeoutType.String(), + nonRetryableTypes, + ) + } + + return false } if failure.GetServerFailureInfo() != nil { @@ -131,16 +140,26 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool { return false } - failureType := failure.GetApplicationFailureInfo().GetType() - for _, nrt := range nonRetryableTypes { - if nrt == failureType { - return false - } - } + return !matchNonRetryableTypes( + failure.GetApplicationFailureInfo().GetType(), + nonRetryableTypes, + ) } return true } +func matchNonRetryableTypes( + failureType string, + nonRetryableTypes []string, +) bool { + for _, nrt := range nonRetryableTypes { + if nrt == failureType { + return true + } + } + return false +} + // Helpers for creating new retry/cron workflows: func SetupNewWorkflowForRetryOrCron( diff --git a/service/history/workflow/retry_test.go b/service/history/workflow/retry_test.go index f5adb21bcf6..e06597ca013 100644 --- a/service/history/workflow/retry_test.go +++ b/service/history/workflow/retry_test.go @@ -33,6 +33,7 @@ import ( failurepb "go.temporal.io/api/failure/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/failure" @@ -65,6 +66,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.True(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -72,6 +74,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.False(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -79,6 +82,7 @@ func Test_IsRetryable(t *testing.T) { }}, } a.False(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String()})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -86,6 +90,9 @@ func Test_IsRetryable(t *testing.T) { }}, } a.True(isRetryable(f, nil)) + a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String()})) + a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()})) + a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + "unknown timeout type string"})) f = &failurepb.Failure{ FailureInfo: &failurepb.Failure_ServerFailureInfo{ServerFailureInfo: &failurepb.ServerFailureInfo{