From 3c50e834ead53d6a51b713676b979ce5342ed349 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Thu, 12 May 2022 11:35:43 -0700 Subject: [PATCH] Minor tweak to migration workflow wait replication check (#2839) --- service/worker/migration/activities.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index 27a986a99de..6aba12e4442 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -111,8 +111,15 @@ func (a *activities) checkReplicationOnce(ctx context.Context, waitRequest waitR for _, shard := range resp.Shards { clusterInfo, hasClusterInfo := shard.RemoteClusters[waitRequest.RemoteCluster] if hasClusterInfo { - if shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks || - (clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] && + // WE are all caught up + if shard.MaxReplicationTaskId == clusterInfo.AckedTaskId { + readyShardCount++ + continue + } + + // Caught up to the last checked IDs, and within allowed lagging range + if clusterInfo.AckedTaskId >= waitRequest.WaitForTaskIds[shard.ShardId] && + (shard.MaxReplicationTaskId-clusterInfo.AckedTaskId <= waitRequest.AllowedLaggingTasks || shard.ShardLocalTime.Sub(*clusterInfo.AckedTaskVisibilityTime) <= waitRequest.AllowedLagging) { readyShardCount++ continue