diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 0248ab7d80a..c70aab3eee1 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -630,10 +630,10 @@ const ( WorkerParentCloseMaxConcurrentActivityTaskPollers = "worker.ParentCloseMaxConcurrentActivityTaskPollers" // WorkerParentCloseMaxConcurrentWorkflowTaskPollers indicates worker parent close worker max concurrent workflow pollers WorkerParentCloseMaxConcurrentWorkflowTaskPollers = "worker.ParentCloseMaxConcurrentWorkflowTaskPollers" + // WorkerPerNamespaceWorkerCount controls number of per-ns (scheduler, batcher, etc.) workers to run per namespace + WorkerPerNamespaceWorkerCount = "worker.perNamespaceWorkerCount" // WorkerEnableScheduler controls whether to start the worker for scheduled workflows WorkerEnableScheduler = "worker.enableScheduler" - // WorkerSchedulerNumWorkers controls number of scheduler workers to run per namespace - WorkerSchedulerNumWorkers = "worker.schedulerNumWorkers" ) // Filter represents a filter on the dynamic config key diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index d9ca271eac5..f9b408852ee 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -72,6 +72,7 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/rpc/interceptor" "go.temporal.io/server/common/searchattribute" + workercommon "go.temporal.io/server/service/worker/common" "go.temporal.io/server/service/worker/scheduler" ) @@ -3128,7 +3129,7 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow Namespace: request.Namespace, WorkflowId: request.ScheduleId, WorkflowType: &commonpb.WorkflowType{Name: scheduler.WorkflowType}, - TaskQueue: &taskqueuepb.TaskQueue{Name: scheduler.TaskQueueName}, + TaskQueue: &taskqueuepb.TaskQueue{Name: workercommon.PerNSWorkerTaskQueue}, Input: inputPayload, Identity: request.Identity, RequestId: request.RequestId, diff --git a/service/worker/common/interface.go b/service/worker/common/interface.go index 7bbb910885b..a0e6d440b91 100644 --- a/service/worker/common/interface.go +++ b/service/worker/common/interface.go @@ -31,6 +31,11 @@ import ( "go.temporal.io/server/common/namespace" ) +const ( + // All per-ns workers share one task queue + PerNSWorkerTaskQueue = "temporal-sys-per-ns-tq" +) + type ( // WorkerComponent represents a type of work needed for worker role WorkerComponent interface { @@ -60,13 +65,7 @@ type ( } PerNSDedicatedWorkerOptions struct { - // Set this to false to disable a worker for this namespace + // Set this to false to disable this worker for this namespace Enabled bool - // TaskQueue is required - TaskQueue string - // How many worker nodes should run a worker per namespace - NumWorkers int - // Other worker options - Options sdkworker.Options } ) diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index e1a314d0a88..c66923a6a86 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -25,6 +25,7 @@ package worker import ( + "context" "fmt" "os" "sync" @@ -35,9 +36,11 @@ import ( sdkclient "go.temporal.io/sdk/client" sdkworker "go.temporal.io/sdk/worker" "go.uber.org/fx" + "golang.org/x/exp/maps" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" 
"go.temporal.io/server/common/membership" @@ -59,6 +62,7 @@ type ( SdkWorkerFactory sdk.WorkerFactory NamespaceRegistry namespace.Registry HostName resource.HostName + Config *Config Components []workercommon.PerNSWorkerComponent `group:"perNamespaceWorkerComponent"` } @@ -72,42 +76,40 @@ type ( namespaceRegistry namespace.Registry self *membership.HostInfo hostName resource.HostName + config *Config serviceResolver membership.ServiceResolver components []workercommon.PerNSWorkerComponent initialRetry time.Duration membershipChangedCh chan *membership.ChangedEvent - lock sync.Mutex - workerSets map[namespace.ID]*workerSet + lock sync.Mutex + workers map[namespace.ID]*perNamespaceWorker } - workerSet struct { + perNamespaceWorker struct { wm *perNamespaceWorkerManager - lock sync.Mutex // protects below fields - ns *namespace.Namespace - deleted bool - workers map[workercommon.PerNSWorkerComponent]*worker - } - - worker struct { - client sdkclient.Client - worker sdkworker.Worker + lock sync.Mutex // protects below fields + ns *namespace.Namespace + componentSet string + client sdkclient.Client + worker sdkworker.Worker } ) func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager { return &perNamespaceWorkerManager{ - logger: params.Logger, + logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager), sdkClientFactory: params.SdkClientFactory, sdkWorkerFactory: params.SdkWorkerFactory, namespaceRegistry: params.NamespaceRegistry, hostName: params.HostName, + config: params.Config, components: params.Components, initialRetry: 1 * time.Second, membershipChangedCh: make(chan *membership.ChangedEvent), - workerSets: make(map[namespace.ID]*workerSet), + workers: make(map[namespace.ID]*perNamespaceWorker), } } @@ -130,7 +132,7 @@ func (wm *perNamespaceWorkerManager) Start( wm.self = self wm.serviceResolver = serviceResolver - wm.logger.Info("", tag.ComponentPerNSWorkerManager, tag.LifeCycleStarting) + wm.logger.Info("", tag.LifeCycleStarting) // this will call namespaceCallback with current namespaces wm.namespaceRegistry.RegisterStateChangeCallback(wm, wm.namespaceCallback) @@ -138,7 +140,7 @@ func (wm *perNamespaceWorkerManager) Start( wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh) go wm.membershipChangedListener() - wm.logger.Info("", tag.ComponentPerNSWorkerManager, tag.LifeCycleStarted) + wm.logger.Info("", tag.LifeCycleStarted) } func (wm *perNamespaceWorkerManager) Stop() { @@ -150,70 +152,70 @@ func (wm *perNamespaceWorkerManager) Stop() { return } - wm.logger.Info("", tag.ComponentPerNSWorkerManager, tag.LifeCycleStopping) + wm.logger.Info("", tag.LifeCycleStopping) wm.namespaceRegistry.UnregisterStateChangeCallback(wm) wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey) close(wm.membershipChangedCh) wm.lock.Lock() - defer wm.lock.Unlock() + workers := maps.Values(wm.workers) + maps.Clear(wm.workers) + wm.lock.Unlock() - for _, ws := range wm.workerSets { - // this will see that the perNamespaceWorkerManager is not running - // anymore and stop all workers - ws.refresh(nil, false) + for _, worker := range workers { + worker.stopWorker() } - wm.logger.Info("", tag.ComponentPerNSWorkerManager, tag.LifeCycleStopped) + wm.logger.Info("", tag.LifeCycleStopped) } func (wm *perNamespaceWorkerManager) namespaceCallback(ns *namespace.Namespace, deleted bool) { - go wm.getWorkerSet(ns).refresh(ns, deleted) + go wm.getWorkerByNamespace(ns).refreshWithNewNamespace(ns, deleted) } 
func (wm *perNamespaceWorkerManager) membershipChangedListener() { for range wm.membershipChangedCh { wm.lock.Lock() - for _, ws := range wm.workerSets { - go ws.refresh(nil, false) + for _, worker := range wm.workers { + go worker.refreshWithExistingNamespace() } wm.lock.Unlock() } } -func (wm *perNamespaceWorkerManager) getWorkerSet(ns *namespace.Namespace) *workerSet { +func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespace) *perNamespaceWorker { wm.lock.Lock() defer wm.lock.Unlock() - if ws, ok := wm.workerSets[ns.ID()]; ok { - return ws + if worker, ok := wm.workers[ns.ID()]; ok { + return worker } - ws := &workerSet{ - wm: wm, - ns: ns, - workers: make(map[workercommon.PerNSWorkerComponent]*worker), + worker := &perNamespaceWorker{ + wm: wm, + ns: ns, } - wm.workerSets[ns.ID()] = ws - return ws + wm.workers[ns.ID()] = worker + return worker } -func (wm *perNamespaceWorkerManager) removeWorkerSet(ns *namespace.Namespace) { +func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) { wm.lock.Lock() defer wm.lock.Unlock() - delete(wm.workerSets, ns.ID()) + delete(wm.workers, ns.ID()) } -func (wm *perNamespaceWorkerManager) responsibleForNamespace(ns *namespace.Namespace, queueName string, num int) (int, error) { - // This can result in fewer than the intended number of workers if num > 1, because +func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, error) { + workerCount := wm.config.PerNamespaceWorkerCount(ns.Name().String()) + // This can result in fewer than the intended number of workers if numWorkers > 1, because // multiple lookups might land on the same node. To compensate, we increase the number of // pollers in that case, but it would be better to try to spread them across our nodes. // TODO: implement this properly using LookupN in serviceResolver multiplicity := 0 - for i := 0; i < num; i++ { - key := fmt.Sprintf("%s/%s/%d", ns.ID().String(), queueName, i) + for i := 0; i < workerCount; i++ { + key := fmt.Sprintf("%s/%d", ns.ID().String(), i) target, err := wm.serviceResolver.Lookup(key) if err != nil { return 0, err @@ -225,127 +227,155 @@ func (wm *perNamespaceWorkerManager) responsibleForNamespace(ns *namespace.Names return multiplicity, nil } -// called after change to this namespace state _or_ any membership change in the -// server worker ring -func (ws *workerSet) refresh(newNs *namespace.Namespace, newDeleted bool) { - ws.lock.Lock() - if newNs != nil { - ws.ns = newNs - ws.deleted = newDeleted - } - ns, deleted := ws.ns, ws.deleted - ws.lock.Unlock() - - for _, wc := range ws.wm.components { - ws.refreshComponent(wc, ns, deleted) - } - +func (w *perNamespaceWorker) refreshWithNewNamespace(ns *namespace.Namespace, deleted bool) { if deleted { - // if fully deleted from db, we can remove from our map also - ws.wm.removeWorkerSet(ns) + w.stopWorker() + // if namespace is fully deleted from db, we can remove from our map also + w.wm.removeWorker(ns) + return } + w.lock.Lock() + w.ns = ns + w.lock.Unlock() + w.refresh(ns) } -func (ws *workerSet) refreshComponent( - cmp workercommon.PerNSWorkerComponent, - ns *namespace.Namespace, - deleted bool, -) { +func (w *perNamespaceWorker) refreshWithExistingNamespace() { + w.lock.Lock() + ns := w.ns + w.lock.Unlock() + w.refresh(ns) +} + +// This is called after change to this namespace state _or_ any membership change in the server +// worker ring. 
It runs in its own goroutine (except for server shutdown), and multiple +// goroutines for the same namespace may be running at once. That's okay because they should +// eventually converge on the same state (running or not running, set of components) and exit. +func (w *perNamespaceWorker) refresh(ns *namespace.Namespace) { op := func() error { - // we should run only if all four are true: - // 1. perNamespaceWorkerManager is running - // 2. this namespace is not deleted - // 3. the component says we should be (can filter by namespace) - // 4. we are responsible for this namespace - multiplicity := 0 - var options *workercommon.PerNSDedicatedWorkerOptions - if ws.wm.Running() && ns.State() != enumspb.NAMESPACE_STATE_DELETED && !deleted { - options = cmp.DedicatedWorkerOptions(ns) - if options.Enabled { - var err error - multiplicity, err = ws.wm.responsibleForNamespace(ns, options.TaskQueue, options.NumWorkers) - if err != nil { - return err - } - } + if !w.wm.Running() || ns.State() == enumspb.NAMESPACE_STATE_DELETED { + w.stopWorker() + return nil } - if multiplicity > 0 { - ws.lock.Lock() - if _, ok := ws.workers[cmp]; ok { - // worker is already running. it's possible that it started with a different - // multiplicity. we don't bother changing it in that case. - ws.lock.Unlock() - return nil - } - ws.lock.Unlock() - - worker, err := ws.startWorker(cmp, ns, options, multiplicity) - if err != nil { - return err - } - - ws.lock.Lock() - defer ws.lock.Unlock() - - // check again in case we had a race - if _, ok := ws.workers[cmp]; ok || !ws.wm.Running() { - worker.stop() - return nil + // figure out which components are enabled at all for this namespace + var enabledComponents []workercommon.PerNSWorkerComponent + var componentSet string + for _, cmp := range w.wm.components { + options := cmp.DedicatedWorkerOptions(ns) + if options.Enabled { + enabledComponents = append(enabledComponents, cmp) + componentSet += fmt.Sprintf("%p,", cmp) } + } - ws.workers[cmp] = worker + if len(enabledComponents) == 0 { + // no components enabled, we don't need a worker + w.stopWorker() + return nil + } + // check if we are responsible for this namespace at all + multiplicity, err := w.wm.getWorkerMultiplicity(ns) + if err != nil { + w.wm.logger.Error("Failed to look up hosts", tag.WorkflowNamespace(ns.Name().String()), tag.Error(err)) + // TODO: add metric also + return err + } + if multiplicity == 0 { + // not ours, don't need a worker + w.stopWorker() + return nil + } + // ensure this changes if multiplicity changes + componentSet += fmt.Sprintf("%d", multiplicity) + + // we do need a worker, but maybe we have one already + w.lock.Lock() + if componentSet == w.componentSet { + // no change in set of components enabled + w.lock.Unlock() return nil - } else { - ws.lock.Lock() - defer ws.lock.Unlock() + } + // set of components changed, need to recreate worker. first stop old one + w.stopWorkerLocked() + w.lock.Unlock() - if worker, ok := ws.workers[cmp]; ok { - worker.stop() - delete(ws.workers, cmp) - } + // create worker outside of lock + client, worker, err := w.startWorker(ns, enabledComponents, multiplicity) + if err != nil { + w.wm.logger.Error("Failed to start sdk worker", tag.WorkflowNamespace(ns.Name().String()), tag.Error(err)) + // TODO: add metric also + return err + } + w.lock.Lock() + defer w.lock.Unlock() + // maybe there was a race and someone else created a client already. 
stop ours + if !w.wm.Running() || w.client != nil || w.worker != nil { + worker.Stop() + client.Close() return nil } + w.client = client + w.worker = worker + w.componentSet = componentSet + return nil } - policy := backoff.NewExponentialRetryPolicy(ws.wm.initialRetry) + policy := backoff.NewExponentialRetryPolicy(w.wm.initialRetry) + policy.SetMaximumInterval(1 * time.Minute) + policy.SetExpirationInterval(backoff.NoInterval) backoff.ThrottleRetry(op, policy, nil) } -func (ws *workerSet) startWorker( - wc workercommon.PerNSWorkerComponent, +func (w *perNamespaceWorker) startWorker( ns *namespace.Namespace, - options *workercommon.PerNSDedicatedWorkerOptions, + components []workercommon.PerNSWorkerComponent, multiplicity int, -) (*worker, error) { +) (sdkclient.Client, sdkworker.Worker, error) { nsName := ns.Name().String() - client, err := ws.wm.sdkClientFactory.NewClient(nsName, ws.wm.logger) + // TODO: after sdk supports cloning clients to share connections, use that here + client, err := w.wm.sdkClientFactory.NewClient(nsName, w.wm.logger) if err != nil { - return nil, err + return nil, nil, err } - sdkoptions := options.Options - sdkoptions.Identity = fmt.Sprintf("%d@%s@%s@%s@%T", os.Getpid(), ws.wm.hostName, nsName, options.TaskQueue, wc) + var sdkoptions sdkworker.Options + sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewCallerInfo(headers.CallerTypeBackground)) + sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName) // sdk default is 2, we increase it if we're supposed to run with more multiplicity. // other defaults are already large enough. sdkoptions.MaxConcurrentWorkflowTaskPollers = 2 * multiplicity sdkoptions.MaxConcurrentActivityTaskPollers = 2 * multiplicity - sdkworker := ws.wm.sdkWorkerFactory.New(client, options.TaskQueue, sdkoptions) - wc.Register(sdkworker, ns) + sdkworker := w.wm.sdkWorkerFactory.New(client, workercommon.PerNSWorkerTaskQueue, sdkoptions) + for _, cmp := range components { + cmp.Register(sdkworker, ns) + } // TODO: use Run() and handle post-startup errors by recreating worker - // (after sdk supports returning post-startup errors from Run) err = sdkworker.Start() if err != nil { client.Close() - return nil, err + return nil, nil, err } - return &worker{client: client, worker: sdkworker}, nil + return client, sdkworker, nil } -func (w *worker) stop() { - w.worker.Stop() - w.client.Close() +func (w *perNamespaceWorker) stopWorker() { + w.lock.Lock() + defer w.lock.Unlock() + w.stopWorkerLocked() +} + +func (w *perNamespaceWorker) stopWorkerLocked() { + if w.worker != nil { + w.worker.Stop() + w.worker = nil + } + if w.client != nil { + w.client.Close() + w.client = nil + } + w.componentSet = "" } diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go index 814501c6ccb..bfbae9e73ff 100644 --- a/service/worker/pernamespaceworker_test.go +++ b/service/worker/pernamespaceworker_test.go @@ -35,12 +35,13 @@ import ( sdkworker "go.temporal.io/sdk/worker" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/membership" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/testing/mocksdk" - "go.temporal.io/server/service/worker/common" + workercommon "go.temporal.io/server/service/worker/common" ) type perNsWorkerManagerSuite struct { @@ -54,8 +55,8 @@ type perNsWorkerManagerSuite 
struct { hostInfo *membership.HostInfo serviceResolver *membership.MockServiceResolver - cmp1 *common.MockPerNSWorkerComponent - cmp2 *common.MockPerNSWorkerComponent + cmp1 *workercommon.MockPerNSWorkerComponent + cmp2 *workercommon.MockPerNSWorkerComponent manager *perNamespaceWorkerManager } @@ -73,8 +74,8 @@ func (s *perNsWorkerManagerSuite) SetupTest() { s.registry = namespace.NewMockRegistry(s.controller) s.hostInfo = membership.NewHostInfo("self", nil) s.serviceResolver = membership.NewMockServiceResolver(s.controller) - s.cmp1 = common.NewMockPerNSWorkerComponent(s.controller) - s.cmp2 = common.NewMockPerNSWorkerComponent(s.controller) + s.cmp1 = workercommon.NewMockPerNSWorkerComponent(s.controller) + s.cmp2 = workercommon.NewMockPerNSWorkerComponent(s.controller) s.manager = NewPerNamespaceWorkerManager(perNamespaceWorkerManagerInitParams{ Logger: s.logger, @@ -82,7 +83,12 @@ func (s *perNsWorkerManagerSuite) SetupTest() { SdkWorkerFactory: s.wfactory, NamespaceRegistry: s.registry, HostName: "self", - Components: []common.PerNSWorkerComponent{s.cmp1, s.cmp2}, + Config: &Config{ + PerNamespaceWorkerCount: func(ns string) int { + return common.MaxInt(1, map[string]int{"ns1": 1, "ns2": 2, "ns3": 3}[ns]) + }, + }, + Components: []workercommon.PerNSWorkerComponent{s.cmp1, s.cmp2}, }) s.manager.initialRetry = 1 * time.Millisecond @@ -102,10 +108,10 @@ func (s *perNsWorkerManagerSuite) TearDownTest() { func (s *perNsWorkerManagerSuite) TestDisabled() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() @@ -116,17 +122,14 @@ func (s *perNsWorkerManagerSuite) TestDisabled() { func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 2, + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("other1", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns1/tq1/1").Return(membership.NewHostInfo("other2", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("other1", nil), nil) s.manager.namespaceCallback(ns, false) // main work happens in a goroutine @@ -136,20 +139,18 @@ func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() { func (s *perNsWorkerManagerSuite) TestEnabled() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 1, + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + 
Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr1) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -162,24 +163,22 @@ func (s *perNsWorkerManagerSuite) TestEnabled() { } func (s *perNsWorkerManagerSuite) TestMultiplicity() { - ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) + ns := testns("ns3", enumspb.NAMESPACE_STATE_REGISTERED) // three workers - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 3, + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns1/tq1/1").Return(membership.NewHostInfo("other", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns1/tq1/2").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns3/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns3/1").Return(membership.NewHostInfo("other", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns3/2").Return(membership.NewHostInfo("self", nil), nil) cli1 := mocksdk.NewMockClient(s.controller) - s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) + s.cfactory.EXPECT().NewClient("ns3", gomock.Any()).Return(cli1, nil) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Do(func(_, _ any, options sdkworker.Options) { + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) { s.Equal(4, options.MaxConcurrentWorkflowTaskPollers) s.Equal(4, options.MaxConcurrentActivityTaskPollers) }).Return(wkr1) @@ -197,83 +196,64 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() { ns1 := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) ns2 := testns("ns2", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 1, - }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq2", - NumWorkers: 1, - }).AnyTimes() + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).DoAndReturn( + func(ns *namespace.Namespace) *workercommon.PerNSDedicatedWorkerOptions { + return &workercommon.PerNSDedicatedWorkerOptions{Enabled: true} + }).AnyTimes() + 
s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).DoAndReturn( + func(ns *namespace.Namespace) *workercommon.PerNSDedicatedWorkerOptions { + // only enabled on ns1 + return &workercommon.PerNSDedicatedWorkerOptions{Enabled: ns.Name().String() == "ns1"} + }).AnyTimes() + + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns2/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns2/1").Return(membership.NewHostInfo("self", nil), nil) + + cli1 := mocksdk.NewMockClient(s.controller) + cli2 := mocksdk.NewMockClient(s.controller) + + wkr1 := mocksdk.NewMockWorker(s.controller) + wkr2 := mocksdk.NewMockWorker(s.controller) + + s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) + s.cfactory.EXPECT().NewClient("ns2", gomock.Any()).Return(cli2, nil) - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns1/tq2/0").Return(membership.NewHostInfo("self", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns2/tq1/0").Return(membership.NewHostInfo("self", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns2/tq2/0").Return(membership.NewHostInfo("self", nil), nil) - - cli11 := mocksdk.NewMockClient(s.controller) - cli12 := mocksdk.NewMockClient(s.controller) - cli21 := mocksdk.NewMockClient(s.controller) - cli22 := mocksdk.NewMockClient(s.controller) - - wkr11 := mocksdk.NewMockWorker(s.controller) - wkr12 := mocksdk.NewMockWorker(s.controller) - wkr21 := mocksdk.NewMockWorker(s.controller) - wkr22 := mocksdk.NewMockWorker(s.controller) - - s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli11, nil) - s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli12, nil) - s.cfactory.EXPECT().NewClient("ns2", gomock.Any()).Return(cli21, nil) - s.cfactory.EXPECT().NewClient("ns2", gomock.Any()).Return(cli22, nil) - - s.wfactory.EXPECT().New(cli11, "tq1", gomock.Any()).Return(wkr11) - s.wfactory.EXPECT().New(cli12, "tq2", gomock.Any()).Return(wkr12) - s.wfactory.EXPECT().New(cli21, "tq1", gomock.Any()).Return(wkr21) - s.wfactory.EXPECT().New(cli22, "tq2", gomock.Any()).Return(wkr22) - - s.cmp1.EXPECT().Register(wkr11, ns1) - s.cmp2.EXPECT().Register(wkr12, ns1) - s.cmp1.EXPECT().Register(wkr21, ns2) - s.cmp2.EXPECT().Register(wkr22, ns2) - - wkr11.EXPECT().Start() - wkr12.EXPECT().Start() - wkr21.EXPECT().Start() - wkr22.EXPECT().Start() + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) + s.wfactory.EXPECT().New(cli2, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) + + s.cmp1.EXPECT().Register(wkr1, ns1) + s.cmp1.EXPECT().Register(wkr2, ns2) + s.cmp2.EXPECT().Register(wkr1, ns1) + + wkr1.EXPECT().Start() + wkr2.EXPECT().Start() s.manager.namespaceCallback(ns1, false) s.manager.namespaceCallback(ns2, false) time.Sleep(50 * time.Millisecond) - wkr11.EXPECT().Stop() - wkr12.EXPECT().Stop() - wkr21.EXPECT().Stop() - wkr22.EXPECT().Stop() - cli11.EXPECT().Close() - cli12.EXPECT().Close() - cli21.EXPECT().Close() - cli22.EXPECT().Close() + wkr1.EXPECT().Stop() + wkr2.EXPECT().Stop() + cli1.EXPECT().Close() + cli2.EXPECT().Close() } func (s *perNsWorkerManagerSuite) TestDeleteNs() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 1, + 
s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr1) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -289,11 +269,11 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { // restore it nsRestored := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) cli2 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli2, nil) wkr2 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr2) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) s.cmp1.EXPECT().Register(wkr2, ns) wkr2.EXPECT().Start() @@ -310,27 +290,25 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { func (s *perNsWorkerManagerSuite) TestMembershipChanged() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 1, + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() // we don't own it at first - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("other", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("other", nil), nil) s.manager.namespaceCallback(ns, false) time.Sleep(50 * time.Millisecond) // now we own it - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr1) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -338,7 +316,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { time.Sleep(50 * time.Millisecond) // now we don't own it anymore - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("other", nil), nil) + 
s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("other", nil), nil) wkr1.EXPECT().Stop() cli1.EXPECT().Close() @@ -349,23 +327,21 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { func (s *perNsWorkerManagerSuite) TestServiceResolverError() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 1, + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(nil, errors.New("resolver error")) - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(nil, errors.New("resolver error again")) - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error")) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error again")) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr1) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -380,16 +356,14 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() { func (s *perNsWorkerManagerSuite) TestNewClientError() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 1, + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil).AnyTimes() + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil).AnyTimes() cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(nil, errors.New("new client error")) @@ -397,7 +371,7 @@ func (s *perNsWorkerManagerSuite) TestNewClientError() { s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr1) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) wkr1.EXPECT().Start() @@ -412,21 +386,19 @@ func (s *perNsWorkerManagerSuite) TestNewClientError() { func (s *perNsWorkerManagerSuite) TestStartWorkerError() { ns := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - 
s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ - Enabled: true, - TaskQueue: "tq1", - NumWorkers: 1, + s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ + Enabled: true, }).AnyTimes() - s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&common.PerNSDedicatedWorkerOptions{ + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/tq1/0").Return(membership.NewHostInfo("self", nil), nil).AnyTimes() + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil).AnyTimes() cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli1, nil) wkr1 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr1) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1) s.cmp1.EXPECT().Register(wkr1, ns) // first try fails to start @@ -437,7 +409,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() { cli2 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient("ns1", gomock.Any()).Return(cli2, nil) wkr2 := mocksdk.NewMockWorker(s.controller) - s.wfactory.EXPECT().New(cli1, "tq1", gomock.Any()).Return(wkr2) + s.wfactory.EXPECT().New(cli1, workercommon.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2) s.cmp1.EXPECT().Register(wkr2, ns) wkr2.EXPECT().Start() diff --git a/service/worker/scheduler/fx.go b/service/worker/scheduler/fx.go index 608d9355622..57407fb3340 100644 --- a/service/worker/scheduler/fx.go +++ b/service/worker/scheduler/fx.go @@ -25,8 +25,6 @@ package scheduler import ( - "context" - "go.uber.org/fx" "go.temporal.io/api/workflowservice/v1" @@ -34,7 +32,6 @@ import ( "go.temporal.io/sdk/workflow" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/dynamicconfig" - "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -42,15 +39,13 @@ import ( ) const ( - WorkflowType = "temporal-sys-scheduler-workflow" - TaskQueueName = "temporal-sys-scheduler-tq" + WorkflowType = "temporal-sys-scheduler-workflow" ) type ( workerComponent struct { activityDeps activityDeps enabledForNs dynamicconfig.BoolPropertyFnWithNamespaceFilter - numWorkers dynamicconfig.IntPropertyFnWithNamespaceFilter } activityDeps struct { @@ -80,20 +75,13 @@ func NewResult( activityDeps: params, enabledForNs: dcCollection.GetBoolPropertyFnWithNamespaceFilter( dynamicconfig.WorkerEnableScheduler, false), - numWorkers: dcCollection.GetIntPropertyFilteredByNamespace( - dynamicconfig.WorkerSchedulerNumWorkers, 1), }, } } func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *workercommon.PerNSDedicatedWorkerOptions { return &workercommon.PerNSDedicatedWorkerOptions{ - Enabled: s.enabledForNs(ns.Name().String()), - TaskQueue: TaskQueueName, - NumWorkers: s.numWorkers(ns.Name().String()), - Options: sdkworker.Options{ - BackgroundActivityContext: headers.SetCallerInfo(context.Background(), headers.NewCallerInfo(headers.CallerTypeBackground)), - }, + Enabled: s.enabledForNs(ns.Name().String()), } } diff --git a/service/worker/service.go b/service/worker/service.go index b9bf6199fe8..0c75e5a620f 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -110,6 +110,7 @@ type ( 
EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn EnableBatcher dynamicconfig.BoolPropertyFn EnableParentClosePolicyWorker dynamicconfig.BoolPropertyFn + PerNamespaceWorkerCount dynamicconfig.IntPropertyFnWithNamespaceFilter StandardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn StandardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn @@ -298,6 +299,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten dynamicconfig.EnableParentClosePolicyWorker, true, ), + PerNamespaceWorkerCount: dc.GetIntPropertyFilteredByNamespace( + dynamicconfig.WorkerPerNamespaceWorkerCount, + 1, + ), ThrottledLogRPS: dc.GetIntProperty( dynamicconfig.WorkerThrottledLogRPS, 20,
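[Reviewer note] Taken together, the new worker.perNamespaceWorkerCount setting and the ring lookup in getWorkerMultiplicity decide how many pollers each node runs for a namespace: the manager hashes workerCount keys of the form "<namespaceID>/<i>" over the worker-service membership ring, a node's multiplicity is how many of those keys it wins, and the single sdk worker it starts gets 2*multiplicity pollers of each kind. The standalone sketch below restates that math with hypothetical helper names; the real implementation is the getWorkerMultiplicity / startWorker pair in pernamespaceworker.go.

package sketch

import (
	"fmt"

	sdkworker "go.temporal.io/sdk/worker"
)

// multiplicityFor distills the ownership check. lookup stands in for
// membership.ServiceResolver.Lookup and returns the identity of the node that
// owns a key on the worker-service ring.
func multiplicityFor(nsID string, workerCount int, self string, lookup func(string) (string, error)) (int, error) {
	m := 0
	for i := 0; i < workerCount; i++ {
		owner, err := lookup(fmt.Sprintf("%s/%d", nsID, i))
		if err != nil {
			return 0, err
		}
		if owner == self {
			m++
		}
	}
	return m, nil
}

// pollerOptions mirrors the scaling applied in startWorker: the sdk default is
// 2 pollers, increased when this node owns more than one "slot" for the
// namespace because multiple ring lookups landed on it.
func pollerOptions(multiplicity int) sdkworker.Options {
	return sdkworker.Options{
		MaxConcurrentWorkflowTaskPollers: 2 * multiplicity,
		MaxConcurrentActivityTaskPollers: 2 * multiplicity,
	}
}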