Skip to content

Commit

Permalink
Use better retry logic in reclaimresources workflow (#2689)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Apr 20, 2022
1 parent a8d47d0 commit ddf8abe
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 87 deletions.
3 changes: 2 additions & 1 deletion common/persistence/visibility/visibility_manager_dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package visibility

import (
"context"
"strings"

"go.temporal.io/server/common/persistence/visibility/manager"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ func (v *visibilityManagerDual) Close() {
}

func (v *visibilityManagerDual) GetName() string {
return "VisibilityManagerDual"
return strings.Join([]string{v.visibilityManager.GetName(), v.secondaryVisibilityManager.GetName()}, ",")
}

func (v *visibilityManagerDual) RecordWorkflowExecutionStarted(
Expand Down
19 changes: 10 additions & 9 deletions service/worker/deletenamespace/deleteexecutions/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func Test_DeleteExecutionsWorkflow_Success(t *testing.T) {
NamespaceID: "namespace-id",
PageSize: 1000,
NextPageToken: nil,
}).Return(nil, nil)
}).Return(nil, nil).Once()

env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, DeleteExecutionsActivityParams{
Namespace: "namespace",
Expand All @@ -69,7 +69,7 @@ func Test_DeleteExecutionsWorkflow_Success(t *testing.T) {
}).Return(DeleteExecutionsActivityResult{
ErrorCount: 1,
SuccessCount: 2,
}, nil)
}, nil).Once()

env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
NamespaceID: "namespace-id",
Expand Down Expand Up @@ -150,7 +150,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_NoContinueAsNew(t *testing.T)
return nil, nil
}
return []byte{3, 22, 83}, nil
})
}).Times(100)

env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(func(_ context.Context, params DeleteExecutionsActivityParams) (DeleteExecutionsActivityResult, error) {
require.Equal(t, namespace.Name("namespace"), params.Namespace)
Expand All @@ -167,7 +167,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_NoContinueAsNew(t *testing.T)
ErrorCount: 1,
SuccessCount: 2,
}, nil
})
}).Times(100)

env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
NamespaceID: "namespace-id",
Expand All @@ -191,14 +191,15 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ContinueAsNew(t *testing.T) {

var a *Activities

env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return([]byte{3, 22, 83}, nil)
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{}, nil)
env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return([]byte{3, 22, 83}, nil).Times(78)
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{}, nil).Times(78)

env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
NamespaceID: "namespace-id",
Namespace: "namespace",
Config: DeleteExecutionsConfig{
PageSize: 3,
PageSize: 3,
PagesPerExecutionCount: 78,
},
})

Expand All @@ -215,8 +216,8 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ActivityError(t *testing.T) {

var a *Activities

env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return([]byte{3, 22, 83}, nil)
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{}, serviceerror.NewUnavailable("random error"))
env.OnActivity(a.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return([]byte{3, 22, 83}, nil).Once()
env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(DeleteExecutionsActivityResult{}, serviceerror.NewUnavailable("random error")).Once()

env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{
NamespaceID: "namespace-id",
Expand Down
2 changes: 2 additions & 0 deletions service/worker/deletenamespace/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ import (

const (
ExecutionsStillExistErrType = "ExecutionsStillExist"
NoProgressErrType = "NoProgress"
)

var (
ErrUnableToExecuteActivity = errors.New("unable to execute activity")
ErrUnableToExecuteChildWorkflow = errors.New("unable to execute child workflow")
ErrExecutionsStillExist = temporal.NewApplicationError("executions are still exist", ExecutionsStillExistErrType)
ErrNoProgress = temporal.NewNonRetryableApplicationError("no progress were made", NoProgressErrType, nil)
)
87 changes: 57 additions & 30 deletions service/worker/deletenamespace/reclaimresources/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package reclaimresources

import (
"context"
"strings"

"go.temporal.io/sdk/activity"

Expand Down Expand Up @@ -60,47 +61,73 @@ func NewActivities(
logger: logger,
}
}
func (a *Activities) IsAdvancedVisibilityActivity(_ context.Context) (bool, error) {
return strings.Contains(a.visibilityManager.GetName(), "elasticsearch"), nil
}

func (a *Activities) EnsureNoExecutionsActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error {
// TODO: remove this check after CountWorkflowExecutions is implemented in standard visibility.
count := int64(0)
if a.visibilityManager.GetName() == "elasticsearch" {
req := &manager.CountWorkflowExecutionsRequest{
NamespaceID: nsID,
Namespace: nsName,
}
resp, err := a.visibilityManager.CountWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to count workflow executions.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}

count = resp.Count
} else {
req := &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: nsID,
Namespace: nsName,
PageSize: 1,
}
resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to count workflow executions using list.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}
// If not 0, it will always be 1 due to PageSize set to 1.
count = int64(len(resp.Executions))
func (a *Activities) EnsureNoExecutionsAdvVisibilityActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error {
req := &manager.CountWorkflowExecutionsRequest{
NamespaceID: nsID,
Namespace: nsName,
}
resp, err := a.visibilityManager.CountWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to count workflow executions.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}

count := resp.Count
if count > 0 {
activityInfo := activity.GetInfo(ctx)
// Starting from 8th attempt, workflow executions deletion must show some progress.
if activity.HasHeartbeatDetails(ctx) && activityInfo.Attempt > 7 {
var previousAttemptCount int64
if err := activity.GetHeartbeatDetails(ctx, &previousAttemptCount); err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to get previous heartbeat details.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}
if count == previousAttemptCount {
// No progress were made. Something bad happened on task processor side or new executions were created during deletion.
// Return non-retryable error and workflow will try to delete executions again.
a.logger.Warn("No progress were made.", tag.WorkflowNamespace(nsName.String()), tag.Attempt(activityInfo.Attempt), tag.Counter(int(count)))
return errors.ErrNoProgress
}
}

a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()), tag.Counter(int(count)))
activity.RecordHeartbeat(ctx, count)
return errors.ErrExecutionsStillExist
}
return nil
}

func (a *Activities) EnsureNoExecutionsStdVisibilityActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error {
// Standard visibility does not support CountWorkflowExecutions but only supports ListWorkflowExecutions.
// To prevent read of many records from DB, set PageSize to 1 and use this single record as indicator of workflow executions existence.
// Unfortunately, this doesn't allow to report progress and retry is limited only by timeout.
// TODO: remove this activity after CountWorkflowExecutions is implemented in standard visibility.

req := &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: nsID,
Namespace: nsName,
PageSize: 1,
}
resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req)
if err != nil {
a.metricsClient.IncCounter(metrics.ReclaimResourcesWorkflowScope, metrics.ListExecutionsFailuresCount)
a.logger.Error("Unable to count workflow executions using list.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return err
}

if len(resp.Executions) > 0 {
a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()))
return errors.ErrExecutionsStillExist
}
return nil
}

