From a95c2fcaa97c0dbfe087b64dc9084ffad834d5b1 Mon Sep 17 00:00:00 2001
From: Alex Shtin
Date: Fri, 1 Jul 2022 16:23:30 -0700
Subject: [PATCH] Fix error shadowing in standby executors (#3053)

---
 .../history/timerQueueStandbyTaskExecutor.go  | 86 +++++++++---------
 .../transferQueueStandbyTaskExecutor.go       | 88 ++++++++++---------
 2 files changed, 93 insertions(+), 81 deletions(-)

diff --git a/service/history/timerQueueStandbyTaskExecutor.go b/service/history/timerQueueStandbyTaskExecutor.go
index 776d132f194..b0c1f9061da 100644
--- a/service/history/timerQueueStandbyTaskExecutor.go
+++ b/service/history/timerQueueStandbyTaskExecutor.go
@@ -517,52 +517,58 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
 		return err
 	}
 	if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion {
-		err = serviceerror.NewInternal("timerQueueStandbyProcessor encountered empty historyResendInfo")
-	} else {
-		ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
-		if err != nil {
-			return err
-		}
+		t.logger.Error("Error re-replicating history from remote: timerQueueStandbyProcessor encountered empty historyResendInfo.",
+			tag.ShardID(t.shard.GetShardID()),
+			tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
+			tag.WorkflowID(taskInfo.GetWorkflowID()),
+			tag.WorkflowRunID(taskInfo.GetRunID()),
+			tag.ClusterName(remoteClusterName))
 
-		if err := refreshTasks(
-			ctx,
-			adminClient,
-			ns.Name(),
-			namespace.ID(taskInfo.GetNamespaceID()),
-			taskInfo.GetWorkflowID(),
-			taskInfo.GetRunID(),
-		); err != nil {
-			if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
-				// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
-				return err
-			}
+		return consts.ErrTaskRetry
+	}
 
-			t.logger.Error("Error refresh tasks from remote.",
-				tag.ShardID(t.shard.GetShardID()),
-				tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
-				tag.WorkflowID(taskInfo.GetWorkflowID()),
-				tag.WorkflowRunID(taskInfo.GetRunID()),
-				tag.ClusterName(remoteClusterName),
-				tag.Error(err))
+	ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
+	if err != nil {
+		// This is most likely a NamespaceNotFound error. Don't log it and return error to stop retrying.
+		return err
+	}
+
+	if err = refreshTasks(
+		ctx,
+		adminClient,
+		ns.Name(),
+		namespace.ID(taskInfo.GetNamespaceID()),
+		taskInfo.GetWorkflowID(),
+		taskInfo.GetRunID(),
+	); err != nil {
+		if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
+			// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
+			return err
 		}
-		// NOTE: history resend may take long time and its timeout is currently
-		// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
-		err = t.nDCHistoryResender.SendSingleWorkflowHistory(
-			remoteClusterName,
-			namespace.ID(taskInfo.GetNamespaceID()),
-			taskInfo.GetWorkflowID(),
-			taskInfo.GetRunID(),
-			resendInfo.lastEventID,
-			resendInfo.lastEventVersion,
-			common.EmptyEventID,
-			common.EmptyVersion,
-		)
+		t.logger.Error("Error refresh tasks from remote.",
+			tag.ShardID(t.shard.GetShardID()),
+			tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
+			tag.WorkflowID(taskInfo.GetWorkflowID()),
+			tag.WorkflowRunID(taskInfo.GetRunID()),
+			tag.ClusterName(remoteClusterName),
+			tag.Error(err))
 	}
-	if err != nil {
+
+	// NOTE: history resend may take long time and its timeout is currently
+	// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
+	if err = t.nDCHistoryResender.SendSingleWorkflowHistory(
+		remoteClusterName,
+		namespace.ID(taskInfo.GetNamespaceID()),
+		taskInfo.GetWorkflowID(),
+		taskInfo.GetRunID(),
+		resendInfo.lastEventID,
+		resendInfo.lastEventVersion,
+		common.EmptyEventID,
+		common.EmptyVersion,
+	); err != nil {
 		if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
-			// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
+			// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
 			return err
 		}
 		t.logger.Error("Error re-replicating history from remote.",
@@ -574,7 +580,7 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
 			tag.Error(err))
 	}
 
-	// return error so task processing logic will retry
+	// Return retryable error, so task processing will retry.
 	return consts.ErrTaskRetry
 }
 
diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go
index a5bbdbe042a..952bdd5c933 100644
--- a/service/history/transferQueueStandbyTaskExecutor.go
+++ b/service/history/transferQueueStandbyTaskExecutor.go
@@ -619,51 +619,57 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
 		return err
 	}
 	if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion {
-		err = serviceerror.NewInternal("transferQueueStandbyProcessor encountered empty historyResendInfo")
-	} else {
-		ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
-		if err != nil {
-			return err
-		}
-		if err := refreshTasks(
-			ctx,
-			adminClient,
-			ns.Name(),
-			namespace.ID(taskInfo.GetNamespaceID()),
-			taskInfo.GetWorkflowID(),
-			taskInfo.GetRunID(),
-		); err != nil {
-			if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
-				// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
-				return err
-			}
-			t.logger.Error("Error refresh tasks from remote.",
-				tag.ShardID(t.shard.GetShardID()),
-				tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
-				tag.WorkflowID(taskInfo.GetWorkflowID()),
-				tag.WorkflowRunID(taskInfo.GetRunID()),
-				tag.ClusterName(remoteClusterName),
-				tag.Error(err))
-		}
+		t.logger.Error("Error re-replicating history from remote: transferQueueStandbyProcessor encountered empty historyResendInfo.",
+			tag.ShardID(t.shard.GetShardID()),
+			tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
+			tag.WorkflowID(taskInfo.GetWorkflowID()),
+			tag.WorkflowRunID(taskInfo.GetRunID()),
+			tag.SourceCluster(remoteClusterName))
 
-		// NOTE: history resend may take long time and its timeout is currently
-		// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
-		err = t.nDCHistoryResender.SendSingleWorkflowHistory(
-			remoteClusterName,
-			namespace.ID(taskInfo.GetNamespaceID()),
-			taskInfo.GetWorkflowID(),
-			taskInfo.GetRunID(),
-			resendInfo.lastEventID,
-			resendInfo.lastEventVersion,
-			0,
-			0,
-		)
+		return consts.ErrTaskRetry
 	}
+
+	ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
 	if err != nil {
+		// This is most likely a NamespaceNotFound error. Don't log it and return error to stop retrying.
+		return err
+	}
+
+	if err = refreshTasks(
+		ctx,
+		adminClient,
+		ns.Name(),
+		namespace.ID(taskInfo.GetNamespaceID()),
+		taskInfo.GetWorkflowID(),
+		taskInfo.GetRunID(),
+	); err != nil {
+		if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
+			// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
+			return err
+		}
+		t.logger.Error("Error refresh tasks from remote.",
+			tag.ShardID(t.shard.GetShardID()),
+			tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
+			tag.WorkflowID(taskInfo.GetWorkflowID()),
+			tag.WorkflowRunID(taskInfo.GetRunID()),
+			tag.ClusterName(remoteClusterName),
+			tag.Error(err))
+	}
+
+	// NOTE: history resend may take long time and its timeout is currently
+	// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
+	if err = t.nDCHistoryResender.SendSingleWorkflowHistory(
+		remoteClusterName,
+		namespace.ID(taskInfo.GetNamespaceID()),
+		taskInfo.GetWorkflowID(),
+		taskInfo.GetRunID(),
+		resendInfo.lastEventID,
+		resendInfo.lastEventVersion,
+		0,
+		0,
+	); err != nil {
 		if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
-			// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
+			// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
 			return err
 		}
 		t.logger.Error("Error re-replicating history from remote.",
@@ -675,7 +681,7 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
 			tag.Error(err))
 	}
 
-	// return error so task processing logic will retry
+	// Return retryable error, so task processing will retry.
 	return consts.ErrTaskRetry
 }
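Note on the bug class this patch fixes: in Go, := inside a block declares new variables, so the old code's inner "ns, err := t.registry.GetNamespaceByID(...)" shadowed the outer err. The later "err = t.nDCHistoryResender.SendSingleWorkflowHistory(...)" assigned to that inner, block-scoped variable, so the "if err != nil" check after the else block could never observe a resend failure. The patch removes the else block in favor of early returns, leaving a single err in scope. Below is a minimal, self-contained sketch of the same pattern; getNamespace and resend are hypothetical stand-ins, not Temporal APIs.

// error_shadowing_sketch.go — illustration only, not code from this patch.
package main

import (
	"errors"
	"fmt"
)

func getNamespace() (string, error) { return "default", nil }
func resend() error                 { return errors.New("resend failed") }

// buggy mirrors the old shape: ":=" inside the else branch declares a NEW err
// that shadows the outer one, so "err = resend()" updates only the inner
// variable and the check after the else never sees the failure.
func buggy(emptyInfo bool) {
	var err error
	if emptyInfo {
		err = errors.New("empty historyResendInfo")
	} else {
		ns, err := getNamespace() // BUG: shadows the outer err
		if err != nil {
			return
		}
		_ = ns
		err = resend() // assigns to the shadowed err; lost when the block ends
	}
	if err != nil { // false on the else path even though resend() failed
		fmt.Println("handled:", err)
		return
	}
	fmt.Println("no error observed") // printed despite the resend failure
}

// fixed mirrors the new shape: early return for the empty-info case, then a
// single err in scope, so every failure reaches a live check.
func fixed(emptyInfo bool) {
	if emptyInfo {
		fmt.Println("handled: empty historyResendInfo")
		return
	}
	ns, err := getNamespace() // the only err on this path; nothing to shadow
	if err != nil {
		return
	}
	_ = ns
	if err = resend(); err != nil { // same variable, so the failure is observed
		fmt.Println("handled:", err)
		return
	}
	fmt.Println("no error observed")
}

func main() {
	buggy(false) // prints "no error observed" — the failure was shadowed away
	fixed(false) // prints "handled: resend failed"
}

The shadow analyzer from golang.org/x/tools (go/analysis/passes/shadow) flags the := declaration in buggy, which is one way to catch this class of bug before review.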