diff --git a/common/metrics/defs.go b/common/metrics/defs.go index d1c33b7b5bb7..273844e020fe 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -796,10 +796,22 @@ const ( NumAdminScopes ) +// -- Operation scopes for Admin service -- +const ( + // OperatorAddSearchAttributesScope is the metric scope for operator.AddSearchAttributes + OperatorAddSearchAttributesScope = iota + NumAdminScopes + // OperatorRemoveSearchAttributesScope is the metric scope for operator.RemoveSearchAttributes + OperatorRemoveSearchAttributesScope + // OperatorListSearchAttributesScope is the metric scope for operator.GetSearchAttributes + OperatorListSearchAttributesScope + + NumOperatorScopes +) + // -- Operation scopes for Frontend service -- const ( // FrontendStartWorkflowExecutionScope is the metric scope for frontend.StartWorkflowExecution - FrontendStartWorkflowExecutionScope = iota + NumAdminScopes + FrontendStartWorkflowExecutionScope = iota + NumOperatorScopes // FrontendPollWorkflowTaskQueueScope is the metric scope for frontend.PollWorkflowTaskQueue FrontendPollWorkflowTaskQueueScope // FrontendPollActivityTaskQueueScope is the metric scope for frontend.PollActivityTaskQueue @@ -1530,7 +1542,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ AdminListClustersScope: {operation: "AdminListClusters"}, AdminAddOrUpdateRemoteClusterScope: {operation: "AdminAddOrUpdateRemoteCluster"}, AdminRemoveRemoteClusterScope: {operation: "AdminRemoveRemoteCluster"}, - + OperatorAddSearchAttributesScope: {operation: "OperatorAddSearchAttributes"}, + OperatorRemoveSearchAttributesScope: {operation: "OperatorRemoveSearchAttributes"}, + OperatorListSearchAttributesScope: {operation: "OperatorListSearchAttributes"}, FrontendStartWorkflowExecutionScope: {operation: "StartWorkflowExecution"}, FrontendPollWorkflowTaskQueueScope: {operation: "PollWorkflowTaskQueue"}, FrontendPollActivityTaskQueueScope: {operation: "PollActivityTaskQueue"}, diff --git a/go.mod b/go.mod index 0f3c5bfa68f3..d784a6ae4482 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.opentelemetry.io/otel/sdk v1.4.0 go.opentelemetry.io/otel/sdk/export/metric v0.27.0 go.opentelemetry.io/otel/sdk/metric v0.27.0 - go.temporal.io/api v1.7.1-0.20220211205804-a4f685c2448b + go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a go.temporal.io/sdk v1.13.0 go.temporal.io/version v0.3.0 go.uber.org/atomic v1.9.0 @@ -113,11 +113,11 @@ require ( go.uber.org/dig v1.13.0 // indirect golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect - golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect + golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20220211171837-173942840c17 // indirect + google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect ) diff --git a/go.sum b/go.sum index 9832c8998f1f..116c279b1838 100644 --- a/go.sum +++ b/go.sum @@ -477,8 +477,8 @@ go.opentelemetry.io/otel/trace v1.4.0 h1:4OOUrPZdVFQkbzl/JSdvGCWIdw5ONXXxzHlaLlW go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.temporal.io/api v1.6.1-0.20211110205628-60c98e9cbfe2/go.mod h1:IlUgOTGfmJuOkGrCZdptNxyXKE9CQz6oOx7/aH9bFY4= -go.temporal.io/api v1.7.1-0.20220211205804-a4f685c2448b h1:VVkp66hR7QpeJ2lwgx+Wr6zXYUvhfnCybwQyRDfdebg= -go.temporal.io/api v1.7.1-0.20220211205804-a4f685c2448b/go.mod h1:HAD4ieSewx7651I9hHKNalm5GtmOyZ7MSfK7anw2pAA= +go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a h1:SgkeoCikBXMd/3fNNtymIfhpxk8o/E3zIZFBFkHzTtU= +go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a/go.mod h1:OnUq5eS+Nyx+irKb3Ws5YB7yjGFf5XmI3WcVRU9COEo= go.temporal.io/sdk v1.13.0 h1:8PW27o/uYAf1C1u8WUd6LNa6He2nYkBhdUX3c5gif5o= go.temporal.io/sdk v1.13.0/go.mod h1:TCof7U/xas2FyDnx/UUEv4c/O/S41Lnhva+6JVer+Jo= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= @@ -694,8 +694,8 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c= -golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c h1:sSIdNI2Dd6vGv47bKc/xArpfxVmEz2+3j0E6I484xC4= +golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -891,8 +891,8 @@ google.golang.org/genproto v0.0.0-20220111164026-67b88f271998/go.mod h1:5CzLGKJ6 google.golang.org/genproto v0.0.0-20220114231437-d2e6a121cae0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= google.golang.org/genproto v0.0.0-20220204002441-d6cc3cc0770e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20220211171837-173942840c17 h1:2X+CNIheCutWRyKRte8szGxrE5ggtV4U+NKAbh/oLhg= -google.golang.org/genproto v0.0.0-20220211171837-173942840c17/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= +google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf h1:SVYXkUz2yZS9FWb2Gm8ivSlbNQzL2Z/NpPKE3RG2jWk= +google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/proto/api b/proto/api index e82978c745a0..ef2e696635ec 160000 --- a/proto/api +++ b/proto/api @@ -1 +1 @@ -Subproject commit e82978c745a07fb8820348ad77b1d02e226d182e +Subproject commit ef2e696635ec368813d0e9060a0da825aec93419 diff --git a/service/frontend/fx.go b/service/frontend/fx.go index e2577b7e7969..e8f8a4f3c258 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -86,6 +86,7 @@ var Module = fx.Options( fx.Provide(HandlerProvider), fx.Provide(func(so []grpc.ServerOption) *grpc.Server { return grpc.NewServer(so...) }), fx.Provide(AdminHandlerProvider), + fx.Provide(OperatorHandlerProvider), fx.Provide(NewVersionChecker), fx.Provide(ServiceResolverProvider), fx.Provide(NewServiceProvider), @@ -97,6 +98,7 @@ func NewServiceProvider( server *grpc.Server, handler Handler, adminHandler *AdminHandler, + operatorHandler *OperatorHandlerImpl, versionChecker *VersionChecker, visibilityMgr manager.VisibilityManager, logger resource.SnTaggedLogger, @@ -109,6 +111,7 @@ func NewServiceProvider( server, handler, adminHandler, + operatorHandler, versionChecker, visibilityMgr, logger, @@ -398,6 +401,27 @@ func AdminHandlerProvider( return NewAdminHandler(args) } +func OperatorHandlerProvider( + esConfig *esclient.Config, + esClient esclient.Client, + logger resource.SnTaggedLogger, + sdkSystemClient sdkclient.Client, + metricsClient metrics.Client, + saProvider searchattribute.Provider, + saManager searchattribute.Manager, +) *OperatorHandlerImpl { + args := NewOperatorHandlerImplArgs{ + esConfig, + esClient, + logger, + sdkSystemClient, + metricsClient, + saProvider, + saManager, + } + return NewOperatorHandlerImpl(args) +} + func HandlerProvider( params *resource.BootstrapParams, serviceConfig *Config, diff --git a/service/frontend/interface.go b/service/frontend/interface.go index 80de64301785..13bbfe23d99d 100644 --- a/service/frontend/interface.go +++ b/service/frontend/interface.go @@ -27,6 +27,7 @@ package frontend import ( + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -35,12 +36,12 @@ import ( ) type ( - // Handler is interface wrapping frontend handler + // Handler is interface wrapping frontend workflow handler Handler interface { workflowservice.WorkflowServiceServer common.Daemon - // Health is the health check method for this rpc handler + // HealthServer is the health check method for the whole frontend server healthpb.HealthServer // UpdateHealthStatus sets the health status for this rpc handler. // This health status will be used within the rpc health check handler @@ -48,4 +49,10 @@ type ( GetConfig() *Config } + + // OperatorHandler is interface wrapping frontend workflow handler + OperatorHandler interface { + operatorservice.OperatorServiceServer + common.Daemon + } ) diff --git a/service/frontend/interface_mock.go b/service/frontend/interface_mock.go index 12d6737eb7e4..580a234402cd 100644 --- a/service/frontend/interface_mock.go +++ b/service/frontend/interface_mock.go @@ -33,7 +33,8 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - v1 "go.temporal.io/api/workflowservice/v1" + v1 "go.temporal.io/api/operatorservice/v1" + v10 "go.temporal.io/api/workflowservice/v1" grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1" ) @@ -76,10 +77,10 @@ func (mr *MockHandlerMockRecorder) Check(arg0, arg1 interface{}) *gomock.Call { } // CountWorkflowExecutions mocks base method. -func (m *MockHandler) CountWorkflowExecutions(arg0 context.Context, arg1 *v1.CountWorkflowExecutionsRequest) (*v1.CountWorkflowExecutionsResponse, error) { +func (m *MockHandler) CountWorkflowExecutions(arg0 context.Context, arg1 *v10.CountWorkflowExecutionsRequest) (*v10.CountWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CountWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*v1.CountWorkflowExecutionsResponse) + ret0, _ := ret[0].(*v10.CountWorkflowExecutionsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -91,10 +92,10 @@ func (mr *MockHandlerMockRecorder) CountWorkflowExecutions(arg0, arg1 interface{ } // DeprecateNamespace mocks base method. -func (m *MockHandler) DeprecateNamespace(arg0 context.Context, arg1 *v1.DeprecateNamespaceRequest) (*v1.DeprecateNamespaceResponse, error) { +func (m *MockHandler) DeprecateNamespace(arg0 context.Context, arg1 *v10.DeprecateNamespaceRequest) (*v10.DeprecateNamespaceResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeprecateNamespace", arg0, arg1) - ret0, _ := ret[0].(*v1.DeprecateNamespaceResponse) + ret0, _ := ret[0].(*v10.DeprecateNamespaceResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -106,10 +107,10 @@ func (mr *MockHandlerMockRecorder) DeprecateNamespace(arg0, arg1 interface{}) *g } // DescribeNamespace mocks base method. -func (m *MockHandler) DescribeNamespace(arg0 context.Context, arg1 *v1.DescribeNamespaceRequest) (*v1.DescribeNamespaceResponse, error) { +func (m *MockHandler) DescribeNamespace(arg0 context.Context, arg1 *v10.DescribeNamespaceRequest) (*v10.DescribeNamespaceResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeNamespace", arg0, arg1) - ret0, _ := ret[0].(*v1.DescribeNamespaceResponse) + ret0, _ := ret[0].(*v10.DescribeNamespaceResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -121,10 +122,10 @@ func (mr *MockHandlerMockRecorder) DescribeNamespace(arg0, arg1 interface{}) *go } // DescribeTaskQueue mocks base method. -func (m *MockHandler) DescribeTaskQueue(arg0 context.Context, arg1 *v1.DescribeTaskQueueRequest) (*v1.DescribeTaskQueueResponse, error) { +func (m *MockHandler) DescribeTaskQueue(arg0 context.Context, arg1 *v10.DescribeTaskQueueRequest) (*v10.DescribeTaskQueueResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeTaskQueue", arg0, arg1) - ret0, _ := ret[0].(*v1.DescribeTaskQueueResponse) + ret0, _ := ret[0].(*v10.DescribeTaskQueueResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -136,10 +137,10 @@ func (mr *MockHandlerMockRecorder) DescribeTaskQueue(arg0, arg1 interface{}) *go } // DescribeWorkflowExecution mocks base method. -func (m *MockHandler) DescribeWorkflowExecution(arg0 context.Context, arg1 *v1.DescribeWorkflowExecutionRequest) (*v1.DescribeWorkflowExecutionResponse, error) { +func (m *MockHandler) DescribeWorkflowExecution(arg0 context.Context, arg1 *v10.DescribeWorkflowExecutionRequest) (*v10.DescribeWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DescribeWorkflowExecution", arg0, arg1) - ret0, _ := ret[0].(*v1.DescribeWorkflowExecutionResponse) + ret0, _ := ret[0].(*v10.DescribeWorkflowExecutionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -151,10 +152,10 @@ func (mr *MockHandlerMockRecorder) DescribeWorkflowExecution(arg0, arg1 interfac } // GetClusterInfo mocks base method. -func (m *MockHandler) GetClusterInfo(arg0 context.Context, arg1 *v1.GetClusterInfoRequest) (*v1.GetClusterInfoResponse, error) { +func (m *MockHandler) GetClusterInfo(arg0 context.Context, arg1 *v10.GetClusterInfoRequest) (*v10.GetClusterInfoResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetClusterInfo", arg0, arg1) - ret0, _ := ret[0].(*v1.GetClusterInfoResponse) + ret0, _ := ret[0].(*v10.GetClusterInfoResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -180,10 +181,10 @@ func (mr *MockHandlerMockRecorder) GetConfig() *gomock.Call { } // GetSearchAttributes mocks base method. -func (m *MockHandler) GetSearchAttributes(arg0 context.Context, arg1 *v1.GetSearchAttributesRequest) (*v1.GetSearchAttributesResponse, error) { +func (m *MockHandler) GetSearchAttributes(arg0 context.Context, arg1 *v10.GetSearchAttributesRequest) (*v10.GetSearchAttributesResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSearchAttributes", arg0, arg1) - ret0, _ := ret[0].(*v1.GetSearchAttributesResponse) + ret0, _ := ret[0].(*v10.GetSearchAttributesResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -195,10 +196,10 @@ func (mr *MockHandlerMockRecorder) GetSearchAttributes(arg0, arg1 interface{}) * } // GetSystemInfo mocks base method. -func (m *MockHandler) GetSystemInfo(arg0 context.Context, arg1 *v1.GetSystemInfoRequest) (*v1.GetSystemInfoResponse, error) { +func (m *MockHandler) GetSystemInfo(arg0 context.Context, arg1 *v10.GetSystemInfoRequest) (*v10.GetSystemInfoResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSystemInfo", arg0, arg1) - ret0, _ := ret[0].(*v1.GetSystemInfoResponse) + ret0, _ := ret[0].(*v10.GetSystemInfoResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -210,10 +211,10 @@ func (mr *MockHandlerMockRecorder) GetSystemInfo(arg0, arg1 interface{}) *gomock } // GetWorkflowExecutionHistory mocks base method. -func (m *MockHandler) GetWorkflowExecutionHistory(arg0 context.Context, arg1 *v1.GetWorkflowExecutionHistoryRequest) (*v1.GetWorkflowExecutionHistoryResponse, error) { +func (m *MockHandler) GetWorkflowExecutionHistory(arg0 context.Context, arg1 *v10.GetWorkflowExecutionHistoryRequest) (*v10.GetWorkflowExecutionHistoryResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetWorkflowExecutionHistory", arg0, arg1) - ret0, _ := ret[0].(*v1.GetWorkflowExecutionHistoryResponse) + ret0, _ := ret[0].(*v10.GetWorkflowExecutionHistoryResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -225,10 +226,10 @@ func (mr *MockHandlerMockRecorder) GetWorkflowExecutionHistory(arg0, arg1 interf } // GetWorkflowExecutionHistoryReverse mocks base method. -func (m *MockHandler) GetWorkflowExecutionHistoryReverse(arg0 context.Context, arg1 *v1.GetWorkflowExecutionHistoryReverseRequest) (*v1.GetWorkflowExecutionHistoryReverseResponse, error) { +func (m *MockHandler) GetWorkflowExecutionHistoryReverse(arg0 context.Context, arg1 *v10.GetWorkflowExecutionHistoryReverseRequest) (*v10.GetWorkflowExecutionHistoryReverseResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetWorkflowExecutionHistoryReverse", arg0, arg1) - ret0, _ := ret[0].(*v1.GetWorkflowExecutionHistoryReverseResponse) + ret0, _ := ret[0].(*v10.GetWorkflowExecutionHistoryReverseResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -240,10 +241,10 @@ func (mr *MockHandlerMockRecorder) GetWorkflowExecutionHistoryReverse(arg0, arg1 } // ListArchivedWorkflowExecutions mocks base method. -func (m *MockHandler) ListArchivedWorkflowExecutions(arg0 context.Context, arg1 *v1.ListArchivedWorkflowExecutionsRequest) (*v1.ListArchivedWorkflowExecutionsResponse, error) { +func (m *MockHandler) ListArchivedWorkflowExecutions(arg0 context.Context, arg1 *v10.ListArchivedWorkflowExecutionsRequest) (*v10.ListArchivedWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListArchivedWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*v1.ListArchivedWorkflowExecutionsResponse) + ret0, _ := ret[0].(*v10.ListArchivedWorkflowExecutionsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -255,10 +256,10 @@ func (mr *MockHandlerMockRecorder) ListArchivedWorkflowExecutions(arg0, arg1 int } // ListClosedWorkflowExecutions mocks base method. -func (m *MockHandler) ListClosedWorkflowExecutions(arg0 context.Context, arg1 *v1.ListClosedWorkflowExecutionsRequest) (*v1.ListClosedWorkflowExecutionsResponse, error) { +func (m *MockHandler) ListClosedWorkflowExecutions(arg0 context.Context, arg1 *v10.ListClosedWorkflowExecutionsRequest) (*v10.ListClosedWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListClosedWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*v1.ListClosedWorkflowExecutionsResponse) + ret0, _ := ret[0].(*v10.ListClosedWorkflowExecutionsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -270,10 +271,10 @@ func (mr *MockHandlerMockRecorder) ListClosedWorkflowExecutions(arg0, arg1 inter } // ListNamespaces mocks base method. -func (m *MockHandler) ListNamespaces(arg0 context.Context, arg1 *v1.ListNamespacesRequest) (*v1.ListNamespacesResponse, error) { +func (m *MockHandler) ListNamespaces(arg0 context.Context, arg1 *v10.ListNamespacesRequest) (*v10.ListNamespacesResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListNamespaces", arg0, arg1) - ret0, _ := ret[0].(*v1.ListNamespacesResponse) + ret0, _ := ret[0].(*v10.ListNamespacesResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -285,10 +286,10 @@ func (mr *MockHandlerMockRecorder) ListNamespaces(arg0, arg1 interface{}) *gomoc } // ListOpenWorkflowExecutions mocks base method. -func (m *MockHandler) ListOpenWorkflowExecutions(arg0 context.Context, arg1 *v1.ListOpenWorkflowExecutionsRequest) (*v1.ListOpenWorkflowExecutionsResponse, error) { +func (m *MockHandler) ListOpenWorkflowExecutions(arg0 context.Context, arg1 *v10.ListOpenWorkflowExecutionsRequest) (*v10.ListOpenWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListOpenWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*v1.ListOpenWorkflowExecutionsResponse) + ret0, _ := ret[0].(*v10.ListOpenWorkflowExecutionsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -300,10 +301,10 @@ func (mr *MockHandlerMockRecorder) ListOpenWorkflowExecutions(arg0, arg1 interfa } // ListTaskQueuePartitions mocks base method. -func (m *MockHandler) ListTaskQueuePartitions(arg0 context.Context, arg1 *v1.ListTaskQueuePartitionsRequest) (*v1.ListTaskQueuePartitionsResponse, error) { +func (m *MockHandler) ListTaskQueuePartitions(arg0 context.Context, arg1 *v10.ListTaskQueuePartitionsRequest) (*v10.ListTaskQueuePartitionsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListTaskQueuePartitions", arg0, arg1) - ret0, _ := ret[0].(*v1.ListTaskQueuePartitionsResponse) + ret0, _ := ret[0].(*v10.ListTaskQueuePartitionsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -315,10 +316,10 @@ func (mr *MockHandlerMockRecorder) ListTaskQueuePartitions(arg0, arg1 interface{ } // ListWorkflowExecutions mocks base method. -func (m *MockHandler) ListWorkflowExecutions(arg0 context.Context, arg1 *v1.ListWorkflowExecutionsRequest) (*v1.ListWorkflowExecutionsResponse, error) { +func (m *MockHandler) ListWorkflowExecutions(arg0 context.Context, arg1 *v10.ListWorkflowExecutionsRequest) (*v10.ListWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*v1.ListWorkflowExecutionsResponse) + ret0, _ := ret[0].(*v10.ListWorkflowExecutionsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -330,10 +331,10 @@ func (mr *MockHandlerMockRecorder) ListWorkflowExecutions(arg0, arg1 interface{} } // PollActivityTaskQueue mocks base method. -func (m *MockHandler) PollActivityTaskQueue(arg0 context.Context, arg1 *v1.PollActivityTaskQueueRequest) (*v1.PollActivityTaskQueueResponse, error) { +func (m *MockHandler) PollActivityTaskQueue(arg0 context.Context, arg1 *v10.PollActivityTaskQueueRequest) (*v10.PollActivityTaskQueueResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PollActivityTaskQueue", arg0, arg1) - ret0, _ := ret[0].(*v1.PollActivityTaskQueueResponse) + ret0, _ := ret[0].(*v10.PollActivityTaskQueueResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -345,10 +346,10 @@ func (mr *MockHandlerMockRecorder) PollActivityTaskQueue(arg0, arg1 interface{}) } // PollWorkflowTaskQueue mocks base method. -func (m *MockHandler) PollWorkflowTaskQueue(arg0 context.Context, arg1 *v1.PollWorkflowTaskQueueRequest) (*v1.PollWorkflowTaskQueueResponse, error) { +func (m *MockHandler) PollWorkflowTaskQueue(arg0 context.Context, arg1 *v10.PollWorkflowTaskQueueRequest) (*v10.PollWorkflowTaskQueueResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PollWorkflowTaskQueue", arg0, arg1) - ret0, _ := ret[0].(*v1.PollWorkflowTaskQueueResponse) + ret0, _ := ret[0].(*v10.PollWorkflowTaskQueueResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -360,10 +361,10 @@ func (mr *MockHandlerMockRecorder) PollWorkflowTaskQueue(arg0, arg1 interface{}) } // QueryWorkflow mocks base method. -func (m *MockHandler) QueryWorkflow(arg0 context.Context, arg1 *v1.QueryWorkflowRequest) (*v1.QueryWorkflowResponse, error) { +func (m *MockHandler) QueryWorkflow(arg0 context.Context, arg1 *v10.QueryWorkflowRequest) (*v10.QueryWorkflowResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "QueryWorkflow", arg0, arg1) - ret0, _ := ret[0].(*v1.QueryWorkflowResponse) + ret0, _ := ret[0].(*v10.QueryWorkflowResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -375,10 +376,10 @@ func (mr *MockHandlerMockRecorder) QueryWorkflow(arg0, arg1 interface{}) *gomock } // RecordActivityTaskHeartbeat mocks base method. -func (m *MockHandler) RecordActivityTaskHeartbeat(arg0 context.Context, arg1 *v1.RecordActivityTaskHeartbeatRequest) (*v1.RecordActivityTaskHeartbeatResponse, error) { +func (m *MockHandler) RecordActivityTaskHeartbeat(arg0 context.Context, arg1 *v10.RecordActivityTaskHeartbeatRequest) (*v10.RecordActivityTaskHeartbeatResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RecordActivityTaskHeartbeat", arg0, arg1) - ret0, _ := ret[0].(*v1.RecordActivityTaskHeartbeatResponse) + ret0, _ := ret[0].(*v10.RecordActivityTaskHeartbeatResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -390,10 +391,10 @@ func (mr *MockHandlerMockRecorder) RecordActivityTaskHeartbeat(arg0, arg1 interf } // RecordActivityTaskHeartbeatById mocks base method. -func (m *MockHandler) RecordActivityTaskHeartbeatById(arg0 context.Context, arg1 *v1.RecordActivityTaskHeartbeatByIdRequest) (*v1.RecordActivityTaskHeartbeatByIdResponse, error) { +func (m *MockHandler) RecordActivityTaskHeartbeatById(arg0 context.Context, arg1 *v10.RecordActivityTaskHeartbeatByIdRequest) (*v10.RecordActivityTaskHeartbeatByIdResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RecordActivityTaskHeartbeatById", arg0, arg1) - ret0, _ := ret[0].(*v1.RecordActivityTaskHeartbeatByIdResponse) + ret0, _ := ret[0].(*v10.RecordActivityTaskHeartbeatByIdResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -405,10 +406,10 @@ func (mr *MockHandlerMockRecorder) RecordActivityTaskHeartbeatById(arg0, arg1 in } // RegisterNamespace mocks base method. -func (m *MockHandler) RegisterNamespace(arg0 context.Context, arg1 *v1.RegisterNamespaceRequest) (*v1.RegisterNamespaceResponse, error) { +func (m *MockHandler) RegisterNamespace(arg0 context.Context, arg1 *v10.RegisterNamespaceRequest) (*v10.RegisterNamespaceResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RegisterNamespace", arg0, arg1) - ret0, _ := ret[0].(*v1.RegisterNamespaceResponse) + ret0, _ := ret[0].(*v10.RegisterNamespaceResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -420,10 +421,10 @@ func (mr *MockHandlerMockRecorder) RegisterNamespace(arg0, arg1 interface{}) *go } // RequestCancelWorkflowExecution mocks base method. -func (m *MockHandler) RequestCancelWorkflowExecution(arg0 context.Context, arg1 *v1.RequestCancelWorkflowExecutionRequest) (*v1.RequestCancelWorkflowExecutionResponse, error) { +func (m *MockHandler) RequestCancelWorkflowExecution(arg0 context.Context, arg1 *v10.RequestCancelWorkflowExecutionRequest) (*v10.RequestCancelWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RequestCancelWorkflowExecution", arg0, arg1) - ret0, _ := ret[0].(*v1.RequestCancelWorkflowExecutionResponse) + ret0, _ := ret[0].(*v10.RequestCancelWorkflowExecutionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -435,10 +436,10 @@ func (mr *MockHandlerMockRecorder) RequestCancelWorkflowExecution(arg0, arg1 int } // ResetStickyTaskQueue mocks base method. -func (m *MockHandler) ResetStickyTaskQueue(arg0 context.Context, arg1 *v1.ResetStickyTaskQueueRequest) (*v1.ResetStickyTaskQueueResponse, error) { +func (m *MockHandler) ResetStickyTaskQueue(arg0 context.Context, arg1 *v10.ResetStickyTaskQueueRequest) (*v10.ResetStickyTaskQueueResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ResetStickyTaskQueue", arg0, arg1) - ret0, _ := ret[0].(*v1.ResetStickyTaskQueueResponse) + ret0, _ := ret[0].(*v10.ResetStickyTaskQueueResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -450,10 +451,10 @@ func (mr *MockHandlerMockRecorder) ResetStickyTaskQueue(arg0, arg1 interface{}) } // ResetWorkflowExecution mocks base method. -func (m *MockHandler) ResetWorkflowExecution(arg0 context.Context, arg1 *v1.ResetWorkflowExecutionRequest) (*v1.ResetWorkflowExecutionResponse, error) { +func (m *MockHandler) ResetWorkflowExecution(arg0 context.Context, arg1 *v10.ResetWorkflowExecutionRequest) (*v10.ResetWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ResetWorkflowExecution", arg0, arg1) - ret0, _ := ret[0].(*v1.ResetWorkflowExecutionResponse) + ret0, _ := ret[0].(*v10.ResetWorkflowExecutionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -465,10 +466,10 @@ func (mr *MockHandlerMockRecorder) ResetWorkflowExecution(arg0, arg1 interface{} } // RespondActivityTaskCanceled mocks base method. -func (m *MockHandler) RespondActivityTaskCanceled(arg0 context.Context, arg1 *v1.RespondActivityTaskCanceledRequest) (*v1.RespondActivityTaskCanceledResponse, error) { +func (m *MockHandler) RespondActivityTaskCanceled(arg0 context.Context, arg1 *v10.RespondActivityTaskCanceledRequest) (*v10.RespondActivityTaskCanceledResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondActivityTaskCanceled", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondActivityTaskCanceledResponse) + ret0, _ := ret[0].(*v10.RespondActivityTaskCanceledResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -480,10 +481,10 @@ func (mr *MockHandlerMockRecorder) RespondActivityTaskCanceled(arg0, arg1 interf } // RespondActivityTaskCanceledById mocks base method. -func (m *MockHandler) RespondActivityTaskCanceledById(arg0 context.Context, arg1 *v1.RespondActivityTaskCanceledByIdRequest) (*v1.RespondActivityTaskCanceledByIdResponse, error) { +func (m *MockHandler) RespondActivityTaskCanceledById(arg0 context.Context, arg1 *v10.RespondActivityTaskCanceledByIdRequest) (*v10.RespondActivityTaskCanceledByIdResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondActivityTaskCanceledById", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondActivityTaskCanceledByIdResponse) + ret0, _ := ret[0].(*v10.RespondActivityTaskCanceledByIdResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -495,10 +496,10 @@ func (mr *MockHandlerMockRecorder) RespondActivityTaskCanceledById(arg0, arg1 in } // RespondActivityTaskCompleted mocks base method. -func (m *MockHandler) RespondActivityTaskCompleted(arg0 context.Context, arg1 *v1.RespondActivityTaskCompletedRequest) (*v1.RespondActivityTaskCompletedResponse, error) { +func (m *MockHandler) RespondActivityTaskCompleted(arg0 context.Context, arg1 *v10.RespondActivityTaskCompletedRequest) (*v10.RespondActivityTaskCompletedResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondActivityTaskCompleted", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondActivityTaskCompletedResponse) + ret0, _ := ret[0].(*v10.RespondActivityTaskCompletedResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -510,10 +511,10 @@ func (mr *MockHandlerMockRecorder) RespondActivityTaskCompleted(arg0, arg1 inter } // RespondActivityTaskCompletedById mocks base method. -func (m *MockHandler) RespondActivityTaskCompletedById(arg0 context.Context, arg1 *v1.RespondActivityTaskCompletedByIdRequest) (*v1.RespondActivityTaskCompletedByIdResponse, error) { +func (m *MockHandler) RespondActivityTaskCompletedById(arg0 context.Context, arg1 *v10.RespondActivityTaskCompletedByIdRequest) (*v10.RespondActivityTaskCompletedByIdResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondActivityTaskCompletedById", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondActivityTaskCompletedByIdResponse) + ret0, _ := ret[0].(*v10.RespondActivityTaskCompletedByIdResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -525,10 +526,10 @@ func (mr *MockHandlerMockRecorder) RespondActivityTaskCompletedById(arg0, arg1 i } // RespondActivityTaskFailed mocks base method. -func (m *MockHandler) RespondActivityTaskFailed(arg0 context.Context, arg1 *v1.RespondActivityTaskFailedRequest) (*v1.RespondActivityTaskFailedResponse, error) { +func (m *MockHandler) RespondActivityTaskFailed(arg0 context.Context, arg1 *v10.RespondActivityTaskFailedRequest) (*v10.RespondActivityTaskFailedResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondActivityTaskFailed", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondActivityTaskFailedResponse) + ret0, _ := ret[0].(*v10.RespondActivityTaskFailedResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -540,10 +541,10 @@ func (mr *MockHandlerMockRecorder) RespondActivityTaskFailed(arg0, arg1 interfac } // RespondActivityTaskFailedById mocks base method. -func (m *MockHandler) RespondActivityTaskFailedById(arg0 context.Context, arg1 *v1.RespondActivityTaskFailedByIdRequest) (*v1.RespondActivityTaskFailedByIdResponse, error) { +func (m *MockHandler) RespondActivityTaskFailedById(arg0 context.Context, arg1 *v10.RespondActivityTaskFailedByIdRequest) (*v10.RespondActivityTaskFailedByIdResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondActivityTaskFailedById", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondActivityTaskFailedByIdResponse) + ret0, _ := ret[0].(*v10.RespondActivityTaskFailedByIdResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -555,10 +556,10 @@ func (mr *MockHandlerMockRecorder) RespondActivityTaskFailedById(arg0, arg1 inte } // RespondQueryTaskCompleted mocks base method. -func (m *MockHandler) RespondQueryTaskCompleted(arg0 context.Context, arg1 *v1.RespondQueryTaskCompletedRequest) (*v1.RespondQueryTaskCompletedResponse, error) { +func (m *MockHandler) RespondQueryTaskCompleted(arg0 context.Context, arg1 *v10.RespondQueryTaskCompletedRequest) (*v10.RespondQueryTaskCompletedResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondQueryTaskCompleted", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondQueryTaskCompletedResponse) + ret0, _ := ret[0].(*v10.RespondQueryTaskCompletedResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -570,10 +571,10 @@ func (mr *MockHandlerMockRecorder) RespondQueryTaskCompleted(arg0, arg1 interfac } // RespondWorkflowTaskCompleted mocks base method. -func (m *MockHandler) RespondWorkflowTaskCompleted(arg0 context.Context, arg1 *v1.RespondWorkflowTaskCompletedRequest) (*v1.RespondWorkflowTaskCompletedResponse, error) { +func (m *MockHandler) RespondWorkflowTaskCompleted(arg0 context.Context, arg1 *v10.RespondWorkflowTaskCompletedRequest) (*v10.RespondWorkflowTaskCompletedResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondWorkflowTaskCompleted", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondWorkflowTaskCompletedResponse) + ret0, _ := ret[0].(*v10.RespondWorkflowTaskCompletedResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -585,10 +586,10 @@ func (mr *MockHandlerMockRecorder) RespondWorkflowTaskCompleted(arg0, arg1 inter } // RespondWorkflowTaskFailed mocks base method. -func (m *MockHandler) RespondWorkflowTaskFailed(arg0 context.Context, arg1 *v1.RespondWorkflowTaskFailedRequest) (*v1.RespondWorkflowTaskFailedResponse, error) { +func (m *MockHandler) RespondWorkflowTaskFailed(arg0 context.Context, arg1 *v10.RespondWorkflowTaskFailedRequest) (*v10.RespondWorkflowTaskFailedResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RespondWorkflowTaskFailed", arg0, arg1) - ret0, _ := ret[0].(*v1.RespondWorkflowTaskFailedResponse) + ret0, _ := ret[0].(*v10.RespondWorkflowTaskFailedResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -600,10 +601,10 @@ func (mr *MockHandlerMockRecorder) RespondWorkflowTaskFailed(arg0, arg1 interfac } // ScanWorkflowExecutions mocks base method. -func (m *MockHandler) ScanWorkflowExecutions(arg0 context.Context, arg1 *v1.ScanWorkflowExecutionsRequest) (*v1.ScanWorkflowExecutionsResponse, error) { +func (m *MockHandler) ScanWorkflowExecutions(arg0 context.Context, arg1 *v10.ScanWorkflowExecutionsRequest) (*v10.ScanWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ScanWorkflowExecutions", arg0, arg1) - ret0, _ := ret[0].(*v1.ScanWorkflowExecutionsResponse) + ret0, _ := ret[0].(*v10.ScanWorkflowExecutionsResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -615,10 +616,10 @@ func (mr *MockHandlerMockRecorder) ScanWorkflowExecutions(arg0, arg1 interface{} } // SignalWithStartWorkflowExecution mocks base method. -func (m *MockHandler) SignalWithStartWorkflowExecution(arg0 context.Context, arg1 *v1.SignalWithStartWorkflowExecutionRequest) (*v1.SignalWithStartWorkflowExecutionResponse, error) { +func (m *MockHandler) SignalWithStartWorkflowExecution(arg0 context.Context, arg1 *v10.SignalWithStartWorkflowExecutionRequest) (*v10.SignalWithStartWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SignalWithStartWorkflowExecution", arg0, arg1) - ret0, _ := ret[0].(*v1.SignalWithStartWorkflowExecutionResponse) + ret0, _ := ret[0].(*v10.SignalWithStartWorkflowExecutionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -630,10 +631,10 @@ func (mr *MockHandlerMockRecorder) SignalWithStartWorkflowExecution(arg0, arg1 i } // SignalWorkflowExecution mocks base method. -func (m *MockHandler) SignalWorkflowExecution(arg0 context.Context, arg1 *v1.SignalWorkflowExecutionRequest) (*v1.SignalWorkflowExecutionResponse, error) { +func (m *MockHandler) SignalWorkflowExecution(arg0 context.Context, arg1 *v10.SignalWorkflowExecutionRequest) (*v10.SignalWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SignalWorkflowExecution", arg0, arg1) - ret0, _ := ret[0].(*v1.SignalWorkflowExecutionResponse) + ret0, _ := ret[0].(*v10.SignalWorkflowExecutionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -657,10 +658,10 @@ func (mr *MockHandlerMockRecorder) Start() *gomock.Call { } // StartWorkflowExecution mocks base method. -func (m *MockHandler) StartWorkflowExecution(arg0 context.Context, arg1 *v1.StartWorkflowExecutionRequest) (*v1.StartWorkflowExecutionResponse, error) { +func (m *MockHandler) StartWorkflowExecution(arg0 context.Context, arg1 *v10.StartWorkflowExecutionRequest) (*v10.StartWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StartWorkflowExecution", arg0, arg1) - ret0, _ := ret[0].(*v1.StartWorkflowExecutionResponse) + ret0, _ := ret[0].(*v10.StartWorkflowExecutionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -684,10 +685,10 @@ func (mr *MockHandlerMockRecorder) Stop() *gomock.Call { } // TerminateWorkflowExecution mocks base method. -func (m *MockHandler) TerminateWorkflowExecution(arg0 context.Context, arg1 *v1.TerminateWorkflowExecutionRequest) (*v1.TerminateWorkflowExecutionResponse, error) { +func (m *MockHandler) TerminateWorkflowExecution(arg0 context.Context, arg1 *v10.TerminateWorkflowExecutionRequest) (*v10.TerminateWorkflowExecutionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "TerminateWorkflowExecution", arg0, arg1) - ret0, _ := ret[0].(*v1.TerminateWorkflowExecutionResponse) + ret0, _ := ret[0].(*v10.TerminateWorkflowExecutionResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -711,10 +712,10 @@ func (mr *MockHandlerMockRecorder) UpdateHealthStatus(status interface{}) *gomoc } // UpdateNamespace mocks base method. -func (m *MockHandler) UpdateNamespace(arg0 context.Context, arg1 *v1.UpdateNamespaceRequest) (*v1.UpdateNamespaceResponse, error) { +func (m *MockHandler) UpdateNamespace(arg0 context.Context, arg1 *v10.UpdateNamespaceRequest) (*v10.UpdateNamespaceResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateNamespace", arg0, arg1) - ret0, _ := ret[0].(*v1.UpdateNamespaceResponse) + ret0, _ := ret[0].(*v10.UpdateNamespaceResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -738,3 +739,95 @@ func (mr *MockHandlerMockRecorder) Watch(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockHandler)(nil).Watch), arg0, arg1) } + +// MockOperatorHandler is a mock of OperatorHandler interface. +type MockOperatorHandler struct { + ctrl *gomock.Controller + recorder *MockOperatorHandlerMockRecorder +} + +// MockOperatorHandlerMockRecorder is the mock recorder for MockOperatorHandler. +type MockOperatorHandlerMockRecorder struct { + mock *MockOperatorHandler +} + +// NewMockOperatorHandler creates a new mock instance. +func NewMockOperatorHandler(ctrl *gomock.Controller) *MockOperatorHandler { + mock := &MockOperatorHandler{ctrl: ctrl} + mock.recorder = &MockOperatorHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOperatorHandler) EXPECT() *MockOperatorHandlerMockRecorder { + return m.recorder +} + +// AddSearchAttributes mocks base method. +func (m *MockOperatorHandler) AddSearchAttributes(arg0 context.Context, arg1 *v1.AddSearchAttributesRequest) (*v1.AddSearchAttributesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddSearchAttributes", arg0, arg1) + ret0, _ := ret[0].(*v1.AddSearchAttributesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AddSearchAttributes indicates an expected call of AddSearchAttributes. +func (mr *MockOperatorHandlerMockRecorder) AddSearchAttributes(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSearchAttributes", reflect.TypeOf((*MockOperatorHandler)(nil).AddSearchAttributes), arg0, arg1) +} + +// ListSearchAttributes mocks base method. +func (m *MockOperatorHandler) ListSearchAttributes(arg0 context.Context, arg1 *v1.ListSearchAttributesRequest) (*v1.ListSearchAttributesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListSearchAttributes", arg0, arg1) + ret0, _ := ret[0].(*v1.ListSearchAttributesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListSearchAttributes indicates an expected call of ListSearchAttributes. +func (mr *MockOperatorHandlerMockRecorder) ListSearchAttributes(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSearchAttributes", reflect.TypeOf((*MockOperatorHandler)(nil).ListSearchAttributes), arg0, arg1) +} + +// RemoveSearchAttributes mocks base method. +func (m *MockOperatorHandler) RemoveSearchAttributes(arg0 context.Context, arg1 *v1.RemoveSearchAttributesRequest) (*v1.RemoveSearchAttributesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveSearchAttributes", arg0, arg1) + ret0, _ := ret[0].(*v1.RemoveSearchAttributesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RemoveSearchAttributes indicates an expected call of RemoveSearchAttributes. +func (mr *MockOperatorHandlerMockRecorder) RemoveSearchAttributes(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveSearchAttributes", reflect.TypeOf((*MockOperatorHandler)(nil).RemoveSearchAttributes), arg0, arg1) +} + +// Start mocks base method. +func (m *MockOperatorHandler) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockOperatorHandlerMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockOperatorHandler)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockOperatorHandler) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockOperatorHandlerMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockOperatorHandler)(nil).Stop)) +} diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go new file mode 100644 index 000000000000..995ae500b912 --- /dev/null +++ b/service/frontend/operator_handler.go @@ -0,0 +1,300 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + "fmt" + "sync/atomic" + + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/api/serviceerror" + sdkclient "go.temporal.io/sdk/client" + + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" + esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" + "go.temporal.io/server/service/worker" + "go.temporal.io/server/service/worker/addsearchattributes" + + "go.temporal.io/server/common" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/searchattribute" +) + +var _ OperatorHandler = (*OperatorHandlerImpl)(nil) + +type ( + // OperatorHandlerImpl - gRPC handler interface for operatorservice + OperatorHandlerImpl struct { + status int32 + + healthStatus int32 + logger log.Logger + ESConfig *esclient.Config + ESClient esclient.Client + sdkClient sdkclient.Client + metricsClient metrics.Client + saProvider searchattribute.Provider + saManager searchattribute.Manager + } + + NewOperatorHandlerImplArgs struct { + EsConfig *esclient.Config + EsClient esclient.Client + Logger log.Logger + SdkSystemClient sdkclient.Client + MetricsClient metrics.Client + SaProvider searchattribute.Provider + SaManager searchattribute.Manager + } +) + +// NewOperatorHandlerImpl creates a gRPC handler for operatorservice +func NewOperatorHandlerImpl( + args NewOperatorHandlerImplArgs, +) *OperatorHandlerImpl { + + handler := &OperatorHandlerImpl{ + logger: args.Logger, + status: common.DaemonStatusInitialized, + ESConfig: args.EsConfig, + ESClient: args.EsClient, + sdkClient: args.SdkSystemClient, + metricsClient: args.MetricsClient, + saProvider: args.SaProvider, + saManager: args.SaManager, + } + + return handler +} + +// Start starts the handler +func (h *OperatorHandlerImpl) Start() { + if !atomic.CompareAndSwapInt32( + &h.status, + common.DaemonStatusInitialized, + common.DaemonStatusStarted, + ) { + return + } +} + +// Stop stops the handler +func (h *OperatorHandlerImpl) Stop() { + if !atomic.CompareAndSwapInt32( + &h.status, + common.DaemonStatusStarted, + common.DaemonStatusStopped, + ) { + return + } +} + +func (h *OperatorHandlerImpl) AddSearchAttributes(ctx context.Context, request *operatorservice.AddSearchAttributesRequest) (_ *operatorservice.AddSearchAttributesResponse, retError error) { + const endpointName = "AddSearchAttributes" + + defer log.CapturePanic(h.logger, &retError) + + scope, sw := h.startRequestProfile(metrics.OperatorAddSearchAttributesScope) + defer sw.Stop() + + // validate request + if request == nil { + return nil, h.error(errRequestNotSet, scope, endpointName) + } + + if len(request.GetSearchAttributes()) == 0 { + return nil, h.error(errSearchAttributesNotSet, scope, endpointName) + } + + indexName := h.ESConfig.GetVisibilityIndex() + + currentSearchAttributes, err := h.saProvider.GetSearchAttributes(indexName, true) + if err != nil { + return nil, h.error(serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)), scope, endpointName) + } + + for saName, saType := range request.GetSearchAttributes() { + if searchattribute.IsReserved(saName) { + return nil, h.error(serviceerror.NewInvalidArgument(fmt.Sprintf(errSearchAttributeIsReservedMessage, saName)), scope, endpointName) + } + if currentSearchAttributes.IsDefined(saName) { + return nil, h.error(serviceerror.NewInvalidArgument(fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName)), scope, endpointName) + } + if _, ok := enumspb.IndexedValueType_name[int32(saType)]; !ok { + return nil, h.error(serviceerror.NewInvalidArgument(fmt.Sprintf(errUnknownSearchAttributeTypeMessage, saType)), scope, endpointName) + } + } + + // Execute workflow. + wfParams := addsearchattributes.WorkflowParams{ + CustomAttributesToAdd: request.GetSearchAttributes(), + IndexName: indexName, + SkipSchemaUpdate: false, + } + + run, err := h.sdkClient.ExecuteWorkflow( + ctx, + sdkclient.StartWorkflowOptions{ + TaskQueue: worker.DefaultWorkerTaskQueue, + ID: addsearchattributes.WorkflowName, + }, + addsearchattributes.WorkflowName, + wfParams, + ) + if err != nil { + return nil, h.error(serviceerror.NewUnavailable(fmt.Sprintf(errUnableToStartWorkflowMessage, addsearchattributes.WorkflowName, err)), scope, endpointName) + } + + // Wait for workflow to complete. + err = run.Get(ctx, nil) + if err != nil { + scope.IncCounter(metrics.AddSearchAttributesWorkflowFailuresCount) + execution := &commonpb.WorkflowExecution{WorkflowId: addsearchattributes.WorkflowName, RunId: run.GetRunID()} + return nil, h.error(serviceerror.NewSystemWorkflow(execution, err), scope, endpointName) + } + scope.IncCounter(metrics.AddSearchAttributesWorkflowSuccessCount) + + return &operatorservice.AddSearchAttributesResponse{}, nil +} + +func (h *OperatorHandlerImpl) RemoveSearchAttributes(ctx context.Context, request *operatorservice.RemoveSearchAttributesRequest) (_ *operatorservice.RemoveSearchAttributesResponse, retError error) { + const endpointName = "RemoveSearchAttributes" + + defer log.CapturePanic(h.logger, &retError) + + scope, sw := h.startRequestProfile(metrics.OperatorRemoveSearchAttributesScope) + defer sw.Stop() + + // validate request + if request == nil { + return nil, h.error(errRequestNotSet, scope, endpointName) + } + + if len(request.GetSearchAttributes()) == 0 { + return nil, h.error(errSearchAttributesNotSet, scope, endpointName) + } + + indexName := h.ESConfig.GetVisibilityIndex() + + currentSearchAttributes, err := h.saProvider.GetSearchAttributes(indexName, true) + if err != nil { + return nil, h.error(serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err)), scope, endpointName) + } + + newCustomSearchAttributes := map[string]enumspb.IndexedValueType{} + for saName, saType := range currentSearchAttributes.Custom() { + newCustomSearchAttributes[saName] = saType + } + + for _, saName := range request.GetSearchAttributes() { + if !currentSearchAttributes.IsDefined(saName) { + return nil, h.error(serviceerror.NewInvalidArgument(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName)), scope, endpointName) + } + if _, ok := newCustomSearchAttributes[saName]; !ok { + return nil, h.error(serviceerror.NewInvalidArgument(fmt.Sprintf(errUnableToRemoveNonCustomSearchAttributesMessage, saName)), scope, endpointName) + } + delete(newCustomSearchAttributes, saName) + } + + err = h.saManager.SaveSearchAttributes(indexName, newCustomSearchAttributes) + if err != nil { + return nil, h.error(serviceerror.NewUnavailable(fmt.Sprintf(errUnableToSaveSearchAttributesMessage, err)), scope, endpointName) + } + + return &operatorservice.RemoveSearchAttributesResponse{}, nil +} + +func (h *OperatorHandlerImpl) ListSearchAttributes(ctx context.Context, request *operatorservice.ListSearchAttributesRequest) (_ *operatorservice.ListSearchAttributesResponse, retError error) { + const endpointName = "ListSearchAttributes" + + defer log.CapturePanic(h.logger, &retError) + + scope, sw := h.startRequestProfile(metrics.OperatorListSearchAttributesScope) + defer sw.Stop() + + if request == nil { + return nil, h.error(errRequestNotSet, scope, endpointName) + } + + indexName := h.ESConfig.GetVisibilityIndex() + + var lastErr error + var esMapping map[string]string = nil + if h.ESClient != nil { + esMapping, lastErr = h.ESClient.GetMapping(ctx, indexName) + if lastErr != nil { + lastErr = h.error(serviceerror.NewUnavailable(fmt.Sprintf("unable to get mapping from Elasticsearch: %v", lastErr)), scope, endpointName) + } + } + + searchAttributes, err := h.saProvider.GetSearchAttributes(indexName, true) + if err != nil { + lastErr = h.error(serviceerror.NewUnavailable(fmt.Sprintf("unable to read custom search attributes: %v", err)), scope, endpointName) + } + + if lastErr != nil { + return nil, lastErr + } + + return &operatorservice.ListSearchAttributesResponse{ + CustomAttributes: searchAttributes.Custom(), + SystemAttributes: searchAttributes.System(), + StorageSchema: esMapping, + }, nil +} + +// startRequestProfile initiates recording of request metrics +func (h *OperatorHandlerImpl) startRequestProfile(scope int) (metrics.Scope, metrics.Stopwatch) { + metricsScope := h.metricsClient.Scope(scope) + sw := metricsScope.StartTimer(metrics.ServiceLatency) + metricsScope.IncCounter(metrics.ServiceRequests) + return metricsScope, sw +} + +func (h *OperatorHandlerImpl) error(err error, scope metrics.Scope, endpointName string) error { + switch err := err.(type) { + case *serviceerror.Unavailable: + h.logger.Error("["+endpointName+"] Unavailable error", tag.Error(err)) + scope.IncCounter(metrics.ServiceFailures) + return err + case *serviceerror.InvalidArgument: + scope.IncCounter(metrics.ServiceErrInvalidArgumentCounter) + return err + case *serviceerror.ResourceExhausted: + scope.Tagged(metrics.ResourceExhaustedCauseTag(err.Cause)).IncCounter(metrics.ServiceErrResourceExhaustedCounter) + return err + case *serviceerror.NotFound: + return err + } + + h.logger.Error("["+endpointName+"] Unknown error", tag.Error(err)) + scope.IncCounter(metrics.ServiceFailures) + + return err +} diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go new file mode 100644 index 000000000000..2ac4829e3377 --- /dev/null +++ b/service/frontend/operator_handler_test.go @@ -0,0 +1,376 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + "errors" + "fmt" + "testing" + + "go.temporal.io/api/operatorservice/v1" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + sdkmocks "go.temporal.io/sdk/mocks" + + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" + "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/searchattribute" +) + +type ( + operatorHandlerSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + mockResource *resource.Test + mockSdkSystemClient *sdkmocks.Client + + handler *OperatorHandlerImpl + } +) + +func TestOperatorHandlerSuite(t *testing.T) { + s := new(operatorHandlerSuite) + suite.Run(t, s) +} + +func (s *operatorHandlerSuite) SetupTest() { + s.Assertions = require.New(s.T()) + + s.controller = gomock.NewController(s.T()) + s.mockResource = resource.NewTest(s.controller, metrics.Frontend) + + s.mockSdkSystemClient = &sdkmocks.Client{} + + args := NewOperatorHandlerImplArgs{ + nil, + s.mockResource.ESClient, + s.mockResource.Logger, + s.mockSdkSystemClient, + s.mockResource.GetMetricsClient(), + s.mockResource.GetSearchAttributesProvider(), + s.mockResource.GetSearchAttributesManager(), + } + s.handler = NewOperatorHandlerImpl(args) + s.handler.Start() +} + +func (s *operatorHandlerSuite) TearDownTest() { + s.controller.Finish() + s.handler.Stop() +} + +func (s *operatorHandlerSuite) Test_AddSearchAttributes() { + handler := s.handler + ctx := context.Background() + + type test struct { + Name string + Request *operatorservice.AddSearchAttributesRequest + Expected error + } + // request validation tests + testCases1 := []test{ + { + Name: "nil request", + Request: nil, + Expected: &serviceerror.InvalidArgument{Message: "Request is nil."}, + }, + { + Name: "empty request", + Request: &operatorservice.AddSearchAttributesRequest{}, + Expected: &serviceerror.InvalidArgument{Message: "SearchAttributes are not set on request."}, + }, + } + for _, testCase := range testCases1 { + s.T().Run(testCase.Name, func(t *testing.T) { + resp, err := handler.AddSearchAttributes(ctx, testCase.Request) + s.Equal(testCase.Expected, err) + s.Nil(resp) + }) + } + + // Elasticsearch is not configured + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + testCases3 := []test{ + { + Name: "reserved key (empty index)", + Request: &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: map[string]enumspb.IndexedValueType{ + "WorkflowId": enumspb.INDEXED_VALUE_TYPE_TEXT, + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Search attribute WorkflowId is reserved by system."}, + }, + { + Name: "key already whitelisted (empty index)", + Request: &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: map[string]enumspb.IndexedValueType{ + "CustomTextField": enumspb.INDEXED_VALUE_TYPE_TEXT, + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Search attribute CustomTextField already exists."}, + }, + } + for _, testCase := range testCases3 { + s.T().Run(testCase.Name, func(t *testing.T) { + resp, err := handler.AddSearchAttributes(ctx, testCase.Request) + s.Equal(testCase.Expected, err) + s.Nil(resp) + }) + } + + // Configure Elasticsearch: add advanced visibility store config with index name. + handler.ESConfig = &client.Config{ + Indices: map[string]string{ + "visibility": "random-index-name", + }, + } + + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + testCases2 := []test{ + { + Name: "reserved key (ES configured)", + Request: &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: map[string]enumspb.IndexedValueType{ + "WorkflowId": enumspb.INDEXED_VALUE_TYPE_TEXT, + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Search attribute WorkflowId is reserved by system."}, + }, + { + Name: "key already whitelisted (ES configured)", + Request: &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: map[string]enumspb.IndexedValueType{ + "CustomTextField": enumspb.INDEXED_VALUE_TYPE_TEXT, + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Search attribute CustomTextField already exists."}, + }, + } + for _, testCase := range testCases2 { + s.T().Run(testCase.Name, func(t *testing.T) { + resp, err := handler.AddSearchAttributes(ctx, testCase.Request) + s.Equal(testCase.Expected, err) + s.Nil(resp) + }) + } + + // Start workflow failed. + s.mockSdkSystemClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(nil, errors.New("start failed")).Once() + resp, err := handler.AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: map[string]enumspb.IndexedValueType{ + "CustomAttr": enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }, + }) + s.Error(err) + s.Equal("Unable to start temporal-sys-add-search-attributes-workflow workflow: start failed.", err.Error()) + s.Nil(resp) + + // Workflow failed. + mockRun := &sdkmocks.WorkflowRun{} + mockRun.On("Get", mock.Anything, nil).Return(errors.New("workflow failed")).Once() + const RunId = "31d8ebd6-93a7-11ec-b909-0242ac120002" + mockRun.On("GetRunID").Return(RunId).Once() + s.mockSdkSystemClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(mockRun, nil) + resp, err = handler.AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: map[string]enumspb.IndexedValueType{ + "CustomAttr": enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }, + }) + s.Error(err) + s.Equal(RunId, err.(*serviceerror.SystemWorkflow).WorkflowExecution.RunId) + s.Equal(fmt.Sprintf("System Workflow with WorkflowId temporal-sys-add-search-attributes-workflow and RunId %s returned an error: workflow failed", RunId), err.Error()) + s.Nil(resp) + + // Success case. + mockRun.On("Get", mock.Anything, nil).Return(nil) + + resp, err = handler.AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: map[string]enumspb.IndexedValueType{ + "CustomAttr": enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }, + }) + s.NoError(err) + s.NotNil(resp) + mockRun.AssertExpectations(s.T()) + s.mockSdkSystemClient.AssertExpectations(s.T()) +} + +func (s *operatorHandlerSuite) Test_ListSearchAttributes() { + handler := s.handler + ctx := context.Background() + + resp, err := handler.ListSearchAttributes(ctx, nil) + s.Error(err) + s.Equal(&serviceerror.InvalidArgument{Message: "Request is nil."}, err) + s.Nil(resp) + + // Elasticsearch is not configured + s.mockSdkSystemClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( + &workflowservice.DescribeWorkflowExecutionResponse{}, nil).Once() + s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "").Return(map[string]string{"col": "type"}, nil) + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + + resp, err = handler.ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{}) + s.NoError(err) + s.NotNil(resp) + + // Configure Elasticsearch: add advanced visibility store config with index name. + handler.ESConfig = &client.Config{ + Indices: map[string]string{ + "visibility": "random-index-name", + }, + } + + s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "random-index-name").Return(map[string]string{"col": "type"}, nil) + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil) + resp, err = handler.ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{}) + s.NoError(err) + s.NotNil(resp) + + s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "random-index-name").Return(map[string]string{"col": "type"}, nil) + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.NameTypeMap{}, errors.New("random error")) + resp, err = handler.ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{}) + s.Error(err) + s.Nil(resp) +} + +func (s *operatorHandlerSuite) Test_RemoveSearchAttributes() { + handler := s.handler + ctx := context.Background() + + type test struct { + Name string + Request *operatorservice.RemoveSearchAttributesRequest + Expected error + } + // request validation tests + testCases1 := []test{ + { + Name: "nil request", + Request: nil, + Expected: &serviceerror.InvalidArgument{Message: "Request is nil."}, + }, + { + Name: "empty request", + Request: &operatorservice.RemoveSearchAttributesRequest{}, + Expected: &serviceerror.InvalidArgument{Message: "SearchAttributes are not set on request."}, + }, + } + for _, testCase := range testCases1 { + s.T().Run(testCase.Name, func(t *testing.T) { + resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) + s.Equal(testCase.Expected, err) + s.Nil(resp) + }) + } + + // Elasticsearch is not configured + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + testCases3 := []test{ + { + Name: "reserved search attribute (empty index)", + Request: &operatorservice.RemoveSearchAttributesRequest{ + SearchAttributes: []string{ + "WorkflowId", + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Unable to remove non-custom search attributes: WorkflowId."}, + }, + { + Name: "search attribute doesn't exist (empty index)", + Request: &operatorservice.RemoveSearchAttributesRequest{ + SearchAttributes: []string{ + "ProductId", + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Search attribute ProductId doesn't exist."}, + }, + } + for _, testCase := range testCases3 { + s.T().Run(testCase.Name, func(t *testing.T) { + resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) + s.Equal(testCase.Expected, err) + s.Nil(resp) + }) + } + + // Configure Elasticsearch: add advanced visibility store config with index name. + handler.ESConfig = &client.Config{ + Indices: map[string]string{ + "visibility": "random-index-name", + }, + } + + s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() + testCases2 := []test{ + { + Name: "reserved search attribute (ES configured)", + Request: &operatorservice.RemoveSearchAttributesRequest{ + SearchAttributes: []string{ + "WorkflowId", + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Unable to remove non-custom search attributes: WorkflowId."}, + }, + { + Name: "search attribute doesn't exist (ES configured)", + Request: &operatorservice.RemoveSearchAttributesRequest{ + SearchAttributes: []string{ + "ProductId", + }, + }, + Expected: &serviceerror.InvalidArgument{Message: "Search attribute ProductId doesn't exist."}, + }, + } + for _, testCase := range testCases2 { + s.T().Run(testCase.Name, func(t *testing.T) { + resp, err := handler.RemoveSearchAttributes(ctx, testCase.Request) + s.Equal(testCase.Expected, err) + s.Nil(resp) + }) + } + + // Success case. + s.mockResource.SearchAttributesManager.EXPECT().SaveSearchAttributes("random-index-name", gomock.Any()).Return(nil) + + resp, err := handler.RemoveSearchAttributes(ctx, &operatorservice.RemoveSearchAttributesRequest{ + SearchAttributes: []string{ + "CustomKeywordField", + }, + }) + s.NoError(err) + s.NotNil(resp) +} diff --git a/service/frontend/service.go b/service/frontend/service.go index f24ba424ffea..b902dc5ca10a 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -32,6 +32,8 @@ import ( "sync/atomic" "time" + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/api/workflowservice/v1" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -188,6 +190,7 @@ type Service struct { handler Handler adminHandler *AdminHandler + operatorHandler *OperatorHandlerImpl versionChecker *VersionChecker visibilityManager manager.VisibilityManager server *grpc.Server @@ -204,6 +207,7 @@ func NewService( server *grpc.Server, handler Handler, adminHandler *AdminHandler, + operatorHandler *OperatorHandlerImpl, versionChecker *VersionChecker, visibilityMgr manager.VisibilityManager, logger log.Logger, @@ -217,6 +221,7 @@ func NewService( server: server, handler: handler, adminHandler: adminHandler, + operatorHandler: operatorHandler, versionChecker: versionChecker, visibilityManager: visibilityMgr, logger: logger, @@ -237,8 +242,8 @@ func (s *Service) Start() { workflowservice.RegisterWorkflowServiceServer(s.server, s.handler) healthpb.RegisterHealthServer(s.server, s.handler) - adminservice.RegisterAdminServiceServer(s.server, s.adminHandler) + operatorservice.RegisterOperatorServiceServer(s.server, s.operatorHandler) reflection.Register(s.server) @@ -247,6 +252,7 @@ func (s *Service) Start() { rand.Seed(time.Now().UnixNano()) s.adminHandler.Start() + s.operatorHandler.Start() s.versionChecker.Start() listener := s.grpcListener @@ -281,6 +287,7 @@ func (s *Service) Stop() { time.Sleep(failureDetectionTime) s.adminHandler.Stop() + s.operatorHandler.Stop() s.versionChecker.Stop() s.visibilityManager.Close() diff --git a/service/worker/addsearchattributes/workflow.go b/service/worker/addsearchattributes/workflow.go index f5d49d4264fe..02321bef9586 100644 --- a/service/worker/addsearchattributes/workflow.go +++ b/service/worker/addsearchattributes/workflow.go @@ -42,7 +42,7 @@ import ( ) const ( - // WorkflowName is the workflow name. + // WorkflowName is workflowId of the system workflow performing addition of search attributes WorkflowName = "temporal-sys-add-search-attributes-workflow" ) diff --git a/tools/cli/app_test.go b/tools/cli/app_test.go index 63425558530f..7230da592983 100644 --- a/tools/cli/app_test.go +++ b/tools/cli/app_test.go @@ -38,6 +38,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" namespacepb "go.temporal.io/api/namespace/v1" + "go.temporal.io/api/operatorservice/v1" replicationpb "go.temporal.io/api/replication/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" @@ -61,12 +62,14 @@ type cliAppSuite struct { app *cli.App mockCtrl *gomock.Controller frontendClient *workflowservicemock.MockWorkflowServiceClient + operatorClient *operatorservice.OperatorServiceClient serverAdminClient *adminservicemock.MockAdminServiceClient sdkClient *sdkmocks.Client } type clientFactoryMock struct { frontendClient workflowservice.WorkflowServiceClient + operatorClient operatorservice.OperatorServiceClient serverAdminClient adminservice.AdminServiceClient sdkClient *sdkmocks.Client } @@ -75,6 +78,10 @@ func (m *clientFactoryMock) FrontendClient(c *cli.Context) workflowservice.Workf return m.frontendClient } +func (m *clientFactoryMock) OperatorClient(c *cli.Context) operatorservice.OperatorServiceClient { + return m.operatorClient +} + func (m *clientFactoryMock) AdminClient(c *cli.Context) adminservice.AdminServiceClient { return m.serverAdminClient } diff --git a/tools/cli/factory.go b/tools/cli/factory.go index a413dabf1bf1..71d6c9be09e4 100644 --- a/tools/cli/factory.go +++ b/tools/cli/factory.go @@ -37,6 +37,7 @@ import ( "time" "github.com/urfave/cli" + "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" "google.golang.org/grpc" @@ -64,6 +65,7 @@ type HttpGetter interface { // ClientFactory is used to construct rpc clients type ClientFactory interface { FrontendClient(c *cli.Context) workflowservice.WorkflowServiceClient + OperatorClient(c *cli.Context) operatorservice.OperatorServiceClient AdminClient(c *cli.Context) adminservice.AdminServiceClient SDKClient(c *cli.Context, namespace string) sdkclient.Client HealthClient(c *cli.Context) healthpb.HealthClient @@ -89,6 +91,13 @@ func (b *clientFactory) FrontendClient(c *cli.Context) workflowservice.WorkflowS return workflowservice.NewWorkflowServiceClient(connection) } +// OperatorClient builds an operator client. +func (b *clientFactory) OperatorClient(c *cli.Context) operatorservice.OperatorServiceClient { + connection, _ := b.createGRPCConnection(c) + + return operatorservice.NewOperatorServiceClient(connection) +} + // AdminClient builds an admin client. func (b *clientFactory) AdminClient(c *cli.Context) adminservice.AdminServiceClient { connection, _ := b.createGRPCConnection(c)