diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index aa4f61042d1b2..12e005128fb22 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -53,8 +53,9 @@ type CheckpointAdvancer struct { // The concurrency accessed task: // both by the task listener and ticking. - task *backuppb.StreamBackupTaskInfo - taskMu sync.Mutex + task *backuppb.StreamBackupTaskInfo + taskRange []kv.KeyRange + taskMu sync.Mutex // the read-only config. // once tick begin, this should not be changed for now. @@ -193,12 +194,15 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS) }() defer utils.PanicToErr(&err) - ranges := CollapseRanges(len(rst.Ranges), func(i int) kv.KeyRange { return rst.Ranges[i] }) + ranges := CollapseRanges(len(rst.Ranges), func(i int) kv.KeyRange { + return rst.Ranges[i] + }) workers := utils.NewWorkerPool(4, "sub ranges") eg, cx := errgroup.WithContext(ctx) collector := NewClusterCollector(ctx, c.env) collector.setOnSuccessHook(c.cache.InsertRange) - for _, r := range ranges { + clampedRanges := utils.IntersectAll(ranges, utils.CloneSlice(c.taskRange)) + for _, r := range clampedRanges { r := r workers.ApplyOnErrorGroup(eg, func() (e error) { defer c.recordTimeCost("get regions in range", zap.Uint64("checkpoint", rst.TS))() @@ -260,9 +264,8 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context) // CalculateGlobalCheckpoint calculates the global checkpoint, which won't use the cache. func (c *CheckpointAdvancer) CalculateGlobalCheckpoint(ctx context.Context) (uint64, error) { var ( - cp = uint64(math.MaxInt64) - // TODO: Use The task range here. 
- thisRun []kv.KeyRange = []kv.KeyRange{{}} + cp = uint64(math.MaxInt64) + thisRun []kv.KeyRange = c.taskRange nextRun []kv.KeyRange ) defer c.recordTimeCost("record all") @@ -411,8 +414,11 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error switch e.Type { case EventAdd: c.task = e.Info + c.taskRange = CollapseRanges(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] }) + log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange))) case EventDel: c.task = nil + c.taskRange = nil c.state = &fullScan{} if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil { log.Warn("failed to clear global checkpoint", logutil.ShortError(err)) @@ -475,7 +481,7 @@ func (c *CheckpointAdvancer) onConsistencyCheckTick(s *updateSmallTree) error { return nil } defer c.recordTimeCost("consistency check")() - err := c.cache.ConsistencyCheck() + err := c.cache.ConsistencyCheck(c.taskRange) if err != nil { log.Error("consistency check failed! log backup may lose data! 
rolling back to full scan for saving.", logutil.ShortError(err)) c.state = &fullScan{} diff --git a/br/pkg/streamhelper/advancer_cliext.go b/br/pkg/streamhelper/advancer_cliext.go index 6ac9bfc5694c8..611ad3744dfa8 100644 --- a/br/pkg/streamhelper/advancer_cliext.go +++ b/br/pkg/streamhelper/advancer_cliext.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/kv" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -36,10 +37,11 @@ func (t EventType) String() string { } type TaskEvent struct { - Type EventType - Name string - Info *backuppb.StreamBackupTaskInfo - Err error + Type EventType + Name string + Info *backuppb.StreamBackupTaskInfo + Ranges []kv.KeyRange + Err error } func (t *TaskEvent) String() string { @@ -60,7 +62,7 @@ func errorEvent(err error) TaskEvent { } } -func toTaskEvent(event *clientv3.Event) (TaskEvent, error) { +func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) { if !bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) { return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task path (%s)", string(event.Kv.Key)) } @@ -78,13 +80,18 @@ func toTaskEvent(event *clientv3.Event) (TaskEvent, error) { if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil { return TaskEvent{}, err } + var err error + te.Ranges, err = t.MetaDataClient.TaskByInfo(*te.Info).Ranges(ctx) + if err != nil { + return TaskEvent{}, err + } return te, nil } -func eventFromWatch(resp clientv3.WatchResponse) ([]TaskEvent, error) { +func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) { result := make([]TaskEvent, 0, len(resp.Events)) for _, event := range resp.Events { - te, err := toTaskEvent(event) + te, err := t.toTaskEvent(ctx, event) if err != nil { te.Type = EventErr te.Err = err @@ -97,7 +104,7 @@ func eventFromWatch(resp 
clientv3.WatchResponse) ([]TaskEvent, error) { func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) { c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev)) handleResponse := func(resp clientv3.WatchResponse) bool { - events, err := eventFromWatch(resp) + events, err := t.eventFromWatch(ctx, resp) if err != nil { ch <- errorEvent(err) return false @@ -146,10 +153,15 @@ func (t AdvancerExt) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, int6 } events := make([]TaskEvent, 0, len(tasks)) for _, task := range tasks { + ranges, err := task.Ranges(ctx) + if err != nil { + return nil, 0, err + } te := TaskEvent{ - Type: EventAdd, - Name: task.Info.Name, - Info: &(task.Info), + Type: EventAdd, + Name: task.Info.Name, + Info: &(task.Info), + Ranges: ranges, } events = append(events, te) } diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index eb3d71ac88700..aeaadf820af7a 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/streamhelper" "github.com/pingcap/tidb/br/pkg/streamhelper/config" + "github.com/pingcap/tidb/kv" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "google.golang.org/grpc/codes" @@ -185,3 +186,43 @@ func TestOneStoreFailure(t *testing.T) { require.NoError(t, adv.OnTick(ctx)) require.Equal(t, cp, env.checkpoint) } + +func TestTaskRanges(t *testing.T) { + log.SetLevel(zapcore.DebugLevel) + c := createFakeCluster(t, 4, true) + defer fmt.Println(c) + ctx := context.Background() + c.splitAndScatter("0001", "0002", "0012", "0034", "0048") + c.advanceCheckpoints() + c.flushAllExcept("0000", "0049") + env := &testEnv{fakeCluster: c, testCtx: t, ranges: []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.StartTaskListener(ctx) + + 
shouldFinishInTime(t, 10*time.Second, "first advancing", func() { require.NoError(t, adv.OnTick(ctx)) }) + // Don't check the return value of advance checkpoints here -- we didn't + require.Greater(t, env.getCheckpoint(), uint64(0)) +} + +func TestTaskRangesWithSplit(t *testing.T) { + log.SetLevel(zapcore.DebugLevel) + c := createFakeCluster(t, 4, true) + defer fmt.Println(c) + ctx := context.Background() + c.splitAndScatter("0012", "0034", "0048") + c.advanceCheckpoints() + c.flushAllExcept("0049") + env := &testEnv{fakeCluster: c, testCtx: t, ranges: []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}} + adv := streamhelper.NewCheckpointAdvancer(env) + adv.StartTaskListener(ctx) + + shouldFinishInTime(t, 10*time.Second, "first advancing", func() { require.NoError(t, adv.OnTick(ctx)) }) + fstCheckpoint := env.getCheckpoint() + require.Greater(t, fstCheckpoint, uint64(0)) + + c.splitAndScatter("0002") + c.advanceCheckpoints() + c.flushAllExcept("0000", "0049") + shouldFinishInTime(t, 10*time.Second, "second advancing", func() { require.NoError(t, adv.OnTick(ctx)) }) + require.Greater(t, env.getCheckpoint(), fstCheckpoint) +} diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index e6b6e21470d53..f3ccbb408c031 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -18,8 +18,11 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/streamhelper" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -155,6 +158,7 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge }, }) } + log.Debug("Get last flush ts of region", zap.Stringer("in", in), zap.Stringer("out", resp)) return resp, nil } @@ -318,6 +322,7 @@ func (f 
*fakeCluster) advanceCheckpoints() uint64 { r.fsim.flushedEpoch = 0 }) } + log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint)) return minCheckpoint } @@ -357,7 +362,14 @@ func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster { } func (r *region) String() string { - return fmt.Sprintf("%d(%d):[%s,%s);%dL%d", r.id, r.epoch, hex.EncodeToString(r.rng.StartKey), hex.EncodeToString(r.rng.EndKey), r.checkpoint, r.leader) + return fmt.Sprintf("%d(%d):[%s,%s);%dL%dF%d", + r.id, + r.epoch, + hex.EncodeToString(r.rng.StartKey), + hex.EncodeToString(r.rng.EndKey), + r.checkpoint, + r.leader, + r.fsim.flushedEpoch) } func (f *fakeStore) String() string { @@ -375,6 +387,20 @@ func (f *fakeCluster) flushAll() { } } +func (f *fakeCluster) flushAllExcept(keys ...string) { +outer: + for _, r := range f.regions { + // Note: can we make it faster? + for _, key := range keys { + if utils.CompareBytesExt(r.rng.StartKey, false, []byte(key), false) <= 0 && + utils.CompareBytesExt([]byte(key), false, r.rng.EndKey, true) < 0 { + continue outer + } + } + r.flush() + } +} + func (f *fakeStore) flush() { for _, r := range f.regions { if r.leader == f.id { @@ -400,17 +426,23 @@ type testEnv struct { *fakeCluster checkpoint uint64 testCtx *testing.T + ranges []kv.KeyRange mu sync.Mutex } func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) error { + rngs := t.ranges + if len(rngs) == 0 { + rngs = []kv.KeyRange{{}} + } tsk := streamhelper.TaskEvent{ Type: streamhelper.EventAdd, Name: "whole", Info: &backup.StreamBackupTaskInfo{ Name: "whole", }, + Ranges: rngs, } ch <- tsk return nil diff --git a/br/pkg/streamhelper/integration_test.go b/br/pkg/streamhelper/integration_test.go index d60f53ac71bd0..8485bac19ce0f 100644 --- a/br/pkg/streamhelper/integration_test.go +++ b/br/pkg/streamhelper/integration_test.go @@ -283,12 +283,14 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) { first := <-ch require.Equal(t, 
first.Type, streamhelper.EventAdd) require.Equal(t, first.Name, taskName) + require.ElementsMatch(t, first.Ranges, simpleRanges(4)) second := <-ch require.Equal(t, second.Type, streamhelper.EventDel) require.Equal(t, second.Name, taskName) third := <-ch require.Equal(t, third.Type, streamhelper.EventAdd) require.Equal(t, third.Name, taskName2) + require.ElementsMatch(t, third.Ranges, simpleRanges(4)) forth := <-ch require.Equal(t, forth.Type, streamhelper.EventDel) require.Equal(t, forth.Name, taskName2) diff --git a/br/pkg/streamhelper/tsheap.go b/br/pkg/streamhelper/tsheap.go index f4006e5e44c42..6c2fb510776e7 100644 --- a/br/pkg/streamhelper/tsheap.go +++ b/br/pkg/streamhelper/tsheap.go @@ -14,6 +14,7 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/redact" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap/zapcore" @@ -56,7 +57,7 @@ type CheckpointsCache interface { // PopRangesWithGapGT pops the ranges which's checkpoint is PopRangesWithGapGT(d time.Duration) []*RangesSharesTS // Check whether the ranges in the cache is integrate. - ConsistencyCheck() error + ConsistencyCheck(ranges []kv.KeyRange) error // Clear the cache. Clear() } @@ -82,7 +83,7 @@ func (NoOPCheckpointCache) PopRangesWithGapGT(d time.Duration) []*RangesSharesTS panic("invalid state: NoOPCheckpointCache should never be used in advancing!") } -func (NoOPCheckpointCache) ConsistencyCheck() error { +func (NoOPCheckpointCache) ConsistencyCheck([]kv.KeyRange) error { return errors.Annotatef(berrors.ErrUnsupportedOperation, "invalid state: NoOPCheckpointCache should never be used in advancing!") } @@ -221,20 +222,105 @@ func (h *Checkpoints) CheckpointTS() uint64 { } // ConsistencyCheck checks whether the tree contains the full range of key space. -// TODO: add argument to it and check a sub range. 
-func (h *Checkpoints) ConsistencyCheck() error { +func (h *Checkpoints) ConsistencyCheck(rangesIn []kv.KeyRange) error { h.mu.Lock() - ranges := make([]kv.KeyRange, 0, 1024) + rangesReal := make([]kv.KeyRange, 0, 1024) h.tree.Ascend(func(i btree.Item) bool { - ranges = append(ranges, i.(*RangesSharesTS).Ranges...) + rangesReal = append(rangesReal, i.(*RangesSharesTS).Ranges...) return true }) h.mu.Unlock() - r := CollapseRanges(len(ranges), func(i int) kv.KeyRange { return ranges[i] }) - if len(r) != 1 || len(r[0].StartKey) != 0 || len(r[0].EndKey) != 0 { - return errors.Annotatef(berrors.ErrPiTRMalformedMetadata, - "the region tree cannot cover the key space, collapsed: %s", logutil.StringifyKeys(r)) + r := CollapseRanges(len(rangesReal), func(i int) kv.KeyRange { return rangesReal[i] }) + ri := CollapseRanges(len(rangesIn), func(i int) kv.KeyRange { return rangesIn[i] }) + + return errors.Annotatef(checkIntervalIsSubset(r, ri), "ranges: (current) %s (not in) %s", logutil.StringifyKeys(r), + logutil.StringifyKeys(ri)) +} + +// A simple algorithm to detect non-overlapped ranges. +// It maintains the "current" probe, and let the ranges to check "consume" it. +// For example: +// toCheck: |_____________________| |_____________| +// . ^checking +// subsetOf: |_________| |_______| |__________| +// . ^probing +// probing is the subrange of checking, consume it and move forward the probe. +// toCheck: |_____________________| |_____________| +// . ^checking +// subsetOf: |_________| |_______| |__________| +// . ^probing +// consume it, too. +// toCheck: |_____________________| |_____________| +// . ^checking +// subsetOf: |_________| |_______| |__________| +// . ^probing +// checking is at the left of probing and no overlaps, moving it forward. +// toCheck: |_____________________| |_____________| +// . ^checking +// subsetOf: |_________| |_______| |__________| +// . ^probing +// consume it. all subset ranges are consumed, check passed. 
+func checkIntervalIsSubset(toCheck []kv.KeyRange, subsetOf []kv.KeyRange) error { + i := 0 + si := 0 + + for { + // We have checked all ranges. + if si >= len(subsetOf) { + return nil + } + // Some probing ranges remain, but no checking range is left to cover them. + if i >= len(toCheck) { + return errors.Annotatef(berrors.ErrPiTRMalformedMetadata, + "there remains a range doesn't be fully consumed: %s", + logutil.StringifyRange(subsetOf[si])) + } + + checking := toCheck[i] + probing := subsetOf[si] + // checking: |___________| + // probing: |_________| + // A rare case: the "first" range is out of bound or not fully covers the probing range. + if utils.CompareBytesExt(checking.StartKey, false, probing.StartKey, false) > 0 { + holeEnd := checking.StartKey + if utils.CompareBytesExt(holeEnd, false, probing.EndKey, true) > 0 { + holeEnd = probing.EndKey + } + return errors.Annotatef(berrors.ErrPiTRMalformedMetadata, "probably a hole in key ranges: %s", logutil.StringifyRange{ + StartKey: probing.StartKey, + EndKey: holeEnd, + }) + } + + // checking: |_____| + // probing: |_______| + // Just move forward checking. + if utils.CompareBytesExt(checking.EndKey, true, probing.StartKey, false) < 0 { + i += 1 + continue + } + + // checking: |_________| + // probing: |__________________| + // Given all of the ranges are "collapsed", the next checking range must + // not be adjacent with the current checking range. + // And hence there must be a "hole" in the probing key space, which ends at the start of the next checking range (or at probing.EndKey, whichever comes first). + if utils.CompareBytesExt(checking.EndKey, true, probing.EndKey, true) < 0 { + next := probing.EndKey + if i+1 < len(toCheck) && utils.CompareBytesExt(toCheck[i+1].StartKey, false, probing.EndKey, true) < 0 { + next = toCheck[i+1].StartKey + } + return errors.Annotatef(berrors.ErrPiTRMalformedMetadata, "probably a hole in key ranges: %s", logutil.StringifyRange{ + StartKey: checking.EndKey, + EndKey: next, + }) + } + // checking: |________________| + // probing: |_____________| + // The current checking range fills the current probing range, + // or the current checking range is out of the current range. 
+ // let's move the probing forward. + si += 1 } - return nil } diff --git a/br/pkg/streamhelper/tsheap_test.go b/br/pkg/streamhelper/tsheap_test.go index 461e7f76c87e7..173bc2e0a0334 100644 --- a/br/pkg/streamhelper/tsheap_test.go +++ b/br/pkg/streamhelper/tsheap_test.go @@ -2,7 +2,9 @@ package streamhelper_test import ( + "fmt" "math" + "math/rand" "testing" "github.com/pingcap/tidb/br/pkg/streamhelper" @@ -162,3 +164,85 @@ func TestInsertRanges(t *testing.T) { } } } + +func TestConsistencyCheckOverRange(t *testing.T) { + r := func(a, b string) kv.KeyRange { + return kv.KeyRange{StartKey: []byte(a), EndKey: []byte(b)} + } + type Case struct { + checking []kv.KeyRange + probing []kv.KeyRange + isSubset bool + } + + cases := []Case{ + // basic: exactly match. + { + checking: []kv.KeyRange{r("0001", "0002"), r("0002", "0003"), r("0004", "0005")}, + probing: []kv.KeyRange{r("0001", "0003"), r("0004", "0005")}, + isSubset: true, + }, + // not fully match, probing longer. + { + checking: []kv.KeyRange{r("0001", "0002"), r("0002", "0003"), r("0004", "0005")}, + probing: []kv.KeyRange{r("0000", "0003"), r("0004", "00051")}, + isSubset: false, + }, + // with infinity end keys. + { + checking: []kv.KeyRange{r("0001", "0002"), r("0002", "0003"), r("0004", "")}, + probing: []kv.KeyRange{r("0001", "0003"), r("0004", "")}, + isSubset: true, + }, + { + checking: []kv.KeyRange{r("0001", "0002"), r("0002", "0003"), r("0004", "")}, + probing: []kv.KeyRange{r("0001", "0003"), r("0004", "0005")}, + isSubset: true, + }, + { + checking: []kv.KeyRange{r("0001", "0002"), r("0002", "0003"), r("0004", "0005")}, + probing: []kv.KeyRange{r("0001", "0003"), r("0004", "")}, + isSubset: false, + }, + // overlapped probe. 
+ { + checking: []kv.KeyRange{r("0001", "0002"), r("0002", "0003"), r("0004", "0007")}, + probing: []kv.KeyRange{r("0001", "0008")}, + isSubset: false, + }, + { + checking: []kv.KeyRange{r("0001", "0008")}, + probing: []kv.KeyRange{r("0001", "0002"), r("0002", "0003"), r("0004", "0007")}, + isSubset: true, + }, + { + checking: []kv.KeyRange{r("0100", "0120"), r("0130", "0141")}, + probing: []kv.KeyRange{r("0000", "0001")}, + isSubset: false, + }, + { + checking: []kv.KeyRange{r("0100", "0120")}, + probing: []kv.KeyRange{r("0090", "0110"), r("0115", "0120")}, + isSubset: false, + }, + } + + run := func(t *testing.T, c Case) { + tree := streamhelper.NewCheckpoints() + for _, r := range c.checking { + tree.InsertRange(rand.Uint64()%10, r) + } + err := tree.ConsistencyCheck(c.probing) + if c.isSubset { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } + + for i, c := range cases { + t.Run(fmt.Sprintf("#%d", i), func(tc *testing.T) { + run(tc, c) + }) + } +} diff --git a/br/pkg/utils/key.go b/br/pkg/utils/key.go index 41c6b4a54089c..062f4b5aac52d 100644 --- a/br/pkg/utils/key.go +++ b/br/pkg/utils/key.go @@ -10,7 +10,11 @@ import ( "strings" "github.com/pingcap/errors" + "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/kv" + "go.uber.org/zap" ) // ParseKey parse key by given format. @@ -76,6 +80,7 @@ func unescapedKey(text string) ([]byte, error) { // end, so an empty key is greater than any other keys. // Please note that this function is not applicable if any one argument is not an EXCLUSIVE ending of a range. func CompareEndKey(a, b []byte) int { + // NOTE: maybe CompareBytesExt(a, true, b, true)? if len(a) == 0 { if len(b) == 0 { return 0 @@ -89,3 +94,106 @@ func CompareEndKey(a, b []byte) int { return bytes.Compare(a, b) } + +// CompareBytesExt compare two byte sequences. 
+// Unlike `bytes.Compare`, each argument carries its own flag telling whether an empty key stands for +inf. +func CompareBytesExt(a []byte, aEmptyAsInf bool, b []byte, bEmptyAsInf bool) int { + // Inf = Inf + if len(a) == 0 && aEmptyAsInf && len(b) == 0 && bEmptyAsInf { + return 0 + } + // Inf > anything + if len(a) == 0 && aEmptyAsInf { + return 1 + } + // anything < Inf + if len(b) == 0 && bEmptyAsInf { + return -1 + } + return bytes.Compare(a, b) +} + +type failedToClampReason int + +const ( + successClamp failedToClampReason = iota + // Only the start key had to be raised to clampIn.StartKey and nothing was left: + // the range to clamp ends at or before the point where clampIn starts. + leftNotOverlapped + // The end key had to be lowered to clampIn.EndKey and nothing was left: + // typically the range to clamp starts at or after the point where clampIn ends. + rightNotOverlapped + buggyUnknown // neither bound was clamped, yet the range itself is empty (start >= end) +) + +func clampInOneRange(rng kv.KeyRange, clampIn kv.KeyRange) (kv.KeyRange, failedToClampReason) { + possibleFailureReason := buggyUnknown + if CompareBytesExt(rng.StartKey, false, clampIn.StartKey, false) < 0 { + rng.StartKey = clampIn.StartKey + possibleFailureReason = leftNotOverlapped + } + if CompareBytesExt(rng.EndKey, true, clampIn.EndKey, true) > 0 { + rng.EndKey = clampIn.EndKey + possibleFailureReason = rightNotOverlapped + } + // An empty result (start >= end) counts as a failure too; the reason is whichever bound was clamped last. + if CompareBytesExt(rng.StartKey, false, rng.EndKey, true) >= 0 { + return kv.KeyRange{}, possibleFailureReason + } + return rng, successClamp +} + +// CloneSlice shallowly clones a slice. +func CloneSlice[T any](s []T) []T { + r := make([]T, len(s)) + copy(r, s) + return r +} + +// IntersectAll returns the intersection of two sets of segments. +// OWNERSHIP INFORMATION: +// For running faster, this function would MUTATE the input slice. (i.e. takes its ownership.) +// (But it is promised that this function won't change the `start key` and `end key` slice) +// If you want to use the input slice after, call `CloneSlice` over arguments before passing them. +// +// You can treat "set of segments" as points maybe not adjacent. 
+// in this way, IntersectAll(s1, s2) = { point | point in both s1 and s2 } +// Example: +// ranges: |___________| |________________| +// toClampIn: |_____| |____| |________________| +// result: |_____| |_| |______________| +// we are assuming the arguments are sorted by the start key and no overlaps. +// you can call CollapseRanges to get key ranges fits this requirements. +// Note: this algorithm is pretty like the `checkIntervalIsSubset`, can we get them together? +func IntersectAll(s1 []kv.KeyRange, s2 []kv.KeyRange) []kv.KeyRange { + currentClamping := 0 + currentClampTarget := 0 + rs := make([]kv.KeyRange, 0, len(s1)) + for currentClampTarget < len(s2) && currentClamping < len(s1) { + cin := s2[currentClampTarget] + crg := s1[currentClamping] + rng, result := clampInOneRange(crg, cin) + switch result { + case successClamp: + rs = append(rs, rng) + if CompareBytesExt(crg.EndKey, true, cin.EndKey, true) <= 0 { + currentClamping++ + } else { + // Not fully consumed the clamped range. 
+ s1[currentClamping].StartKey = cin.EndKey + } + case leftNotOverlapped: + currentClamping++ + case rightNotOverlapped: + currentClampTarget++ + case buggyUnknown: + log.L().DPanic("Unreachable path reached", + zap.Stringer("over-ranges", logutil.StringifyKeys(s1)), + zap.Stringer("clamp-into", logutil.StringifyKeys(s2)), + zap.Stringer("current-clamping", logutil.StringifyRange(crg)), + zap.Stringer("current-target", logutil.StringifyRange(cin))) + currentClamping++ // nothing was clamped, so crg itself is empty: skip it, otherwise a production build (where DPanic only logs) would spin here forever + } + } + return rs +} diff --git a/br/pkg/utils/key_test.go b/br/pkg/utils/key_test.go index f9ce1c012b88d..763ad4e740b12 100644 --- a/br/pkg/utils/key_test.go +++ b/br/pkg/utils/key_test.go @@ -4,8 +4,10 @@ package utils import ( "encoding/hex" + "fmt" "testing" + "github.com/pingcap/tidb/kv" "github.com/stretchr/testify/require" ) @@ -111,3 +113,66 @@ func TestCompareEndKey(t *testing.T) { require.Equal(t, tt.ans, res) } } + +func TestClampKeyRanges(t *testing.T) { + r := func(a, b string) kv.KeyRange { + return kv.KeyRange{ + StartKey: []byte(a), + EndKey: []byte(b), + } + } + type Case struct { + ranges []kv.KeyRange + clampIn []kv.KeyRange + result []kv.KeyRange + } + + cases := []Case{ + { + ranges: []kv.KeyRange{r("0001", "0002"), r("0003", "0004"), r("0005", "0008")}, + clampIn: []kv.KeyRange{r("0001", "0004"), r("0006", "0008")}, + result: []kv.KeyRange{r("0001", "0002"), r("0003", "0004"), r("0006", "0008")}, + }, + { + ranges: []kv.KeyRange{r("0001", "0002"), r("00021", "0003"), r("0005", "0009")}, + clampIn: []kv.KeyRange{r("0001", "0004"), r("0005", "0008")}, + result: []kv.KeyRange{r("0001", "0002"), r("00021", "0003"), r("0005", "0008")}, + }, + { + ranges: []kv.KeyRange{r("0001", "0050"), r("0051", "0095"), r("0098", "0152")}, + clampIn: []kv.KeyRange{r("0001", "0100"), r("0150", "0200")}, + result: []kv.KeyRange{r("0001", "0050"), r("0051", "0095"), r("0098", "0100"), r("0150", "0152")}, + }, + { + ranges: []kv.KeyRange{r("0001", "0050"), r("0051", "0095"), r("0098", "0152")}, + clampIn: 
[]kv.KeyRange{r("0001", "0100"), r("0150", "")}, + result: []kv.KeyRange{r("0001", "0050"), r("0051", "0095"), r("0098", "0100"), r("0150", "0152")}, + }, + { + ranges: []kv.KeyRange{r("0001", "0050"), r("0051", "0095"), r("0098", "")}, + clampIn: []kv.KeyRange{r("0001", "0100"), r("0150", "0200")}, + result: []kv.KeyRange{r("0001", "0050"), r("0051", "0095"), r("0098", "0100"), r("0150", "0200")}, + }, + { + ranges: []kv.KeyRange{r("", "0050")}, + clampIn: []kv.KeyRange{r("", "")}, + result: []kv.KeyRange{r("", "0050")}, + }, + } + run := func(t *testing.T, c Case) { + require.ElementsMatch( + t, + IntersectAll(CloneSlice(c.ranges), CloneSlice(c.clampIn)), + c.result) + require.ElementsMatch( + t, + IntersectAll(c.clampIn, c.ranges), + c.result) + } + + for i, c := range cases { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + run(t, c) + }) + } +}