diff --git a/pkg/workerpool/pool_impl.go b/pkg/workerpool/pool_impl.go index 966e6ea35c3..d98d4ffc1a8 100644 --- a/pkg/workerpool/pool_impl.go +++ b/pkg/workerpool/pool_impl.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/ticdc/pkg/notify" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) const ( @@ -248,6 +249,9 @@ type worker struct { isRunning int32 // notifies exits of run() stopNotifier notify.Notifier + + slowSynchronizeThreshold time.Duration + slowSynchronizeLimiter *rate.Limiter } func newWorker() *worker { @@ -255,6 +259,9 @@ func newWorker() *worker { taskCh: make(chan task, 128), handles: make(map[*defaultEventHandle]struct{}), handleCancelCh: make(chan struct{}), // this channel must be unbuffered, i.e. blocking + + slowSynchronizeThreshold: 10 * time.Second, + slowSynchronizeLimiter: rate.NewLimiter(rate.Every(time.Second*5), 1), } } @@ -340,13 +347,20 @@ func (w *worker) synchronize() { break } - if time.Since(startTime) > time.Second*10 { - // likely the workerpool has deadlocked, or there is a bug in the event handlers. - log.Warn("synchronize is taking too long, report a bug", zap.Duration("elapsed", time.Since(startTime))) + if time.Since(startTime) > w.slowSynchronizeThreshold && + w.slowSynchronizeLimiter.Allow() { + // likely the workerpool has deadlocked, or there is a bug + // in the event handlers. + logWarn("synchronize is taking too long, report a bug", + zap.Duration("elapsed", time.Since(startTime)), + zap.Stack("stacktrace")) } } } +// A delegate to log.Warn. It exists only for testing. +var logWarn = log.Warn + func (w *worker) addHandle(handle *defaultEventHandle) { w.handleRWLock.Lock() defer w.handleRWLock.Unlock() diff --git a/pkg/workerpool/pool_test.go b/pkg/workerpool/pool_test.go index dc5a2bff61a..c34378fd625 100644 --- a/pkg/workerpool/pool_test.go +++ b/pkg/workerpool/pool_test.go @@ -25,8 +25,10 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/stretchr/testify/require" "go.uber.org/zap" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" ) func TestSuite(t *testing.T) { check.TestingT(t) } @@ -425,6 +427,38 @@ func (s *workerPoolSuite) TestCancelByAddEventContext(c *check.C) { c.Assert(err, check.IsNil) } +func TestSynchronizeLog(t *testing.T) { + w := newWorker() + w.isRunning = 1 + // Always report "synchronize is taking too long". + w.slowSynchronizeThreshold = time.Duration(0) + w.slowSynchronizeLimiter = rate.NewLimiter(rate.Every(100*time.Minute), 1) + + counter := int32(0) + logWarn = func(msg string, fields ...zap.Field) { + atomic.AddInt32(&counter, 1) + } + defer func() { logWarn = log.Warn }() + + doneCh := make(chan struct{}) + go func() { + w.synchronize() + close(doneCh) + }() + + time.Sleep(300 * time.Millisecond) + w.stopNotifier.Notify() + time.Sleep(300 * time.Millisecond) + w.stopNotifier.Notify() + + // Close worker. + atomic.StoreInt32(&w.isRunning, 0) + w.stopNotifier.Close() + <-doneCh + + require.EqualValues(t, 1, atomic.LoadInt32(&counter)) +} + // Benchmark workerpool with ping-pong workflow. // go test -benchmem -run='^$' -bench '^(BenchmarkWorkerpool)$' github.com/pingcap/ticdc/pkg/workerpool func BenchmarkWorkerpool(b *testing.B) {