From 79bd0c0f048c5614cb0c1dae1e2c9a8983fad4c1 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 22 Jan 2025 15:29:17 +0800 Subject: [PATCH] sourcemanager(ticdc): close all pullers asyncrhonsly to prevent block too long (#11667) close pingcap/tiflow#11638 --- cdc/processor/sourcemanager/manager.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 6306a612c62..6d8df56970f 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -165,10 +165,17 @@ func (m *SourceManager) Close() error { zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID)) start := time.Now() + + var wg sync.WaitGroup m.pullers.Range(func(key, value interface{}) bool { - value.(*pullerwrapper.Wrapper).Close() + wg.Add(1) + go func() { + defer wg.Done() + value.(*pullerwrapper.Wrapper).Close() + }() return true }) + wg.Wait() log.Info("All pullers have been closed", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID),