Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve deletenamespace workflow errors #2909

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion service/worker/deletenamespace/deleteexecutions/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam
}

if nextPageToken == nil {
logger.Info("Finish deleting workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
if result.ErrorCount == 0 {
logger.Info("Successfully deleted workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount))
} else {
logger.Error("Finish deleting workflow executions with some errors.", tag.WorkflowNamespace(params.Namespace.String()), tag.DeletedExecutionsCount(result.SuccessCount), tag.DeletedExecutionsErrorCount(result.ErrorCount))
}
return result, nil
}

Expand Down
24 changes: 19 additions & 5 deletions service/worker/deletenamespace/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package errors

import (
"errors"
"fmt"

"go.temporal.io/sdk/temporal"
)
Expand All @@ -37,9 +38,22 @@ const (
)

var (
ErrUnableToExecuteActivity = errors.New("unable to execute activity")
ErrUnableToExecuteChildWorkflow = errors.New("unable to execute child workflow")
ErrExecutionsStillExist = temporal.NewApplicationError("executions are still exist", ExecutionsStillExistErrType)
ErrNoProgress = temporal.NewNonRetryableApplicationError("no progress were made", NoProgressErrType, nil)
ErrNotDeletedExecutionsStillExist = temporal.NewNonRetryableApplicationError("not deleted executions are still exist", NotDeletedExecutionsStillExistErrType, nil)
ErrUnableToExecuteActivity = errors.New("unable to execute activity")
ErrUnableToExecuteChildWorkflow = errors.New("unable to execute child workflow")
)

func NewExecutionsStillExistError(count int) error {
return temporal.NewApplicationError(fmt.Sprintf("%d executions are still exist", count), ExecutionsStillExistErrType, count)
}

func NewSomeExecutionsStillExistError() error {
return temporal.NewApplicationError("some executions are still exist", ExecutionsStillExistErrType)
}

func NewNoProgressError(count int) error {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("no progress were made: %d executions are still exist", count), NoProgressErrType, nil, count)
}

func NewNotDeletedExecutionsStillExistError(count int) error {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("%d not deleted executions are still exist", count), NotDeletedExecutionsStillExistErrType, nil, count)
}
8 changes: 4 additions & 4 deletions service/worker/deletenamespace/reclaimresources/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ func (a *Activities) EnsureNoExecutionsAdvVisibilityActivity(ctx context.Context
// No progress were made. Something bad happened on task processor side or new executions were created during deletion.
// Return non-retryable error and workflow will try to delete executions again.
a.logger.Warn("No progress were made.", tag.WorkflowNamespace(nsName.String()), tag.Attempt(activityInfo.Attempt), tag.Counter(int(count)))
return errors.ErrNoProgress
return errors.NewNoProgressError(int(count))
}
}

a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()), tag.Counter(int(count)))
activity.RecordHeartbeat(ctx, count)
return errors.ErrExecutionsStillExist
return errors.NewExecutionsStillExistError(int(count))
}

if notDeletedCount > 0 {
a.logger.Warn("Some workflow executions were not deleted and still exist.", tag.WorkflowNamespace(nsName.String()), tag.Counter(notDeletedCount))
return errors.ErrNotDeletedExecutionsStillExist
return errors.NewNotDeletedExecutionsStillExistError(notDeletedCount)
}

return nil
Expand All @@ -129,7 +129,7 @@ func (a *Activities) EnsureNoExecutionsStdVisibilityActivity(ctx context.Context

if len(resp.Executions) > 0 {
a.logger.Warn("Some workflow executions still exist.", tag.WorkflowNamespace(nsName.String()))
return errors.ErrExecutionsStillExist
return errors.NewSomeExecutionsStillExistError()
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExi

env.OnActivity(a.IsAdvancedVisibilityActivity, mock.Anything).Return(true, nil).Once()
env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).
Return(errors.ErrExecutionsStillExist).
Return(errors.NewExecutionsStillExistError(1)).
Times(10) // GoSDK defaultMaximumAttemptsForUnitTest value.

env.ExecuteWorkflow(ReclaimResourcesWorkflow, ReclaimResourcesParams{
Expand Down