From 724ce2a377a137a52f2e3d632b0ea47211af8e9d Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Wed, 27 Apr 2022 16:01:43 -0700 Subject: [PATCH] Expose DeleteWorkflowExecution API on operatorservice (#2761) --- go.mod | 6 +- go.sum | 10 +- host/integrationbase.go | 4 + host/onebox.go | 7 + host/test_cluster.go | 5 + host/workflow_delete_execution_test.go | 173 ++++++++++++++++++++++ service/frontend/fx.go | 4 + service/frontend/interface_mock.go | 15 ++ service/frontend/operator_handler.go | 94 ++++++++---- service/frontend/operator_handler_test.go | 73 +++++++++ 10 files changed, 355 insertions(+), 36 deletions(-) create mode 100644 host/workflow_delete_execution_test.go diff --git a/go.mod b/go.mod index cfb0750851a..63d1cf36ea3 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( go.opentelemetry.io/otel/metric v0.29.0 go.opentelemetry.io/otel/sdk v1.6.3 go.opentelemetry.io/otel/sdk/metric v0.29.0 - go.temporal.io/api v1.7.1-0.20220422210026-a8393024cc6b + go.temporal.io/api v1.7.1-0.20220427183350-2fdcaf051044 go.temporal.io/sdk v1.14.1-0.20220422211740-4e64c51b6b07 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.9.0 @@ -103,13 +103,13 @@ require ( go.uber.org/dig v1.14.1 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 // indirect + golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20220422154200-b37d22cd5731 // indirect + google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect diff --git a/go.sum b/go.sum index 39732d7b228..0229d9c7ccd 100644 --- a/go.sum +++ b/go.sum @@ -450,8 +450,8 @@ go.opentelemetry.io/otel/trace v1.6.3 h1:IqN4L+5b0mPNjdXIiZ90Ni4Bl5BRkDQywePLWem go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.7.1-0.20220422204533-60b4e0146b1c/go.mod h1:afCYZfuhZV+o/LAEf/XkVu4q0WPg/xbR8u0GqouGGyo= -go.temporal.io/api v1.7.1-0.20220422210026-a8393024cc6b h1:SiMPn6Mfv52DhgVqUuafyrHfWfwelzDo0Gbw2Pih308= -go.temporal.io/api v1.7.1-0.20220422210026-a8393024cc6b/go.mod h1:afCYZfuhZV+o/LAEf/XkVu4q0WPg/xbR8u0GqouGGyo= +go.temporal.io/api v1.7.1-0.20220427183350-2fdcaf051044 h1:fsxssXO5gQeJDrBMln3+idupwj9axJfnmIpbzOD3h9E= +go.temporal.io/api v1.7.1-0.20220427183350-2fdcaf051044/go.mod h1:a4GVALRWI60wEQ14xbCm/qbeZgk/zEmvgdnExf1PCxI= go.temporal.io/sdk v1.14.1-0.20220422211740-4e64c51b6b07 h1:Rwj1peek7MuUBDS7dg/WloHgPnIuks5hgkUxaoSAkXQ= go.temporal.io/sdk v1.14.1-0.20220422211740-4e64c51b6b07/go.mod h1:1LB9EZ73++tB3zrcQY7tZVxc+L7IShsxED+gnPKb2X0= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= @@ -567,8 +567,9 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 h1:yssD99+7tqHWO5Gwh81phT+67hg+KttniBr6UnEXOY8= golang.org/x/net v0.0.0-20220421235706-1d1ef9303861/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -872,8 +873,9 @@ google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6/go.mod h1:kGP+zUP2 google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E= google.golang.org/genproto v0.0.0-20220405205423-9d709892a2bf/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= -google.golang.org/genproto v0.0.0-20220422154200-b37d22cd5731 h1:nquqdM9+ps0JZcIiI70+tqoaIFS5Ql4ZuK8UXnz3HfE= google.golang.org/genproto v0.0.0-20220422154200-b37d22cd5731/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= +google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 h1:G1IeWbjrqEq9ChWxEuRPJu6laA67+XgTFHVSAvepr38= +google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= diff --git a/host/integrationbase.go b/host/integrationbase.go index 8b45a101e8d..11a0775d05c 100644 --- a/host/integrationbase.go +++ b/host/integrationbase.go @@ -38,6 +38,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" namespacepb "go.temporal.io/api/namespace/v1" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" "gopkg.in/yaml.v3" @@ -62,6 +63,7 @@ type ( testClusterConfig *TestClusterConfig engine FrontendClient adminClient AdminClient + operatorClient operatorservice.OperatorServiceClient Logger log.Logger namespace string testRawHistoryNamespaceName string @@ -87,6 +89,7 @@ func (s *IntegrationBase) setupSuite(defaultClusterConfigFile string) { s.engine = NewFrontendClient(connection) s.adminClient = NewAdminClient(connection) + s.operatorClient = operatorservice.NewOperatorServiceClient(connection) } else { s.Logger.Info("Running integration test against test cluster") cluster, err := NewCluster(clusterConfig, s.Logger) @@ -94,6 +97,7 @@ func (s *IntegrationBase) setupSuite(defaultClusterConfigFile string) { s.testCluster = cluster s.engine = s.testCluster.GetFrontendClient() s.adminClient = s.testCluster.GetAdminClient() + s.operatorClient = s.testCluster.GetOperatorClient() } s.namespace = s.randomizeStr("integration-test-namespace") diff --git a/host/onebox.go b/host/onebox.go index 1c2c5f827b4..ff18d071574 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -34,6 +34,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/tchannel-go" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" "go.uber.org/fx" "google.golang.org/grpc" @@ -101,6 +102,7 @@ type ( adminClient adminservice.AdminServiceClient frontendClient workflowservice.WorkflowServiceClient + operatorClient operatorservice.OperatorServiceClient historyClient historyservice.HistoryServiceClient logger log.Logger clusterMetadataConfig *cluster.Config @@ -358,6 +360,10 @@ func (c *temporalImpl) GetAdminClient() adminservice.AdminServiceClient { return c.adminClient } +func (c *temporalImpl) GetOperatorClient() operatorservice.OperatorServiceClient { + return c.operatorClient +} + func (c *temporalImpl) GetFrontendClient() workflowservice.WorkflowServiceClient { return c.frontendClient } @@ -457,6 +463,7 @@ func (c *temporalImpl) startFrontend(hosts map[string][]string, startWG *sync.Wa connection := rpcFactory.CreateFrontendGRPCConnection(c.FrontendGRPCAddress()) c.frontendClient = NewFrontendClient(connection) c.adminClient = NewAdminClient(connection) + c.operatorClient = operatorservice.NewOperatorServiceClient(connection) feApp.Start(context.Background()) diff --git a/host/test_cluster.go b/host/test_cluster.go index 093740476d8..6c2a1e77bcd 100644 --- a/host/test_cluster.go +++ b/host/test_cluster.go @@ -30,6 +30,7 @@ import ( "os" "github.com/pborman/uuid" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/server/api/adminservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -323,6 +324,10 @@ func (tc *TestCluster) GetAdminClient() AdminClient { return tc.host.GetAdminClient() } +func (tc *TestCluster) GetOperatorClient() operatorservice.OperatorServiceClient { + return tc.host.GetOperatorClient() +} + // GetHistoryClient returns a history client from the test cluster func (tc *TestCluster) GetHistoryClient() HistoryClient { return tc.host.GetHistoryClient() diff --git a/host/workflow_delete_execution_test.go b/host/workflow_delete_execution_test.go new file mode 100644 index 00000000000..31e5df54d0c --- /dev/null +++ b/host/workflow_delete_execution_test.go @@ -0,0 +1,173 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package host + +import ( + "fmt" + "time" + + "github.com/pborman/uuid" + commandpb "go.temporal.io/api/command/v1" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + + "go.temporal.io/server/common/primitives/timestamp" +) + +func (s *integrationSuite) TestDeleteWorkflowExecution() { + id := "integration-delete-workflow-test" + wt := "integration-delete-workflow-test-type" + tl := "integration-delete-workflow-test-taskqueue" + identity := "worker1" + + // Start workflow execution. + we, err := s.engine.StartWorkflowExecution(NewContext(), &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: s.namespace, + WorkflowId: id, + WorkflowType: &commonpb.WorkflowType{Name: wt}, + TaskQueue: &taskqueuepb.TaskQueue{Name: tl}, + Input: nil, + WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), + Identity: identity, + }) + s.NoError(err) + + // Complete workflow. + wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}}, + }}, nil + } + + atHandler := func(execution *commonpb.WorkflowExecution, activityType *commonpb.ActivityType, activityID string, input *commonpb.Payloads, taskToken []byte) (*commonpb.Payloads, bool, error) { + return nil, false, nil + } + + poller := &TaskPoller{ + Engine: s.engine, + Namespace: s.namespace, + TaskQueue: &taskqueuepb.TaskQueue{Name: tl}, + Identity: identity, + WorkflowTaskHandler: wtHandler, + ActivityTaskHandler: atHandler, + Logger: s.Logger, + T: s.T(), + } + + _, err = poller.PollAndProcessWorkflowTask(false, false) + s.NoError(err) + + // Verify that workflow is completed and visibility is updated. + executionsCount := 0 + for i := 0; i < 10; i++ { + visibilityResponse, err := s.engine.ListWorkflowExecutions(NewContext(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: s.namespace, + PageSize: 1, + NextPageToken: nil, + Query: fmt.Sprintf("WorkflowId='%s'", id), + }) + s.NoError(err) + if len(visibilityResponse.Executions) == 0 { + time.Sleep(100 * time.Millisecond) + } else { + executionsCount = len(visibilityResponse.Executions) + break + } + } + s.Equal(1, executionsCount) + + // Delete workflow execution. + _, err = s.operatorClient.DeleteWorkflowExecution(NewContext(), &operatorservice.DeleteWorkflowExecutionRequest{ + Namespace: s.namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.RunId, + }, + }) + s.NoError(err) + + executionDeleted := false + for i := 0; i < 10; i++ { + // Check execution is deleted. + describeResponse, err := s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.RunId, + }, + }) + if err == nil { + s.Logger.Warn("Execution not deleted yet") + time.Sleep(100 * time.Millisecond) + continue + } + var notFoundErr *serviceerror.NotFound + s.ErrorAs(err, ¬FoundErr) + s.Nil(describeResponse) + + // Check history is deleted. + historyResponse, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: s.namespace, + Execution: &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.RunId, + }, + }) + var invalidArgumentErr *serviceerror.InvalidArgument + s.ErrorAs(err, &invalidArgumentErr) + s.Nil(historyResponse) + + // Check visibility is updated. + for i := 0; i < 10; i++ { + visibilityResponse, err := s.engine.ListWorkflowExecutions(NewContext(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: s.namespace, + PageSize: 1, + NextPageToken: nil, + Query: fmt.Sprintf("WorkflowId='%s'", id), + }) + s.NoError(err) + if len(visibilityResponse.Executions) != 0 { + time.Sleep(100 * time.Millisecond) + } else { + executionsCount = len(visibilityResponse.Executions) + break + } + } + s.Equal(0, executionsCount) + + executionDeleted = true + break + } + + s.True(executionDeleted) +} diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 1432dbb8b16..ed4885d4852 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -427,6 +427,8 @@ func OperatorHandlerProvider( saProvider searchattribute.Provider, saManager searchattribute.Manager, healthServer *health.Server, + historyClient historyservice.HistoryServiceClient, + namespaceRegistry namespace.Registry, ) *OperatorHandlerImpl { args := NewOperatorHandlerImplArgs{ config, @@ -438,6 +440,8 @@ func OperatorHandlerProvider( saProvider, saManager, healthServer, + historyClient, + namespaceRegistry, } return NewOperatorHandlerImpl(args) } diff --git a/service/frontend/interface_mock.go b/service/frontend/interface_mock.go index 518fa14a8a6..12232ad258e 100644 --- a/service/frontend/interface_mock.go +++ b/service/frontend/interface_mock.go @@ -751,6 +751,21 @@ func (mr *MockOperatorHandlerMockRecorder) DeleteNamespace(arg0, arg1 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNamespace", reflect.TypeOf((*MockOperatorHandler)(nil).DeleteNamespace), arg0, arg1) } +// DeleteWorkflowExecution mocks base method. +func (m *MockOperatorHandler) DeleteWorkflowExecution(arg0 context.Context, arg1 *v1.DeleteWorkflowExecutionRequest) (*v1.DeleteWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(*v1.DeleteWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteWorkflowExecution indicates an expected call of DeleteWorkflowExecution. +func (mr *MockOperatorHandlerMockRecorder) DeleteWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkflowExecution", reflect.TypeOf((*MockOperatorHandler)(nil).DeleteWorkflowExecution), arg0, arg1) +} + // ListSearchAttributes mocks base method. func (m *MockOperatorHandler) ListSearchAttributes(arg0 context.Context, arg1 *v1.ListSearchAttributesRequest) (*v1.ListSearchAttributesResponse, error) { m.ctrl.T.Helper() diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index 8c83a646883..765b0188390 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" + "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -59,28 +60,32 @@ type ( OperatorHandlerImpl struct { status int32 - healthStatus int32 - logger log.Logger - config *Config - esConfig *esclient.Config - esClient esclient.Client - sdkClientFactory sdk.ClientFactory - metricsClient metrics.Client - saProvider searchattribute.Provider - saManager searchattribute.Manager - healthServer *health.Server + healthStatus int32 + logger log.Logger + config *Config + esConfig *esclient.Config + esClient esclient.Client + sdkClientFactory sdk.ClientFactory + metricsClient metrics.Client + saProvider searchattribute.Provider + saManager searchattribute.Manager + healthServer *health.Server + historyClient historyservice.HistoryServiceClient + namespaceRegistry namespace.Registry } NewOperatorHandlerImplArgs struct { - config *Config - EsConfig *esclient.Config - EsClient esclient.Client - Logger log.Logger - sdkClientFactory sdk.ClientFactory - MetricsClient metrics.Client - SaProvider searchattribute.Provider - SaManager searchattribute.Manager - healthServer *health.Server + config *Config + EsConfig *esclient.Config + EsClient esclient.Client + Logger log.Logger + sdkClientFactory sdk.ClientFactory + MetricsClient metrics.Client + SaProvider searchattribute.Provider + SaManager searchattribute.Manager + healthServer *health.Server + historyClient historyservice.HistoryServiceClient + namespaceRegistry namespace.Registry } ) @@ -90,16 +95,18 @@ func NewOperatorHandlerImpl( ) *OperatorHandlerImpl { handler := &OperatorHandlerImpl{ - logger: args.Logger, - status: common.DaemonStatusInitialized, - config: args.config, - esConfig: args.EsConfig, - esClient: args.EsClient, - sdkClientFactory: args.sdkClientFactory, - metricsClient: args.MetricsClient, - saProvider: args.SaProvider, - saManager: args.SaManager, - healthServer: args.healthServer, + logger: args.Logger, + status: common.DaemonStatusInitialized, + config: args.config, + esConfig: args.EsConfig, + esClient: args.EsClient, + sdkClientFactory: args.sdkClientFactory, + metricsClient: args.MetricsClient, + saProvider: args.SaProvider, + saManager: args.SaManager, + healthServer: args.healthServer, + historyClient: args.historyClient, + namespaceRegistry: args.namespaceRegistry, } return handler @@ -337,6 +344,35 @@ func (h *OperatorHandlerImpl) DeleteNamespace(ctx context.Context, request *oper }, nil } +// DeleteWorkflowExecution deletes a closed workflow execution asynchronously (workflow must be completed or terminated before). +// This method is EXPERIMENTAL and may be changed or removed in a later release. +func (h *OperatorHandlerImpl) DeleteWorkflowExecution(ctx context.Context, request *operatorservice.DeleteWorkflowExecutionRequest) (_ *operatorservice.DeleteWorkflowExecutionResponse, retError error) { + defer log.CapturePanic(h.logger, &retError) + + if request == nil { + return nil, errRequestNotSet + } + + if err := validateExecution(request.WorkflowExecution); err != nil { + return nil, err + } + + namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace())) + if err != nil { + return nil, err + } + + _, err = h.historyClient.DeleteWorkflowExecution(ctx, &historyservice.DeleteWorkflowExecutionRequest{ + NamespaceId: namespaceID.String(), + WorkflowExecution: request.GetWorkflowExecution(), + }) + if err != nil { + return nil, err + } + + return &operatorservice.DeleteWorkflowExecutionResponse{}, nil +} + // startRequestProfile initiates recording of request metrics func (h *OperatorHandlerImpl) startRequestProfile(scope int) (metrics.Scope, metrics.Stopwatch) { metricsScope := h.metricsClient.Scope(scope) diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index b1bf0a58178..47cb4e01d54 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -30,6 +30,7 @@ import ( "fmt" "testing" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/operatorservice/v1" "google.golang.org/grpc/health" @@ -41,8 +42,10 @@ import ( "go.temporal.io/api/serviceerror" sdkmocks "go.temporal.io/sdk/mocks" + "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/searchattribute" @@ -82,6 +85,8 @@ func (s *operatorHandlerSuite) SetupTest() { s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetSearchAttributesManager(), health.NewServer(), + s.mockResource.GetHistoryClient(), + s.mockResource.GetNamespaceRegistry(), } s.handler = NewOperatorHandlerImpl(args) s.handler.Start() @@ -455,3 +460,71 @@ func (s *operatorHandlerSuite) Test_DeleteNamespace() { mockRun.AssertExpectations(s.T()) mockSdkClient.AssertExpectations(s.T()) } + +func (s *operatorHandlerSuite) Test_DeleteWorkflowExecution() { + handler := s.handler + ctx := context.Background() + + type test struct { + Name string + Request *operatorservice.DeleteWorkflowExecutionRequest + Expected error + } + // request validation tests + testCases1 := []test{ + { + Name: "nil request", + Request: nil, + Expected: &serviceerror.InvalidArgument{Message: "Request is nil."}, + }, + { + Name: "empty request", + Request: &operatorservice.DeleteWorkflowExecutionRequest{}, + Expected: &serviceerror.InvalidArgument{Message: "Execution is not set on request."}, + }, + { + Name: "empty namespace", + Request: &operatorservice.DeleteWorkflowExecutionRequest{ + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: "test-workflow-id", + RunId: "wrong-run-id", + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Invalid RunId."}, + }, + } + for _, testCase := range testCases1 { + s.T().Run(testCase.Name, func(t *testing.T) { + resp, err := handler.DeleteWorkflowExecution(ctx, testCase.Request) + s.Equal(testCase.Expected, err) + s.Nil(resp) + }) + } + + // History call failed. + s.mockResource.HistoryClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("random error")) + s.mockResource.NamespaceCache.EXPECT().GetNamespaceID(namespace.Name("test-namespace")).Return(namespace.ID("test-namespace-id"), nil) + resp, err := handler.DeleteWorkflowExecution(ctx, &operatorservice.DeleteWorkflowExecutionRequest{ + Namespace: "test-namespace", + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: "test-workflow-id", + RunId: "d2595cb3-3b21-4026-a3e8-17bc32fb2a2b", + }, + }) + s.Error(err) + s.Equal("random error", err.Error()) + s.Nil(resp) + + // Success case. + s.mockResource.HistoryClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(&historyservice.DeleteWorkflowExecutionResponse{}, nil) + s.mockResource.NamespaceCache.EXPECT().GetNamespaceID(namespace.Name("test-namespace")).Return(namespace.ID("test-namespace-id"), nil) + resp, err = handler.DeleteWorkflowExecution(ctx, &operatorservice.DeleteWorkflowExecutionRequest{ + Namespace: "test-namespace", + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: "test-workflow-id", + // RunId is not required. + }, + }) + s.NoError(err) + s.NotNil(resp) +}