diff --git a/service/history/nDCHistoryReplicator.go b/service/history/nDCHistoryReplicator.go index 0429761ed71..5d0e4c2c6e8 100644 --- a/service/history/nDCHistoryReplicator.go +++ b/service/history/nDCHistoryReplicator.go @@ -866,18 +866,17 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory( lastEventID, lastEventVersion), ) - var lastHistoryBatch *commonpb.DataBlob - prevTxnID := common.EmptyVersion historyBranch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String()) if err != nil { return nil, err } - latestBranchID := historyBranch.GetBranchId() - var prevBranchID string - sortedAncestors := copyAndSortAncestors(historyBranch.GetAncestors()) + prevTxnID := common.EmptyVersion + var lastHistoryBatch *commonpb.DataBlob + var prevBranchID string + sortedAncestors := sortAncestors(historyBranch.GetAncestors()) sortedAncestorsIdx := 0 - historyBranch.Ancestors = nil + var ancestors []*persistencespb.HistoryBranchRange BackfillLoop: for remoteHistoryIterator.HasNext() { @@ -891,17 +890,18 @@ BackfillLoop: continue BackfillLoop } + branchID := historyBranch.GetBranchId() if sortedAncestorsIdx < len(sortedAncestors) { currentAncestor := sortedAncestors[sortedAncestorsIdx] if historyBlob.nodeID >= currentAncestor.GetEndNodeId() { // update ancestor - historyBranch.Ancestors = append(historyBranch.Ancestors, currentAncestor) + ancestors = append(ancestors, currentAncestor) sortedAncestorsIdx++ } if sortedAncestorsIdx < len(sortedAncestors) { // use ancestor branch id currentAncestor = sortedAncestors[sortedAncestorsIdx] - historyBranch.BranchId = currentAncestor.GetBranchId() + branchID = currentAncestor.GetBranchId() if historyBlob.nodeID < currentAncestor.GetBeginNodeId() || historyBlob.nodeID >= currentAncestor.GetEndNodeId() { return nil, serviceerror.NewInternal( fmt.Sprintf("The backfill history blob node id %d is not in acestoer range [%d, %d]", @@ -910,13 +910,14 @@ BackfillLoop: currentAncestor.GetEndNodeId()), ) } - } else { - // no more ancestor, use the latest branch ID - historyBranch.BranchId = latestBranchID } } - filteredHistoryBranch, err := serialization.HistoryBranchToBlob(historyBranch) + filteredHistoryBranch, err := serialization.HistoryBranchToBlob(&persistencespb.HistoryBranch{ + TreeId: historyBranch.GetTreeId(), + BranchId: branchID, + Ancestors: ancestors, + }) if err != nil { return nil, err } @@ -926,7 +927,7 @@ BackfillLoop: } _, err = r.shard.GetExecutionManager().AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{ ShardID: r.shard.GetShardID(), - IsNewBranch: prevBranchID != historyBranch.BranchId, + IsNewBranch: prevBranchID != branchID, BranchToken: filteredHistoryBranch.GetData(), History: historyBlob.rawHistory, PrevTransactionID: prevTxnID, @@ -942,7 +943,7 @@ BackfillLoop: return nil, err } prevTxnID = txnID - prevBranchID = historyBranch.BranchId + prevBranchID = branchID lastHistoryBatch = historyBlob.rawHistory } @@ -954,9 +955,7 @@ BackfillLoop: return lastEventTime, nil } -func copyAndSortAncestors(input []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange { - ans := make([]*persistencespb.HistoryBranchRange, len(input)) - copy(ans, input) +func sortAncestors(ans []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange { if len(ans) > 0 { // sort ans based onf EndNodeID so that we can set BeginNodeID sort.Slice(ans, func(i, j int) bool { return ans[i].GetEndNodeId() < ans[j].GetEndNodeId() })