Skip to content

Commit

Permalink
puller(ticdc): fix resolvedTs get stuck when region split and merge (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 18, 2024
1 parent 1372b29 commit 1f24fa9
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 10 deletions.
14 changes: 8 additions & 6 deletions cdc/puller/frontier/frontier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package frontier

import (
"bytes"
"encoding/hex"
"fmt"
"math"
"strings"
Expand Down Expand Up @@ -79,7 +80,7 @@ func (s *spanFrontier) Frontier() uint64 {
func (s *spanFrontier) Forward(regionID uint64, span tablepb.Span, ts uint64) {
// it's the fast part to detect if the region is split or merged,
// if not we can update the minTsHeap with use new ts directly
if n, ok := s.cachedRegions[regionID]; ok && n.regionID != fakeRegionID && n.end != nil {
if n, ok := s.cachedRegions[regionID]; ok && n.regionID == regionID && n.end != nil {
if bytes.Equal(n.Key(), span.StartKey) && bytes.Equal(n.End(), span.EndKey) {
s.minTsHeap.UpdateKey(n.Value(), ts)
return
Expand All @@ -102,7 +103,7 @@ func (s *spanFrontier) insert(regionID uint64, span tablepb.Span, ts uint64) {
if bytes.Equal(seekRes.Node().Key(), span.StartKey) &&
bytes.Equal(next.Key(), span.EndKey) {
s.minTsHeap.UpdateKey(seekRes.Node().Value(), ts)

delete(s.cachedRegions, seekRes.Node().regionID)
if regionID != fakeRegionID {
s.cachedRegions[regionID] = seekRes.Node()
s.cachedRegions[regionID].regionID = regionID
Expand Down Expand Up @@ -173,9 +174,9 @@ func (s *spanFrontier) stringWtihRegionID() string {
var buf strings.Builder
s.spanList.Entries(func(n *skipListNode) bool {
if n.Value().key == math.MaxUint64 {
buf.WriteString(fmt.Sprintf("[%d:%s @ Max] ", n.regionID, n.Key()))
buf.WriteString(fmt.Sprintf("[%d:%s @ Max] ", n.regionID, hex.EncodeToString(n.Key())))
} else { // the next span
buf.WriteString(fmt.Sprintf("[%d:%s @ %d] ", n.regionID, n.Key(), n.Value().key))
buf.WriteString(fmt.Sprintf("[%d:%s @ %d] ", n.regionID, hex.EncodeToString(n.Key()), n.Value().key))
}
return true
})
Expand All @@ -193,12 +194,13 @@ func (s *spanFrontier) SpanString(span tablepb.Span) string {
nextKey = n.Next().Key()
}
if n.Value().key == math.MaxUint64 {
buf.WriteString(fmt.Sprintf("[%d:%s @ Max] ", n.regionID, n.Key()))
buf.WriteString(fmt.Sprintf("[%d:%s @ Max] ", n.regionID, hex.EncodeToString(n.Key())))
} else if idx == 0 || // head
bytes.Equal(key, span.StartKey) || // start key sapn
bytes.Equal(nextKey, span.StartKey) || // the previous sapn of start key
bytes.Equal(key, span.EndKey) { // the end key span
buf.WriteString(fmt.Sprintf("[%d:%s @ %d] ", n.regionID, n.Key(), n.Value().key))
buf.WriteString(fmt.Sprintf("[%d:%s @ %d] ", n.regionID,
hex.EncodeToString(n.Key()), n.Value().key))
}
idx++
return true
Expand Down
27 changes: 23 additions & 4 deletions cdc/puller/frontier/frontier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package frontier
import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -215,7 +216,7 @@ func TestSpanString(t *testing.T) {

spAH := tablepb.Span{StartKey: []byte("a"), EndKey: []byte("h")}
f := NewFrontier(1, spAH).(*spanFrontier)
require.Equal(t, `[0:a @ 1] [0:h @ Max] `, f.SpanString(spAH))
require.Equal(t, `[0:61 @ 1] [0:68 @ Max] `, f.SpanString(spAH))

f.Forward(1, spAB, 2)
f.Forward(2, spBC, 5)
Expand All @@ -225,14 +226,14 @@ func TestSpanString(t *testing.T) {
f.Forward(6, spFG, 25)
f.Forward(7, spGH, 35)
require.Equal(t, uint64(2), f.Frontier())
require.Equal(t, `[1:a @ 2] [2:b @ 5] [3:c @ 10] [4:d @ 20] [5:e @ 30] [6:f @ 25] [7:g @ 35] [0:h @ Max] `, f.stringWtihRegionID())
require.Equal(t, `[1:61 @ 2] [2:62 @ 5] [3:63 @ 10] [4:64 @ 20] [5:65 @ 30] [6:66 @ 25] [7:67 @ 35] [0:68 @ Max] `, f.stringWtihRegionID())
// Print 5 span: start, before, target span, next, end
require.Equal(t, `[1:a @ 2] [3:c @ 10] [4:d @ 20] [5:e @ 30] [0:h @ Max] `, f.SpanString(spDE))
require.Equal(t, `[1:61 @ 2] [3:63 @ 10] [4:64 @ 20] [5:65 @ 30] [0:68 @ Max] `, f.SpanString(spDE))

spBH := tablepb.Span{StartKey: []byte("b"), EndKey: []byte("h")}
f.Forward(8, spBH, 18)
require.Equal(t, uint64(2), f.Frontier())
require.Equal(t, `[1:a @ 2] [8:b @ 18] [0:h @ Max] `, f.stringWtihRegionID())
require.Equal(t, `[1:61 @ 2] [8:62 @ 18] [0:68 @ Max] `, f.stringWtihRegionID())
}

func TestMinMax(t *testing.T) {
Expand Down Expand Up @@ -471,6 +472,24 @@ func TestFrontierEntries(t *testing.T) {
require.Equal(t, []byte("b"), []byte(slowestRange.EndKey))
}

func TestMergeSpitWithDifferentRegionID(t *testing.T) {
frontier := NewFrontier(100, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")})
frontier.Forward(1, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, 1222)
frontier.Forward(2, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 102)
frontier.Forward(4, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 103)
frontier.Forward(1, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("c")}, 104)
frontier.Forward(1, tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")}, 1223)
frontier.Forward(3, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 105)
frontier.Forward(2, tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")}, 107)
frontier.(*spanFrontier).spanList.Entries(func(node *skipListNode) bool {
fmt.Printf("%d:[%s: %s) %d\n", node.regionID,
string(node.Key()),
string(node.End()), node.value.key)
return true
})
require.Equal(t, uint64(107), frontier.Frontier())
}

func TestRandomMergeAndSplit(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (p *pullerImpl) Run(ctx context.Context) error {
zap.Stringer("resolvedSpan", &resolvedSpan.Span),
zap.Stringer("slowestRange", lastSlowestRange),
zap.Uint64("resolvedTs", lastResolvedTs),
zap.Uint64("regionID", resolvedSpan.Region),
zap.String("tsTracker", p.tsTracker.SpanString(*lastSlowestRange)),
)
lastCheckSlowestRangeTime = time.Now()
Expand Down

0 comments on commit 1f24fa9

Please sign in to comment.