diff --git a/service/frontend/dcRedirectionHandler.go b/service/frontend/dcRedirectionHandler.go index d479b4cd9cf..6aff7171eae 100644 --- a/service/frontend/dcRedirectionHandler.go +++ b/service/frontend/dcRedirectionHandler.go @@ -1656,14 +1656,15 @@ func (handler *DCRedirectionHandlerImpl) UpdateWorkflow( switch { case targetDC == handler.currentClusterName: resp, err = handler.frontendHandler.UpdateWorkflow(ctx, request) + return err default: remoteClient, err := handler.clientBean.GetRemoteFrontendClient(targetDC) if err != nil { return err } resp, err = remoteClient.UpdateWorkflow(ctx, request) + return err } - return err }) return resp, err diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index 8f094e68d89..7141b547f78 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -216,19 +216,19 @@ eventLoop: } func (p *queueProcessorBase) processBatch() { - - if !p.verifyReschedulerSize() { - return - } - ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay) if err := p.rateLimiter.Wait(ctx); err != nil { + deadline, _ := ctx.Deadline() + p.throttle(deadline.Sub(p.timeSource.Now())) cancel() - p.notifyNewTask() // re-enqueue the event return } cancel() + if !p.verifyReschedulerSize() { + return + } + p.lastPollTime = p.timeSource.Now() tasks, more, err := p.ackMgr.readQueueTasks() @@ -265,14 +265,20 @@ func (p *queueProcessorBase) verifyReschedulerSize() bool { p.backoffTimer = nil } if !passed && p.backoffTimer == nil { - p.backoffTimer = time.AfterFunc(p.options.PollBackoffInterval(), func() { - p.notifyNewTask() // re-enqueue the event - }) + p.throttle(p.options.PollBackoffInterval()) } return passed } +func (p *queueProcessorBase) throttle(duration time.Duration) { + if p.backoffTimer == nil { + p.backoffTimer = time.AfterFunc(duration, func() { + p.notifyNewTask() // re-enqueue the event + }) + } +} + func (p *queueProcessorBase) submitTask( executable queues.Executable, ) { diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 601bf5e0e93..ab4dfcbf8b9 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -299,18 +299,19 @@ eventLoop: } func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*time.Time, error) { - if !t.verifyReschedulerSize() { - return nil, nil - } - ctx, cancel := context.WithTimeout(context.Background(), loadTimerTaskThrottleRetryDelay) if err := t.rateLimiter.Wait(ctx); err != nil { + deadline, _ := ctx.Deadline() + t.notifyNewTimer(deadline) // re-enqueue the event cancel() - t.notifyNewTimer(time.Time{}) // re-enqueue the event return nil, nil } cancel() + if !t.verifyReschedulerSize() { + return nil, nil + } + t.lastPollTime = t.timeSource.Now() timerTasks, nextFireTime, moreTasks, err := t.timerQueueAckMgr.readTimerTasks() if err != nil {