From b5fa5634fc7f3c2a3e3bcfe9088bd74345fe3884 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 1 Feb 2024 21:10:55 +0800 Subject: [PATCH] redo(cdc): custom redo event cache and disable it by default (#10139) (#10317) close pingcap/tiflow#10143 --- cdc/api/v2/model.go | 21 +++++++++++++++++++++ cdc/api/v2/model_test.go | 4 ++++ cdc/processor/sinkmanager/manager.go | 21 +++++++++++++++------ cdc/processor/sinkmanager/manager_test.go | 5 ++++- pkg/config/config_test_data.go | 18 +++++++++++++++--- pkg/config/consistent.go | 11 +++++++++++ pkg/config/replica_config.go | 4 ++++ 7 files changed, 74 insertions(+), 10 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 46bf608514a..efd17488b75 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -283,6 +283,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Compression: c.Consistent.Compression, FlushConcurrency: c.Consistent.FlushConcurrency, } + if c.Consistent.MemoryUsage != nil { + res.Consistent.MemoryUsage = &config.ConsistentMemoryUsage{ + MemoryQuotaPercentage: c.Consistent.MemoryUsage.MemoryQuotaPercentage, + EventCachePercentage: c.Consistent.MemoryUsage.EventCachePercentage, + } + } } if c.Sink != nil { var dispatchRules []*config.DispatchRule @@ -776,7 +782,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Compression: cloned.Consistent.Compression, FlushConcurrency: cloned.Consistent.FlushConcurrency, } + if cloned.Consistent.MemoryUsage != nil { + res.Consistent.MemoryUsage = &ConsistentMemoryUsage{ + MemoryQuotaPercentage: cloned.Consistent.MemoryUsage.MemoryQuotaPercentage, + EventCachePercentage: cloned.Consistent.MemoryUsage.EventCachePercentage, + } + } } + if cloned.Mounter != nil { res.Mounter = &MounterConfig{ WorkerNum: cloned.Mounter.WorkerNum, @@ -978,6 +991,14 @@ type ConsistentConfig struct { UseFileBackend bool `json:"use_file_backend"` Compression string `json:"compression,omitempty"` FlushConcurrency int `json:"flush_concurrency,omitempty"` + + MemoryUsage *ConsistentMemoryUsage `json:"memory_usage"` +} + +// ConsistentMemoryUsage represents memory usage of Consistent module. +type ConsistentMemoryUsage struct { + MemoryQuotaPercentage uint64 `json:"memory_quota_percentage"` + EventCachePercentage uint64 `json:"event_cache_percentage"` } // ChangefeedSchedulerConfig is per changefeed scheduler settings. diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 4fb3815ed78..2b731148958 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -68,6 +68,10 @@ var defaultAPIConfig = &ReplicaConfig{ FlushWorkerNum: redo.DefaultFlushWorkerNum, Storage: "", UseFileBackend: false, + MemoryUsage: &ConsistentMemoryUsage{ + MemoryQuotaPercentage: 50, + EventCachePercentage: 0, + }, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: config.GetDefaultReplicaConfig(). diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 32fc2325eec..dd277b0fc6f 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -158,6 +158,7 @@ func New( WithLabelValues(changefeedID.Namespace, changefeedID.ID), } + totalQuota := changefeedInfo.Config.MemoryQuota if redoDMLMgr != nil && redoDMLMgr.Enabled() { m.redoDMLMgr = redoDMLMgr m.redoProgressHeap = newTableProgresses() @@ -165,18 +166,26 @@ func New( m.redoTaskChan = make(chan *redoTask) m.redoWorkerAvailable = make(chan struct{}, 1) - // Use 3/4 memory quota as redo quota, and 1/2 again for redo cache. - m.sinkMemQuota = memquota.NewMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota/4*1, "sink") - redoQuota := changefeedInfo.Config.MemoryQuota / 4 * 3 + consistentMemoryUsage := changefeedInfo.Config.Consistent.MemoryUsage + if consistentMemoryUsage == nil { + consistentMemoryUsage = config.GetDefaultReplicaConfig().Consistent.MemoryUsage + } + + redoQuota := totalQuota * consistentMemoryUsage.MemoryQuotaPercentage / 100 + sinkQuota := totalQuota - redoQuota + m.sinkMemQuota = memquota.NewMemQuota(changefeedID, sinkQuota, "sink") m.redoMemQuota = memquota.NewMemQuota(changefeedID, redoQuota, "redo") - m.eventCache = newRedoEventCache(changefeedID, redoQuota/2*1) + + eventCache := redoQuota * consistentMemoryUsage.EventCachePercentage / 100 + if eventCache > 0 { + m.eventCache = newRedoEventCache(changefeedID, eventCache) + } } else { - m.sinkMemQuota = memquota.NewMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota, "sink") + m.sinkMemQuota = memquota.NewMemQuota(changefeedID, totalQuota, "sink") m.redoMemQuota = memquota.NewMemQuota(changefeedID, 0, "redo") } m.ready = make(chan struct{}) - return m } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 249e0832299..992743db857 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -30,10 +30,13 @@ import ( ) func getChangefeedInfo() *model.ChangeFeedInfo { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Consistent.MemoryUsage.MemoryQuotaPercentage = 75 + replicaConfig.Consistent.MemoryUsage.EventCachePercentage = 50 return &model.ChangeFeedInfo{ Error: nil, SinkURI: "blackhole://", - Config: config.GetDefaultReplicaConfig(), + Config: replicaConfig, } } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 74266cd0499..063c0e334dc 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -75,7 +75,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "scheduler": { "enable-table-across-nodes": false, @@ -312,7 +316,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "scheduler": { "enable-table-across-nodes": true, @@ -468,7 +476,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "scheduler": { "enable-table-across-nodes": true, diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index c8f93c6eac8..3409cf8327b 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -35,6 +35,17 @@ type ConsistentConfig struct { UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` Compression string `toml:"compression" json:"compression"` FlushConcurrency int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + + MemoryUsage *ConsistentMemoryUsage `toml:"memory-usage" json:"memory-usage"` +} + +// ConsistentMemoryUsage represents memory usage of Consistent module. +type ConsistentMemoryUsage struct { + // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 will be used for redo events. + MemoryQuotaPercentage uint64 `toml:"memory-quota-percentage" json:"memory-quota-percentage"` + // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 * EventCachePercentage / 100 + // will be used for redo cache. + EventCachePercentage uint64 `toml:"event-cache-percentage" json:"event-cache-percentage"` } // ValidateAndAdjust validates the consistency config and adjusts it if necessary. diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 33390402bd1..577937bdbe0 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -88,6 +88,10 @@ var defaultReplicaConfig = &ReplicaConfig{ Storage: "", UseFileBackend: false, Compression: "", + MemoryUsage: &ConsistentMemoryUsage{ + MemoryQuotaPercentage: 50, + EventCachePercentage: 0, + }, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false,