diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index 3bb7f0db8fa..ff0d7819971 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -210,7 +210,7 @@ eventLoop: p.options.UpdateAckInterval(), p.options.UpdateAckIntervalJitterCoefficient(), )) - if err := p.ackMgr.updateQueueAckLevel(); err == shard.ErrShardClosed { + if err := p.ackMgr.updateQueueAckLevel(); shard.IsShardOwnershipLostError(err) { // shard is no longer owned by this instance, bail out go p.Stop() break eventLoop diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 49ce0e5df23..02689bdfd24 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -157,9 +157,6 @@ type ( var _ Context = (*ContextImpl)(nil) var ( - // ErrShardClosed is returned when shard is closed and a req cannot be processed - ErrShardClosed = serviceerror.NewUnavailable("shard closed") - // ErrShardStatusUnknown means we're not sure if we have the shard lock or not. This may be returned // during short windows at initialization and if we've lost the connection to the database. ErrShardStatusUnknown = serviceerror.NewUnavailable("shard status unknown") @@ -1129,7 +1126,7 @@ func (s *ContextImpl) errorByState() error { case contextStateAcquired: return nil case contextStateStopping, contextStateStopped: - return ErrShardClosed + return s.newShardClosedErrorWithShardID() default: panic("invalid state") } @@ -1810,7 +1807,7 @@ func (s *ContextImpl) acquireShard() { op := func() error { if !s.isValid() { - return ErrShardClosed + return s.newShardClosedErrorWithShardID() } // Initial load of shard metadata @@ -2038,6 +2035,14 @@ func (s *ContextImpl) newIOContext() (context.Context, context.CancelFunc) { return ctx, cancel } +// newShardClosedErrorWithShardID when shard is closed and a req cannot be processed +func (s *ContextImpl) newShardClosedErrorWithShardID() *persistence.ShardOwnershipLostError { + return &persistence.ShardOwnershipLostError{ + ShardID: s.shardID, // immutable + Msg: "shard closed", + } +} + func OperationPossiblySucceeded(err error) bool { switch err.(type) { case *persistence.CurrentWorkflowConditionFailedError, diff --git a/service/history/shard/controller_impl.go b/service/history/shard/controller_impl.go index 78af2043d8f..1f7891bf951 100644 --- a/service/history/shard/controller_impl.go +++ b/service/history/shard/controller_impl.go @@ -302,9 +302,10 @@ func (c *ControllerImpl) removeShard(shardID int32, expected *ContextImpl) (*Con // ControllerImpl. It is responsible for acquiring / // releasing shards in response to any event that can // change the shard ownership. These events are -// a. Ring membership change -// b. Periodic ticker -// c. ShardOwnershipLostError and subsequent ShardClosedEvents from engine +// +// a. Ring membership change +// b. Periodic ticker +// c. ShardOwnershipLostError and subsequent ShardClosedEvents from engine func (c *ControllerImpl) shardManagementPump() { defer c.shutdownWG.Done() @@ -423,13 +424,11 @@ func (c *ControllerImpl) ShardIDs() []int32 { } func IsShardOwnershipLostError(err error) bool { - if err == ErrShardClosed { - return true - } - switch err.(type) { case *persistence.ShardOwnershipLostError: return true + case *serviceerrors.ShardOwnershipLost: + return true } return false diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 5a230cbe364..4c2caf4507e 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -310,8 +310,8 @@ func (t *timerQueueProcessorImpl) completeTimersLoop() { return false default: } - return err != shard.ErrShardClosed - }); err == shard.ErrShardClosed { + return !shard.IsShardOwnershipLostError(err) + }); shard.IsShardOwnershipLostError(err) { // shard is unloaded, timer processor should quit as well go t.Stop() return diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index c608829578f..9b33b10eeff 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -282,7 +282,7 @@ eventLoop: t.config.TimerProcessorUpdateAckInterval(), t.config.TimerProcessorUpdateAckIntervalJitterCoefficient(), )) - if err := t.timerQueueAckMgr.updateAckLevel(); err == shard.ErrShardClosed { + if err := t.timerQueueAckMgr.updateAckLevel(); shard.IsShardOwnershipLostError(err) { // shard is closed, shutdown timerQProcessor and bail out go t.Stop() return err diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index 6900bb06dae..8ae92c9c430 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -304,8 +304,8 @@ func (t *transferQueueProcessorImpl) completeTransferLoop() { return false default: } - return err != shard.ErrShardClosed - }); err == shard.ErrShardClosed { + return !shard.IsShardOwnershipLostError(err) + }); shard.IsShardOwnershipLostError(err) { // shard is unloaded, transfer processor should quit as well t.Stop() return diff --git a/service/history/visibilityQueueProcessor.go b/service/history/visibilityQueueProcessor.go index e8b278ddc98..c6ae9641557 100644 --- a/service/history/visibilityQueueProcessor.go +++ b/service/history/visibilityQueueProcessor.go @@ -25,7 +25,6 @@ package history import ( - "errors" "sync/atomic" "time" @@ -277,8 +276,8 @@ func (t *visibilityQueueProcessorImpl) completeTaskLoop() { return false default: } - return err != shard.ErrShardClosed - }); errors.Is(err, shard.ErrShardClosed) { + return !shard.IsShardOwnershipLostError(err) + }); shard.IsShardOwnershipLostError(err) { // shard closed, trigger shutdown and bail out t.Stop() return