Commit

Fix error shadowing in standby executors (#3053)
alexshtin committed Jul 8, 2022
1 parent 0b01a72 commit a95c2fc
Showing 2 changed files with 93 additions and 81 deletions.
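
The shadowing bug this commit removes is the classic Go pattern where `:=` inside a nested block declares a fresh `err` instead of assigning to the one declared outside, so a later check of the outer `err` never sees the failure. Below is a minimal, self-contained sketch of that pattern (illustrative only, not the executor code itself):

```go
package main

import (
	"errors"
	"fmt"
)

// fetch simulates a remote call that always fails.
func fetch() (string, error) {
	return "", errors.New("remote unavailable")
}

func process(ready bool) error {
	var err error
	if !ready {
		err = errors.New("not ready")
	} else {
		// BUG: ":=" declares a new, block-scoped err that shadows the outer one.
		name, err := fetch()
		if err != nil {
			fmt.Println("fetch failed:", err)
		}
		_ = name
	}
	// When ready is true, the outer err is still nil here, so the fetch failure
	// above is silently dropped instead of being returned to the caller.
	return err
}

func main() {
	// Prints "fetch failed: remote unavailable" followed by "returned: <nil>".
	fmt.Println("returned:", process(true))
}
```

Assigning with `=` instead of `:=`, or restructuring to return early as this commit does, makes the later check observe the real error.
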
86 changes: 46 additions & 40 deletions service/history/timerQueueStandbyTaskExecutor.go
@@ -517,52 +517,58 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
return err
}
if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion {
err = serviceerror.NewInternal("timerQueueStandbyProcessor encountered empty historyResendInfo")
} else {
ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
return err
}
t.logger.Error("Error re-replicating history from remote: timerQueueStandbyProcessor encountered empty historyResendInfo.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName))

if err := refreshTasks(
ctx,
adminClient,
ns.Name(),
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
return err
}
return consts.ErrTaskRetry
}

t.logger.Error("Error refresh tasks from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName),
tag.Error(err))
ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
// This is most likely a NamespaceNotFound error. Don't log it and return error to stop retrying.
return err
}

if err = refreshTasks(
ctx,
adminClient,
ns.Name(),
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
return err
}

// NOTE: history resend may take long time and its timeout is currently
// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
err = t.nDCHistoryResender.SendSingleWorkflowHistory(
remoteClusterName,
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
resendInfo.lastEventID,
resendInfo.lastEventVersion,
common.EmptyEventID,
common.EmptyVersion,
)
t.logger.Error("Error refresh tasks from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName),
tag.Error(err))
}

if err != nil {
// NOTE: history resend may take long time and its timeout is currently
// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
if err = t.nDCHistoryResender.SendSingleWorkflowHistory(
remoteClusterName,
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
resendInfo.lastEventID,
resendInfo.lastEventVersion,
common.EmptyEventID,
common.EmptyVersion,
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
return err
}
t.logger.Error("Error re-replicating history from remote.",
@@ -574,7 +580,7 @@ func (t *timerQueueStandbyTaskExecutor) fetchHistoryFromRemote(
tag.Error(err))
}

// return error so task processing logic will retry
// Return retryable error, so task processing will retry.
return consts.ErrTaskRetry
}

88 changes: 47 additions & 41 deletions service/history/transferQueueStandbyTaskExecutor.go
@@ -619,51 +619,57 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
return err
}
if resendInfo.lastEventID == common.EmptyEventID || resendInfo.lastEventVersion == common.EmptyVersion {
err = serviceerror.NewInternal("transferQueueStandbyProcessor encountered empty historyResendInfo")
} else {
ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
return err
}

if err := refreshTasks(
ctx,
adminClient,
ns.Name(),
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
return err
}
t.logger.Error("Error refresh tasks from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName),
tag.Error(err))
}
t.logger.Error("Error re-replicating history from remote: transferQueueStandbyProcessor encountered empty historyResendInfo.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.SourceCluster(remoteClusterName))

// NOTE: history resend may take long time and its timeout is currently
// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
err = t.nDCHistoryResender.SendSingleWorkflowHistory(
remoteClusterName,
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
resendInfo.lastEventID,
resendInfo.lastEventVersion,
0,
0,
)
return consts.ErrTaskRetry
}

ns, err := t.registry.GetNamespaceByID(namespace.ID(taskInfo.GetNamespaceID()))
if err != nil {
// This is most likely a NamespaceNotFound error. Don't log it and return error to stop retrying.
return err
}

if err = refreshTasks(
ctx,
adminClient,
ns.Name(),
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
return err
}
t.logger.Error("Error refresh tasks from remote.",
tag.ShardID(t.shard.GetShardID()),
tag.WorkflowNamespaceID(taskInfo.GetNamespaceID()),
tag.WorkflowID(taskInfo.GetWorkflowID()),
tag.WorkflowRunID(taskInfo.GetRunID()),
tag.ClusterName(remoteClusterName),
tag.Error(err))
}

// NOTE: history resend may take long time and its timeout is currently
// controlled by a separate dynamicconfig config: StandbyTaskReReplicationContextTimeout
if err = t.nDCHistoryResender.SendSingleWorkflowHistory(
remoteClusterName,
namespace.ID(taskInfo.GetNamespaceID()),
taskInfo.GetWorkflowID(),
taskInfo.GetRunID(),
resendInfo.lastEventID,
resendInfo.lastEventVersion,
0,
0,
); err != nil {
if _, isNotFound := err.(*serviceerror.NamespaceNotFound); isNotFound {
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retry.
// Don't log NamespaceNotFound error because it is valid case, and return error to stop retrying.
return err
}
t.logger.Error("Error re-replicating history from remote.",
@@ -675,7 +681,7 @@ func (t *transferQueueStandbyTaskExecutor) fetchHistoryFromRemote(
tag.Error(err))
}

// return error so task processing logic will retry
// Return retryable error, so task processing will retry.
return consts.ErrTaskRetry
}

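Both executors receive the same restructuring: the empty historyResendInfo case now logs and returns the retryable error immediately, and the later refresh and resend calls assign to the already-declared `err` with `=` inside the `if` statement, so the value that is checked and logged is never a shadowed copy. A simplified sketch of that control-flow shape, using placeholder helpers (`refresh`, `resend`, `errTaskRetry`) rather than the real executor dependencies:

```go
package main

import (
	"errors"
	"log"
)

// errTaskRetry stands in for consts.ErrTaskRetry: a sentinel error that tells
// the task processor to retry the task later.
var errTaskRetry = errors.New("task retry")

// refresh and resend are placeholders for the remote refresh-tasks and
// history-resend calls made by the standby executors.
func refresh() error { return nil }

func resend() error { return errors.New("resend failed") }

func fetchFromRemote(haveResendInfo bool) error {
	// An early return replaces the old else branch, so no nested scope is needed.
	if !haveResendInfo {
		log.Println("encountered empty historyResendInfo")
		return errTaskRetry
	}

	// err is declared once; later calls assign with "=", never ":=", so the
	// same variable is inspected everywhere and nothing is silently shadowed.
	var err error
	if err = refresh(); err != nil {
		log.Println("error refreshing tasks from remote:", err)
	}

	if err = resend(); err != nil {
		log.Println("error re-replicating history from remote:", err)
	}

	// Always return a retryable error so task processing tries again.
	return errTaskRetry
}

func main() {
	_ = fetchFromRemote(true)
}
```
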
