Skip to content

Commit

Permalink
puller(ticdc): detect resolved ts stuck in puller (#10182)
Browse files Browse the repository at this point in the history
close #10181
  • Loading branch information
sdojjy authored Dec 1, 2023
1 parent a78bd01 commit 05e0328
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 2 deletions.
37 changes: 35 additions & 2 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type pullerImpl struct {
changefeed model.ChangeFeedID
tableID model.TableID
tableName string

cfg *config.ServerConfig
lastForwardTime time.Time
lastForwardResolvedTs uint64
}

// New creates a new Puller that fetches events starting from checkpointTs and puts them into buf.
Expand Down Expand Up @@ -110,6 +114,7 @@ func New(ctx context.Context,
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
cfg: cfg,
}
return p
}
Expand Down Expand Up @@ -138,8 +143,8 @@ func (p *pullerImpl) Run(ctx context.Context) error {

lastResolvedTs := p.checkpointTs
g.Go(func() error {
metricsTicker := time.NewTicker(15 * time.Second)
defer metricsTicker.Stop()
stuckDetectorTicker := time.NewTicker(1 * time.Minute)
defer stuckDetectorTicker.Stop()
output := func(raw *model.RawKVEntry) error {
// even after https://github.com/pingcap/tiflow/pull/2038, kv client
// could still miss region change notification, which leads to resolved
Expand Down Expand Up @@ -176,6 +181,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-stuckDetectorTicker.C:
if err := p.detectResolvedTsStuck(initialized); err != nil {
return errors.Trace(err)
}
continue
case e = <-eventCh:
}

Expand Down Expand Up @@ -235,6 +245,29 @@ func (p *pullerImpl) Run(ctx context.Context) error {
return g.Wait()
}

// detectResolvedTsStuck compares the current value of p.tsTracker.Frontier()
// with the value recorded on the previous call and reports whether it has
// failed to move for too long.
//
// The check is a no-op (returns nil) unless
// cfg.Debug.Puller.EnableResolvedTsStuckDetection is set and initialized is
// true. When the frontier differs from the recorded one, the new value and the
// current wall-clock time are stored and nil is returned. When it is
// unchanged, a warning is logged on every call, and an error is returned once
// the time elapsed since the last recorded change exceeds
// cfg.Debug.Puller.ResolvedTsStuckInterval.
func (p *pullerImpl) detectResolvedTsStuck(initialized bool) error {
	pullerCfg := p.cfg.Debug.Puller
	if !pullerCfg.EnableResolvedTsStuckDetection || !initialized {
		return nil
	}

	frontier := p.tsTracker.Frontier()
	if frontier != p.lastForwardResolvedTs {
		// The frontier moved: remember when and where, nothing to report.
		p.lastForwardTime = time.Now()
		p.lastForwardResolvedTs = frontier
		return nil
	}

	log.Warn("ResolvedTs stuck detected in puller",
		zap.String("namespace", p.changefeed.Namespace),
		zap.String("changefeed", p.changefeed.ID),
		zap.Int64("tableID", p.tableID),
		zap.String("tableName", p.tableName),
		zap.Uint64("lastResolvedTs", p.lastForwardResolvedTs),
		zap.Uint64("resolvedTs", frontier))

	stuckLimit := time.Duration(pullerCfg.ResolvedTsStuckInterval)
	if time.Since(p.lastForwardTime) > stuckLimit {
		// throw an error to cause changefeed restart
		return errors.New("resolved ts stuck")
	}
	return nil
}

// Output returns the puller's outputCh as a receive-only channel of
// *model.RawKVEntry.
func (p *pullerImpl) Output() <-chan *model.RawKVEntry {
return p.outputCh
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func TestParseCfg(t *testing.T) {
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -346,6 +350,10 @@ check-balance-interval = "10s"
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -481,6 +489,10 @@ cert-allowed-cn = ["dd","ee"]
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
MaxMemoryPercentage: config.DisableMemoryLimit,
Expand Down Expand Up @@ -543,5 +555,9 @@ unknown3 = 3
Enable: false,
MetaStoreConfig: config.MetaStoreConfiguration{},
},
Puller: &config.PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: config.TomlDuration(5 * time.Minute),
},
}, o.serverConfig.Debug)
}
4 changes: 4 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ const (
"ssl-cert": "",
"ssl-key": ""
}
},
"puller": {
"enable-resolved-ts-stuck-detection": false,
"resolved-ts-stuck-interval": 300000000000
}
},
"cluster-id": "default",
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type DebugConfig struct {

// CDCV2 enables ticdc version 2 implementation with new metastore
CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"`

// Puller is the configuration of the puller.
Puller *PullerConfig `toml:"puller" json:"puller"`
}

// ValidateAndAdjust validates and adjusts the debug configuration
Expand All @@ -50,3 +53,11 @@ func (c *DebugConfig) ValidateAndAdjust() error {

return nil
}

// PullerConfig holds the puller-related settings of the debug configuration.
// It is decoded from the "puller" section in both TOML and JSON.
type PullerConfig struct {
// EnableResolvedTsStuckDetection turns the puller's resolved ts stuck
// detection on or off.
EnableResolvedTsStuckDetection bool `toml:"enable-resolved-ts-stuck-detection" json:"enable-resolved-ts-stuck-detection"`
// ResolvedTsStuckInterval is how long the resolved ts may stay unchanged
// before the puller treats it as stuck and returns an error. It is the
// threshold compared against the time since the last advance, not the
// period at which the check itself runs.
ResolvedTsStuckInterval TomlDuration `toml:"resolved-ts-stuck-interval" json:"resolved-ts-stuck-interval"`
}
4 changes: 4 additions & 0 deletions pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ var defaultServerConfig = &ServerConfig{
Scheduler: NewDefaultSchedulerConfig(),
EnableKVConnectBackOff: false,
CDCV2: &CDCV2{Enable: false},
Puller: &PullerConfig{
EnableResolvedTsStuckDetection: false,
ResolvedTsStuckInterval: TomlDuration(5 * time.Minute),
},
},
ClusterID: "default",
GcTunerMemoryThreshold: DisableMemoryLimit,
Expand Down

0 comments on commit 05e0328

Please sign in to comment.