Skip to content

Commit

Permalink
Fix timer failure (#27006)
Browse files Browse the repository at this point in the history
Fixes #26205

Fixes deadlocks in heartbeat scheduler.

No changelog needed because this bug only exists in go 1.16+ and v7.14.0 is the first affected beats version

The underlying cause is a likely bug in golang, reported here by myself here: golang/go#47329

(cherry picked from commit 2f62e8f)
  • Loading branch information
andrewvc authored and mergify-bot committed Jul 22, 2021
1 parent 96b672c commit d16185b
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
19 changes: 13 additions & 6 deletions heartbeat/scheduler/timerqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (tq *TimerQueue) Start() {
if tq.th.Len() > 0 {
nr := tq.th[0].runAt
tq.nextRunAt = &nr
tq.timer.Reset(nr.Sub(time.Now()))
tq.timer = time.NewTimer(time.Until(nr))
} else {
tq.timer.Stop()
tq.nextRunAt = nil
Expand All @@ -104,14 +104,21 @@ func (tq *TimerQueue) pushInternal(tt *timerTask) {
heap.Push(&tq.th, tt)

if tq.nextRunAt == nil || tq.nextRunAt.After(tt.runAt) {
// Stop and drain the timer prior to reset per https://golang.org/pkg/time/#Timer.Reset
// Only drain if nextRunAt is set, otherwise the timer channel has already been stopped the
// channel is empty (and thus would block)
if tq.nextRunAt != nil && !tq.timer.Stop() {
<-tq.timer.C
}
tq.timer.Reset(tt.runAt.Sub(time.Now()))

// Originally the line below this comment was
//
// tq.timer.Reset(time.Until(tt.runAt))
//
// however this broke in go1.16rc1, specifically on the commit b4b014465216790e01aa66f9120d03230e4aff46
//, specifically on this line:
// https://github.com/golang/go/commit/b4b014465216790e01aa66f9120d03230e4aff46#diff-73699b6edfe5dbb3f6824e66bb3566bce9405e9a8c810cac55c8199459f0ac19R652
// where some nice new optimizations don't actually work reliably
// This can be worked around by instantiating a new timer rather than resetting the timer.
// since that internally calls deltimer in runtime/timer.go rather than modtimer,
// I suspect that the problem is in modtimer's setting of &pp.timerModifiedEarliest
tq.timer = time.NewTimer(time.Until(tt.runAt))
tq.nextRunAt = &tt.runAt
}
}
Expand Down
33 changes: 28 additions & 5 deletions heartbeat/scheduler/timerqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,41 @@ package timerqueue
import (
"context"
"math/rand"
"os"
"runtime/pprof"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestQueueRunsInOrder(t *testing.T) {
t.Skip("flaky test on windows: https://github.com/elastic/beats/issues/26205")
// Bugs can show up only occasionally
for i := 0; i < 100; i++ {
testQueueRunsInOrderOnce(t)
func TestRunsInOrder(t *testing.T) {
testQueueRunsInOrderOnce(t)
}

// TestStress tries to figure out if we have any deadlocks that show up under concurrency
func TestStress(t *testing.T) {
for i := 0; i < 120000; i++ {
failed := make(chan bool)
succeeded := make(chan bool)

watchdog := time.AfterFunc(time.Second*5, func() {
failed <- true
})

go func() {
testQueueRunsInOrderOnce(t)
succeeded <- true
}()

select {
case <-failed:
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
require.FailNow(t, "Scheduler test iteration timed out, deadlock issue?")
case <-succeeded:
watchdog.Stop()
}
}
}

Expand Down

0 comments on commit d16185b

Please sign in to comment.