Skip to content

Commit

Permalink
sourcemanager(ticdc): close all pullers asyncrhonsly to prevent block…
Browse files Browse the repository at this point in the history
… too long (#11667)

close #11638
  • Loading branch information
3AceShowHand authored Jan 22, 2025
1 parent 54ad97f commit 79bd0c0
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,17 @@ func (m *SourceManager) Close() error {
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID))
start := time.Now()

var wg sync.WaitGroup
m.pullers.Range(func(key, value interface{}) bool {
value.(*pullerwrapper.Wrapper).Close()
wg.Add(1)
go func() {
defer wg.Done()
value.(*pullerwrapper.Wrapper).Close()
}()
return true
})
wg.Wait()
log.Info("All pullers have been closed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand Down

0 comments on commit 79bd0c0

Please sign in to comment.