Skip to content

Commit

Permalink
workerpool: limit the rate to output deadlock warning (#3775) (#3796)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 10, 2021
1 parent f37133e commit 0fbc2ac
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
20 changes: 17 additions & 3 deletions pkg/workerpool/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/ticdc/pkg/notify"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -248,13 +249,19 @@ type worker struct {
isRunning int32
// notifies exits of run()
stopNotifier notify.Notifier

slowSynchronizeThreshold time.Duration
slowSynchronizeLimiter *rate.Limiter
}

func newWorker() *worker {
return &worker{
taskCh: make(chan task, 128),
handles: make(map[*defaultEventHandle]struct{}),
handleCancelCh: make(chan struct{}), // this channel must be unbuffered, i.e. blocking

slowSynchronizeThreshold: 10 * time.Second,
slowSynchronizeLimiter: rate.NewLimiter(rate.Every(time.Second*5), 1),
}
}

Expand Down Expand Up @@ -340,13 +347,20 @@ func (w *worker) synchronize() {
break
}

if time.Since(startTime) > time.Second*10 {
// likely the workerpool has deadlocked, or there is a bug in the event handlers.
log.Warn("synchronize is taking too long, report a bug", zap.Duration("elapsed", time.Since(startTime)))
if time.Since(startTime) > w.slowSynchronizeThreshold &&
w.slowSynchronizeLimiter.Allow() {
// likely the workerpool has deadlocked, or there is a bug
// in the event handlers.
logWarn("synchronize is taking too long, report a bug",
zap.Duration("elapsed", time.Since(startTime)),
zap.Stack("stacktrace"))
}
}
}

// A delegate to log.Warn. It exists only for testing.
var logWarn = log.Warn

func (w *worker) addHandle(handle *defaultEventHandle) {
w.handleRWLock.Lock()
defer w.handleRWLock.Unlock()
Expand Down
34 changes: 34 additions & 0 deletions pkg/workerpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

func TestSuite(t *testing.T) { check.TestingT(t) }
Expand Down Expand Up @@ -425,6 +427,38 @@ func (s *workerPoolSuite) TestCancelByAddEventContext(c *check.C) {
c.Assert(err, check.IsNil)
}

func TestSynchronizeLog(t *testing.T) {
w := newWorker()
w.isRunning = 1
// Always report "synchronize is taking too long".
w.slowSynchronizeThreshold = time.Duration(0)
w.slowSynchronizeLimiter = rate.NewLimiter(rate.Every(100*time.Minute), 1)

counter := int32(0)
logWarn = func(msg string, fields ...zap.Field) {
atomic.AddInt32(&counter, 1)
}
defer func() { logWarn = log.Warn }()

doneCh := make(chan struct{})
go func() {
w.synchronize()
close(doneCh)
}()

time.Sleep(300 * time.Millisecond)
w.stopNotifier.Notify()
time.Sleep(300 * time.Millisecond)
w.stopNotifier.Notify()

// Close worker.
atomic.StoreInt32(&w.isRunning, 0)
w.stopNotifier.Close()
<-doneCh

require.EqualValues(t, 1, atomic.LoadInt32(&counter))
}

// Benchmark workerpool with ping-pong workflow.
// go test -benchmem -run='^$' -bench '^(BenchmarkWorkerpool)$' github.com/pingcap/ticdc/pkg/workerpool
func BenchmarkWorkerpool(b *testing.B) {
Expand Down

0 comments on commit 0fbc2ac

Please sign in to comment.