From fe4bdc26e59497069cba2cd806b622579b3a8b0a Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 22 Jan 2024 13:51:19 +0800 Subject: [PATCH] puller(ticdc): fix resolvedTs get stuck when region split and merge (#10488) (#10493) ref pingcap/tiflow#10157 --- cdc/puller/frontier/frontier.go | 3 ++- cdc/puller/frontier/frontier_test.go | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index 33ef73d9444..97dba79a0d1 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -83,7 +83,7 @@ func (s *spanFrontier) Frontier() uint64 { func (s *spanFrontier) Forward(regionID uint64, span regionspan.ComparableSpan, ts uint64) { // it's the fast part to detect if the region is split or merged, // if not we can update the minTsHeap with use new ts directly - if n, ok := s.cachedRegions[regionID]; ok && n.regionID != fakeRegionID && n.end != nil { + if n, ok := s.cachedRegions[regionID]; ok && n.regionID == regionID && n.end != nil { if bytes.Equal(n.Key(), span.Start) && bytes.Equal(n.End(), span.End) { s.minTsHeap.UpdateKey(n.Value(), ts) return @@ -105,6 +105,7 @@ func (s *spanFrontier) insert(regionID uint64, span regionspan.ComparableSpan, t if next != nil { if bytes.Equal(seekRes.Node().Key(), span.Start) && bytes.Equal(next.Key(), span.End) { s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts) + delete(s.cachedRegions, seekRes.Node().regionID) if regionID != fakeRegionID { s.cachedRegions[regionID] = seekRes.Node() s.cachedRegions[regionID].regionID = regionID diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 70f8903915b..6609bc0f68d 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -15,6 +15,7 @@ package frontier import ( "bytes" + "fmt" "math/rand" "sort" "testing" @@ -393,3 +394,21 @@ func TestMinMaxWithRegionSplitMerge(t *testing.T) { f.Forward(8, regionspan.ComparableSpan{Start: []byte("d"), End: []byte("e")}, 5) require.Equal(t, uint64(5), f.Frontier()) } + +func TestMergeSpitWithDifferentRegionID(t *testing.T) { + frontier := NewFrontier(100, c, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}) + frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1222) + frontier.Forward(2, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 102) + frontier.Forward(4, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 103) + frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 104) + frontier.Forward(1, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1223) + frontier.Forward(3, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 105) + frontier.Forward(2, regionspan.ComparableSpan{Start: []byte("b"), End: []byte("c")}, 107) + frontier.(*spanFrontier).spanList.Entries(func(node *skipListNode) bool { + fmt.Printf("%d:[%s: %s) %d\n", node.regionID, + string(node.Key()), + string(node.End()), node.value.key) + return true + }) + require.Equal(t, uint64(107), frontier.Frontier()) +}