Skip to content

Commit

Permalink
Expose DeleteWorkflowExecution API on operatorservice (#2761)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Apr 27, 2022
1 parent 8f98374 commit 724ce2a
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 36 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
go.opentelemetry.io/otel/metric v0.29.0
go.opentelemetry.io/otel/sdk v1.6.3
go.opentelemetry.io/otel/sdk/metric v0.29.0
go.temporal.io/api v1.7.1-0.20220422210026-a8393024cc6b
go.temporal.io/api v1.7.1-0.20220427183350-2fdcaf051044
go.temporal.io/sdk v1.14.1-0.20220422211740-4e64c51b6b07
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
Expand Down Expand Up @@ -103,13 +103,13 @@ require (
go.uber.org/dig v1.14.1 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220422154200-b37d22cd5731 // indirect
google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
lukechampine.com/uint128 v1.2.0 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ go.opentelemetry.io/otel/trace v1.6.3 h1:IqN4L+5b0mPNjdXIiZ90Ni4Bl5BRkDQywePLWem
go.opentelemetry.io/otel/trace v1.6.3/go.mod h1:GNJQusJlUgZl9/TQBPKU/Y/ty+0iVB5fjhKeJGZPGFs=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.7.1-0.20220422204533-60b4e0146b1c/go.mod h1:afCYZfuhZV+o/LAEf/XkVu4q0WPg/xbR8u0GqouGGyo=
go.temporal.io/api v1.7.1-0.20220422210026-a8393024cc6b h1:SiMPn6Mfv52DhgVqUuafyrHfWfwelzDo0Gbw2Pih308=
go.temporal.io/api v1.7.1-0.20220422210026-a8393024cc6b/go.mod h1:afCYZfuhZV+o/LAEf/XkVu4q0WPg/xbR8u0GqouGGyo=
go.temporal.io/api v1.7.1-0.20220427183350-2fdcaf051044 h1:fsxssXO5gQeJDrBMln3+idupwj9axJfnmIpbzOD3h9E=
go.temporal.io/api v1.7.1-0.20220427183350-2fdcaf051044/go.mod h1:a4GVALRWI60wEQ14xbCm/qbeZgk/zEmvgdnExf1PCxI=
go.temporal.io/sdk v1.14.1-0.20220422211740-4e64c51b6b07 h1:Rwj1peek7MuUBDS7dg/WloHgPnIuks5hgkUxaoSAkXQ=
go.temporal.io/sdk v1.14.1-0.20220422211740-4e64c51b6b07/go.mod h1:1LB9EZ73++tB3zrcQY7tZVxc+L7IShsxED+gnPKb2X0=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down Expand Up @@ -567,8 +567,9 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 h1:yssD99+7tqHWO5Gwh81phT+67hg+KttniBr6UnEXOY8=
golang.org/x/net v0.0.0-20220421235706-1d1ef9303861/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -872,8 +873,9 @@ google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6/go.mod h1:kGP+zUP2
google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E=
google.golang.org/genproto v0.0.0-20220405205423-9d709892a2bf/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220422154200-b37d22cd5731 h1:nquqdM9+ps0JZcIiI70+tqoaIFS5Ql4ZuK8UXnz3HfE=
google.golang.org/genproto v0.0.0-20220422154200-b37d22cd5731/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 h1:G1IeWbjrqEq9ChWxEuRPJu6laA67+XgTFHVSAvepr38=
google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
Expand Down
4 changes: 4 additions & 0 deletions host/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
namespacepb "go.temporal.io/api/namespace/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"
"gopkg.in/yaml.v3"

Expand All @@ -62,6 +63,7 @@ type (
testClusterConfig *TestClusterConfig
engine FrontendClient
adminClient AdminClient
operatorClient operatorservice.OperatorServiceClient
Logger log.Logger
namespace string
testRawHistoryNamespaceName string
Expand All @@ -87,13 +89,15 @@ func (s *IntegrationBase) setupSuite(defaultClusterConfigFile string) {

s.engine = NewFrontendClient(connection)
s.adminClient = NewAdminClient(connection)
s.operatorClient = operatorservice.NewOperatorServiceClient(connection)
} else {
s.Logger.Info("Running integration test against test cluster")
cluster, err := NewCluster(clusterConfig, s.Logger)
s.Require().NoError(err)
s.testCluster = cluster
s.engine = s.testCluster.GetFrontendClient()
s.adminClient = s.testCluster.GetAdminClient()
s.operatorClient = s.testCluster.GetOperatorClient()
}

s.namespace = s.randomizeStr("integration-test-namespace")
Expand Down
7 changes: 7 additions & 0 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/tchannel-go"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"
"go.uber.org/fx"
"google.golang.org/grpc"
Expand Down Expand Up @@ -101,6 +102,7 @@ type (

adminClient adminservice.AdminServiceClient
frontendClient workflowservice.WorkflowServiceClient
operatorClient operatorservice.OperatorServiceClient
historyClient historyservice.HistoryServiceClient
logger log.Logger
clusterMetadataConfig *cluster.Config
Expand Down Expand Up @@ -358,6 +360,10 @@ func (c *temporalImpl) GetAdminClient() adminservice.AdminServiceClient {
return c.adminClient
}

func (c *temporalImpl) GetOperatorClient() operatorservice.OperatorServiceClient {
return c.operatorClient
}

func (c *temporalImpl) GetFrontendClient() workflowservice.WorkflowServiceClient {
return c.frontendClient
}
Expand Down Expand Up @@ -457,6 +463,7 @@ func (c *temporalImpl) startFrontend(hosts map[string][]string, startWG *sync.Wa
connection := rpcFactory.CreateFrontendGRPCConnection(c.FrontendGRPCAddress())
c.frontendClient = NewFrontendClient(connection)
c.adminClient = NewAdminClient(connection)
c.operatorClient = operatorservice.NewOperatorServiceClient(connection)

feApp.Start(context.Background())

Expand Down
5 changes: 5 additions & 0 deletions host/test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"os"

"github.com/pborman/uuid"
"go.temporal.io/api/operatorservice/v1"

"go.temporal.io/server/api/adminservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -323,6 +324,10 @@ func (tc *TestCluster) GetAdminClient() AdminClient {
return tc.host.GetAdminClient()
}

func (tc *TestCluster) GetOperatorClient() operatorservice.OperatorServiceClient {
return tc.host.GetOperatorClient()
}

// GetHistoryClient returns a history client from the test cluster
func (tc *TestCluster) GetHistoryClient() HistoryClient {
return tc.host.GetHistoryClient()
Expand Down
173 changes: 173 additions & 0 deletions host/workflow_delete_execution_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package host

import (
"fmt"
"time"

"github.com/pborman/uuid"
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/common/primitives/timestamp"
)

func (s *integrationSuite) TestDeleteWorkflowExecution() {
id := "integration-delete-workflow-test"
wt := "integration-delete-workflow-test-type"
tl := "integration-delete-workflow-test-taskqueue"
identity := "worker1"

// Start workflow execution.
we, err := s.engine.StartWorkflowExecution(NewContext(), &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: id,
WorkflowType: &commonpb.WorkflowType{Name: wt},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second),
Identity: identity,
})
s.NoError(err)

// Complete workflow.
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}},
}}, nil
}

