diff --git a/cdc/kv/client.go b/cdc/kv/client.go index e6ac06b0cb0..fe4fde0c377 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -1331,7 +1331,7 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can } func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { @@ -1340,12 +1340,18 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { case <-ticker.C: } - currTime := s.client.pdClock.CurrentTime() attr := s.rangeLock.CollectLockedRangeAttrs(nil) + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) + currTime := s.client.pdClock.CurrentTime() + log.Info("event feed starts to check locked regions", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName)) + if attr.SlowestRegion.Initialized { - ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { - log.Info("event feed finds a slow region", + log.Info("event feed finds a initialized slow region", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Int64("tableID", s.tableID), @@ -1359,6 +1365,13 @@ func (s *eventFeedSession) logSlowRegions(ctx context.Context) error { zap.Int64("tableID", s.tableID), zap.String("tableName", s.tableName), zap.Any("slowRegion", attr.SlowestRegion)) + } else if currTime.Sub(ckptTime) > 10*time.Minute { + log.Info("event feed finds a uninitialized slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Int64("tableID", s.tableID), + zap.String("tableName", s.tableName), + zap.Any("slowRegion", attr.SlowestRegion)) } if len(attr.Holes) > 0 { holes := make([]string, 0, len(attr.Holes)) diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index f11a036a0b4..161726f3914 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -696,7 +696,7 @@ func (s *SharedClient) resolveLock(ctx context.Context) error { } func (s *SharedClient) logSlowRegions(ctx context.Context) error { - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { @@ -704,15 +704,18 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error { return ctx.Err() case <-ticker.C: } + log.Info("event feed starts to check locked regions", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID)) currTime := s.pdClock.CurrentTime() s.totalSpans.RLock() for subscriptionID, rt := range s.totalSpans.v { attr := rt.rangeLock.CollectLockedRangeAttrs(nil) + ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) if attr.SlowestRegion.Initialized { - ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs) if currTime.Sub(ckptTime) > 2*resolveLockMinInterval { - log.Info("event feed finds a slow region", + log.Info("event feed finds a initialized slow region", zap.String("namespace", s.changefeed.Namespace), zap.String("changefeed", s.changefeed.ID), zap.Any("subscriptionID", subscriptionID), @@ -724,6 +727,12 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error { zap.String("changefeed", s.changefeed.ID), zap.Any("subscriptionID", subscriptionID), zap.Any("slowRegion", attr.SlowestRegion)) + } else if currTime.Sub(ckptTime) > 10*time.Minute { + log.Info("event feed finds a uninitialized slow region", + zap.String("namespace", s.changefeed.Namespace), + zap.String("changefeed", s.changefeed.ID), + zap.Any("subscriptionID", subscriptionID), + zap.Any("slowRegion", attr.SlowestRegion)) } if len(attr.Holes) > 0 { holes := make([]string, 0, len(attr.Holes)) diff --git a/cdc/puller/frontier/frontier.go b/cdc/puller/frontier/frontier.go index a10dbc87608..f993dba2bca 100644 --- a/cdc/puller/frontier/frontier.go +++ b/cdc/puller/frontier/frontier.go @@ -31,6 +31,7 @@ type Frontier interface { Forward(regionID uint64, span tablepb.Span, ts uint64) Frontier() uint64 String() string + Entries(fn func(key []byte, ts uint64)) } // spanFrontier tracks the minimum timestamp of a set of spans. diff --git a/cdc/puller/frontier/frontier_test.go b/cdc/puller/frontier/frontier_test.go index 9d5a624fbc4..2ca32716016 100644 --- a/cdc/puller/frontier/frontier_test.go +++ b/cdc/puller/frontier/frontier_test.go @@ -15,6 +15,7 @@ package frontier import ( "bytes" + "math" "math/rand" "sort" "testing" @@ -391,3 +392,46 @@ func TestMinMaxWithRegionSplitMerge(t *testing.T) { f.Forward(8, tablepb.Span{StartKey: []byte("d"), EndKey: []byte("e")}, 5) require.Equal(t, uint64(5), f.Frontier()) } + +func TestFrontierEntries(t *testing.T) { + t.Parallel() + + ab := tablepb.Span{StartKey: []byte("a"), EndKey: []byte("b")} + bc := tablepb.Span{StartKey: []byte("b"), EndKey: []byte("c")} + cd := tablepb.Span{StartKey: []byte("c"), EndKey: []byte("d")} + de := tablepb.Span{StartKey: []byte("d"), EndKey: []byte("e")} + ef := tablepb.Span{StartKey: []byte("e"), EndKey: []byte("f")} + af := tablepb.Span{StartKey: []byte("a"), EndKey: []byte("f")} + f := NewFrontier(0, af) + + var slowestTs uint64 = math.MaxUint64 + var slowestRange tablepb.Span + getSlowestRange := func() { + slowestTs = math.MaxUint64 + slowestRange = tablepb.Span{} + f.Entries(func(key []byte, ts uint64) { + if ts < slowestTs { + slowestTs = ts + slowestRange.StartKey = key + slowestRange.EndKey = nil + } else if slowestTs != math.MaxUint64 && len(slowestRange.EndKey) == 0 { + slowestRange.EndKey = key + } + }) + } + + getSlowestRange() + require.Equal(t, uint64(0), slowestTs) + require.Equal(t, []byte("a"), []byte(slowestRange.StartKey)) + require.Equal(t, []byte("f"), []byte(slowestRange.EndKey)) + + f.Forward(1, ab, 100) + f.Forward(2, bc, 200) + f.Forward(3, cd, 300) + f.Forward(4, de, 400) + f.Forward(5, ef, 500) + getSlowestRange() + require.Equal(t, uint64(100), slowestTs) + require.Equal(t, []byte("a"), []byte(slowestRange.StartKey)) + require.Equal(t, []byte("b"), []byte(slowestRange.EndKey)) +} diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index 3d53966fc42..dda87b9e704 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -15,6 +15,7 @@ package puller import ( "context" + "math" "sync/atomic" "time" @@ -146,6 +147,8 @@ func (p *pullerImpl) Run(ctx context.Context) error { WithLabelValues(p.changefeed.Namespace, p.changefeed.ID, "resolved") lastResolvedTs := p.checkpointTs + lastAdvancedTime := time.Now() + lastLogSlowRangeTime := time.Now() g.Go(func() error { stuckDetectorTicker := time.NewTicker(1 * time.Minute) defer stuckDetectorTicker.Stop() @@ -234,10 +237,38 @@ func (p *pullerImpl) Run(ctx context.Context) error { zap.Duration("duration", time.Since(start)), zap.Strings("spans", spans)) } - if !initialized || resolvedTs == lastResolvedTs { + if !initialized { + continue + } + if resolvedTs <= lastResolvedTs { + if time.Since(lastAdvancedTime) > 30*time.Second && time.Since(lastLogSlowRangeTime) > 30*time.Second { + var slowestTs uint64 = math.MaxUint64 + slowestRange := tablepb.Span{} + rangeFilled := true + p.tsTracker.Entries(func(key []byte, ts uint64) { + if ts < slowestTs { + slowestTs = ts + slowestRange.StartKey = key + rangeFilled = false + } else if !rangeFilled { + slowestRange.EndKey = key + rangeFilled = true + } + }) + log.Info("table puller has been stucked", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int64("tableID", p.tableID), + zap.String("tableName", p.tableName), + zap.Uint64("resolvedTs", resolvedTs), + zap.Uint64("slowestRangeTs", slowestTs), + zap.Stringer("range", &slowestRange)) + lastLogSlowRangeTime = time.Now() + } continue } lastResolvedTs = resolvedTs + lastAdvancedTime = time.Now() err := output(&model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved, RegionID: e.RegionID}) if err != nil { return errors.Trace(err)