func (a *Activities) DeleteNamespaceActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error {
deleteNamespaceRequest := &persistence.DeleteNamespaceByNameRequest{
Name: nsName.String(),
Expand Down
66 changes: 59 additions & 7 deletions service/worker/deletenamespace/reclaimresources/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/testsuite"

Expand All @@ -39,10 +40,9 @@ import (
"go.temporal.io/server/common/persistence/visibility/manager"
)

func Test_EnsureNoExecutionsActivity_NoExecutions(t *testing.T) {
func Test_EnsureNoExecutionsAdvVisibilityActivity_NoExecutions(t *testing.T) {
ctrl := gomock.NewController(t)
visibilityManager := manager.NewMockVisibilityManager(ctrl)
visibilityManager.EXPECT().GetName().Return("elasticsearch")

visibilityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), &manager.CountWorkflowExecutionsRequest{
NamespaceID: "namespace-id",
Expand All @@ -58,19 +58,18 @@ func Test_EnsureNoExecutionsActivity_NoExecutions(t *testing.T) {
logger: log.NewNoopLogger(),
}

err := a.EnsureNoExecutionsActivity(context.Background(), "namespace-id", "namespace")
err := a.EnsureNoExecutionsAdvVisibilityActivity(context.Background(), "namespace-id", "namespace")
require.NoError(t, err)

ctrl.Finish()
}

func Test_EnsureNoExecutionsActivity_ExecutionsExist(t *testing.T) {
func Test_EnsureNoExecutionsAdvVisibilityActivity_ExecutionsExist(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestActivityEnvironment()

ctrl := gomock.NewController(t)
visibilityManager := manager.NewMockVisibilityManager(ctrl)
visibilityManager.EXPECT().GetName().Return("elasticsearch")

visibilityManager.EXPECT().CountWorkflowExecutions(gomock.Any(), &manager.CountWorkflowExecutionsRequest{
NamespaceID: "namespace-id",
Expand All @@ -85,9 +84,62 @@ func Test_EnsureNoExecutionsActivity_ExecutionsExist(t *testing.T) {
metricsClient: metrics.NoopClient,
logger: log.NewNoopLogger(),
}
env.RegisterActivity(a.EnsureNoExecutionsActivity)
env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity)

_, err := env.ExecuteActivity(a.EnsureNoExecutionsAdvVisibilityActivity, namespace.ID("namespace-id"), namespace.Name("namespace"))
require.Error(t, err)
var appErr *temporal.ApplicationError
require.ErrorAs(t, err, &appErr)
require.Equal(t, "ExecutionsStillExist", appErr.Type())
ctrl.Finish()
}

func Test_EnsureNoExecutionsStdVisibilityActivity_NoExecutions(t *testing.T) {
ctrl := gomock.NewController(t)
visibilityManager := manager.NewMockVisibilityManager(ctrl)

visibilityManager.EXPECT().ListWorkflowExecutions(gomock.Any(), &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: "namespace-id",
Namespace: "namespace",
PageSize: 1,
}).Return(&manager.ListWorkflowExecutionsResponse{
Executions: []*workflowpb.WorkflowExecutionInfo{},
}, nil)

a := &Activities{
visibilityManager: visibilityManager,
metadataManager: nil,
metricsClient: metrics.NoopClient,
logger: log.NewNoopLogger(),
}

err := a.EnsureNoExecutionsStdVisibilityActivity(context.Background(), "namespace-id", "namespace")
require.NoError(t, err)

ctrl.Finish()
}

func Test_EnsureNoExecutionsStdVisibilityActivity_ExecutionsExist(t *testing.T) {
ctrl := gomock.NewController(t)
visibilityManager := manager.NewMockVisibilityManager(ctrl)

visibilityManager.EXPECT().ListWorkflowExecutions(gomock.Any(), &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: "namespace-id",
Namespace: "namespace",
PageSize: 1,
}).Return(&manager.ListWorkflowExecutionsResponse{
Executions: []*workflowpb.WorkflowExecutionInfo{{}},
}, nil)

a := &Activities{
visibilityManager: visibilityManager,
metadataManager: nil,
metricsClient: metrics.NoopClient,
logger: log.NewNoopLogger(),
}

err := a.EnsureNoExecutionsStdVisibilityActivity(context.Background(), "namespace-id", "namespace")

_, err := env.ExecuteActivity(a.EnsureNoExecutionsActivity, namespace.ID("namespace-id"), namespace.Name("namespace"))
require.Error(t, err)
var appErr *temporal.ApplicationError
require.ErrorAs(t, err, &appErr)
Expand Down
52 changes: 36 additions & 16 deletions service/worker/deletenamespace/reclaimresources/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,24 @@ var (
RetryPolicy: retryPolicy,
WorkflowRunTimeout: 60 * time.Minute,
}

ensureNoExecutionsActivityRetryPolicy = &temporal.RetryPolicy{
InitialInterval: 1 * time.Second,
MaximumInterval: 2 * time.Minute,
BackoffCoefficient: 2,
}

ensureNoExecutionsStdVisibilityOptionsActivity = workflow.ActivityOptions{
RetryPolicy: ensureNoExecutionsActivityRetryPolicy,
StartToCloseTimeout: 30 * time.Second,
ScheduleToCloseTimeout: 30 * time.Minute, // ~20 attempts
}

ensureNoExecutionsAdvVisibilityActivityOptions = workflow.ActivityOptions{
RetryPolicy: ensureNoExecutionsActivityRetryPolicy,
StartToCloseTimeout: 30 * time.Second,
ScheduleToCloseTimeout: 10 * time.Hour, // Sanity check, advanced visibility can control the progress of activity.
}
)

func validateParams(params *ReclaimResourcesParams) error {
Expand Down Expand Up @@ -127,38 +145,40 @@ func deleteWorkflowExecutions(ctx workflow.Context, params ReclaimResourcesParam
deleteAttempt := int32(1)
var result ReclaimResourcesResult

ctx1 := workflow.WithLocalActivityOptions(ctx, localActivityOptions)
var isAdvancedVisibility bool
err := workflow.ExecuteLocalActivity(ctx1, a.IsAdvancedVisibilityActivity).Get(ctx, &isAdvancedVisibility)
if err != nil {
return result, fmt.Errorf("%w: IsAdvancedVisibilityActivity: %v", errors.ErrUnableToExecuteActivity, err)
}

for {
ctx1 := workflow.WithChildOptions(ctx, deleteExecutionsWorkflowOptions)
ctx1 = workflow.WithWorkflowID(ctx1, fmt.Sprintf("%s/%s", deleteexecutions.WorkflowName, params.Namespace))
ctx2 := workflow.WithChildOptions(ctx, deleteExecutionsWorkflowOptions)
ctx2 = workflow.WithWorkflowID(ctx2, fmt.Sprintf("%s/%s", deleteexecutions.WorkflowName, params.Namespace))
var der deleteexecutions.DeleteExecutionsResult
err := workflow.ExecuteChildWorkflow(ctx1, deleteexecutions.DeleteExecutionsWorkflow, params.DeleteExecutionsParams).Get(ctx, &der)
err := workflow.ExecuteChildWorkflow(ctx2, deleteexecutions.DeleteExecutionsWorkflow, params.DeleteExecutionsParams).Get(ctx, &der)
if err != nil {
logger.Error("Unable to execute child workflow.", tag.WorkflowType(deleteexecutions.WorkflowName), tag.Error(err))
return result, fmt.Errorf("%w: %s: %v", errors.ErrUnableToExecuteChildWorkflow, deleteexecutions.WorkflowName, err)
}
result.SuccessCount += der.SuccessCount
result.ErrorCount += der.ErrorCount

ensureNoExecutionsActivityOptions := workflow.ActivityOptions{
// 445 seconds of total retry intervals.
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 1 * time.Second,
MaximumInterval: 200 * time.Second,
BackoffCoefficient: 1.8,
MaximumAttempts: 10,
},
StartToCloseTimeout: 30 * time.Second,
ScheduleToCloseTimeout: 600 * time.Second,
if isAdvancedVisibility {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsAdvVisibilityActivityOptions)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsAdvVisibilityActivity, params.NamespaceID, params.Namespace, isAdvancedVisibility).Get(ctx, nil)
} else {
ctx3 := workflow.WithActivityOptions(ctx, ensureNoExecutionsStdVisibilityOptionsActivity)
err = workflow.ExecuteActivity(ctx3, a.EnsureNoExecutionsStdVisibilityActivity, params.NamespaceID, params.Namespace, isAdvancedVisibility).Get(ctx, nil)
}
ctx2 := workflow.WithActivityOptions(ctx, ensureNoExecutionsActivityOptions)
err = workflow.ExecuteActivity(ctx2, a.EnsureNoExecutionsActivity, params.NamespaceID, params.Namespace).Get(ctx, nil)
if err == nil {
break
}

var appErr *temporal.ApplicationError
if stderrors.As(err, &appErr) {
switch appErr.Type() {
case errors.ExecutionsStillExistErrType:
case errors.ExecutionsStillExistErrType, errors.NoProgressErrType:
logger.Info("Unable to delete workflow executions. Will try again.", tag.WorkflowNamespace(params.Namespace.String()), tag.Counter(der.ErrorCount), tag.Attempt(deleteAttempt))
deleteAttempt++
continue
Expand Down
Loading

0 comments on commit ddf8abe

Please sign in to comment.