From 3b6d586026a72816a0d0d9d3544ef3a4e6500e1b Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 24 Feb 2022 12:13:41 -0800 Subject: [PATCH] Check is global ns enabled in handler (#2541) --- service/history/handler.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/service/history/handler.go b/service/history/handler.go index 1c30376a7de..03031cde558 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -1096,6 +1096,10 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } + namespaceID := namespace.ID(request.GetNamespaceId()) if namespaceID == "" { return nil, h.convertError(errNamespaceNotSet) @@ -1160,6 +1164,10 @@ func (h *Handler) SyncActivity(ctx context.Context, request *historyservice.Sync return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } + namespaceID := namespace.ID(request.GetNamespaceId()) if request.GetNamespaceId() == "" || uuid.Parse(request.GetNamespaceId()) == nil { return nil, h.convertError(errNamespaceNotSet) @@ -1195,6 +1203,9 @@ func (h *Handler) GetReplicationMessages(ctx context.Context, request *historyse if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } var wg sync.WaitGroup wg.Add(len(request.Tokens)) @@ -1248,6 +1259,9 @@ func (h *Handler) GetDLQReplicationMessages(ctx context.Context, request *histor if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } taskInfoPerShard := map[int32][]*replicationspb.ReplicationTaskInfo{} // do batch based on workflow ID and run ID @@ -1487,6 +1501,9 @@ func (h *Handler) GetReplicationStatus( if h.isStopped() { return nil, errShuttingDown } + if err := h.validateReplicationConfig(); err != nil { + return nil, err + } resp := &historyservice.GetReplicationStatusResponse{} for _, shardID := range h.controller.ShardIDs() { @@ -1525,6 +1542,13 @@ func (h *Handler) convertError(err error) error { return err } +func (h *Handler) validateReplicationConfig() error { + if !h.clusterMetadata.IsGlobalNamespaceEnabled() { + return serviceerror.NewUnavailable("The cluster has global namespace disabled. The operation is not supported.") + } + return nil +} + func validateTaskToken(taskToken *tokenspb.Task) error { if taskToken.GetWorkflowId() == "" { return errWorkflowIDNotSet