atHandler := func(execution *commonpb.WorkflowExecution, activityType *commonpb.ActivityType, activityID string, input *commonpb.Payloads, taskToken []byte) (*commonpb.Payloads, bool, error) {
return nil, false, nil
}

poller := &TaskPoller{
Engine: s.engine,
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Identity: identity,
WorkflowTaskHandler: wtHandler,
ActivityTaskHandler: atHandler,
Logger: s.Logger,
T: s.T(),
}

_, err = poller.PollAndProcessWorkflowTask(false, false)
s.NoError(err)

// Verify that workflow is completed and visibility is updated.
executionsCount := 0
for i := 0; i < 10; i++ {
visibilityResponse, err := s.engine.ListWorkflowExecutions(NewContext(), &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: 1,
NextPageToken: nil,
Query: fmt.Sprintf("WorkflowId='%s'", id),
})
s.NoError(err)
if len(visibilityResponse.Executions) == 0 {
time.Sleep(100 * time.Millisecond)
} else {
executionsCount = len(visibilityResponse.Executions)
break
}
}
s.Equal(1, executionsCount)

// Delete workflow execution.
_, err = s.operatorClient.DeleteWorkflowExecution(NewContext(), &operatorservice.DeleteWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
})
s.NoError(err)

executionDeleted := false
for i := 0; i < 10; i++ {
// Check execution is deleted.
describeResponse, err := s.engine.DescribeWorkflowExecution(NewContext(), &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
})
if err == nil {
s.Logger.Warn("Execution not deleted yet")
time.Sleep(100 * time.Millisecond)
continue
}
var notFoundErr *serviceerror.NotFound
s.ErrorAs(err, &notFoundErr)
s.Nil(describeResponse)

// Check history is deleted.
historyResponse, err := s.engine.GetWorkflowExecutionHistory(NewContext(), &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
})
var invalidArgumentErr *serviceerror.InvalidArgument
s.ErrorAs(err, &invalidArgumentErr)
s.Nil(historyResponse)

// Check visibility is updated.
for i := 0; i < 10; i++ {
visibilityResponse, err := s.engine.ListWorkflowExecutions(NewContext(), &workflowservice.ListWorkflowExecutionsRequest{
Namespace: s.namespace,
PageSize: 1,
NextPageToken: nil,
Query: fmt.Sprintf("WorkflowId='%s'", id),
})
s.NoError(err)
if len(visibilityResponse.Executions) != 0 {
time.Sleep(100 * time.Millisecond)
} else {
executionsCount = len(visibilityResponse.Executions)
break
}
}
s.Equal(0, executionsCount)

executionDeleted = true
break
}

s.True(executionDeleted)
}
4 changes: 4 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ func OperatorHandlerProvider(
saProvider searchattribute.Provider,
saManager searchattribute.Manager,
healthServer *health.Server,
historyClient historyservice.HistoryServiceClient,
namespaceRegistry namespace.Registry,
) *OperatorHandlerImpl {
args := NewOperatorHandlerImplArgs{
config,
Expand All @@ -438,6 +440,8 @@ func OperatorHandlerProvider(
saProvider,
saManager,
healthServer,
historyClient,
namespaceRegistry,
}
return NewOperatorHandlerImpl(args)
}
Expand Down
15 changes: 15 additions & 0 deletions service/frontend/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 724ce2a

Please sign in to comment.