[cluster] Fix watch resource leak/hang (#2984)
vdarulis authored Dec 5, 2020
1 parent b6dfbaa commit d21e701
Showing 2 changed files with 76 additions and 29 deletions.
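
A note on the fix itself: in etcd's clientv3, each Watch call pins a client-side goroutine and a server-side watcher to the context it was created with, so recreating a dead watch channel without cancelling the old context leaks both, and the leak grows with every reconnect. The sketch below shows the corrected pattern in isolation; it is a minimal illustration assuming a reachable etcd endpoint at localhost:2379, not the m3 code itself.

package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// watchLoop recreates a watch whenever its channel dies, cancelling the old
// context first so the previous watcher's goroutine and server-side stream
// are released rather than leaked.
func watchLoop(cli *clientv3.Client, key string) {
	var (
		watchChan clientv3.WatchChan
		cancelFn  context.CancelFunc
	)

	for {
		if watchChan == nil {
			var ctx context.Context
			ctx, cancelFn = context.WithCancel(context.Background())
			watchChan = cli.Watch(ctx, key)
		}

		r, ok := <-watchChan
		if !ok || r.Err() != nil {
			cancelFn() // the fix: release the old watcher before recreating it
			watchChan = nil
			time.Sleep(100 * time.Millisecond) // avoid a tight reconnect loop
			continue
		}

		for _, ev := range r.Events {
			log.Printf("%s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	watchLoop(cli, "foo")
}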
10 changes: 6 additions & 4 deletions src/cluster/etcd/watchmanager/manager.go
@@ -131,7 +131,6 @@ func (w *manager) Watch(key string) {
 		case r, ok := <-watchChan:
 			if !ok {
 				// the watch chan is closed, set it to nil so it will be recreated
-				// this is unlikely to happen but just to be defensive
 				cancelFn()
 				watchChan = nil
 				logger.Warn("etcd watch channel closed on key, recreating a watch channel")
@@ -161,14 +160,17 @@ func (w *manager) Watch(key string) {
 				if err == rpctypes.ErrCompacted {
 					logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision))
 					revOverride = r.CompactRevision
-					watchChan = nil
 				} else {
 					logger.Warn("recreating watch due to an error")
 				}
-			}

-			if r.IsProgressNotify() {
+				cancelFn()
+				watchChan = nil
+			} else if r.IsProgressNotify() {
 				// Do not call updateFn on ProgressNotify as it happens periodically with no update events
 				continue
 			}

 			if err = w.updateFn(key, r.Events); err != nil {
 				logger.Error("received notification for key, but failed to get value", zap.Error(err))
 			}
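The hang half of the fix is exercised by the new TestWatchNoLeader below: a watch served by a member that has lost quorum can block indefinitely unless the watch context demands a leader. The manager creates its watch context elsewhere in manager.go, outside the lines shown above; the following is a minimal sketch of the require-leader pattern under the same localhost assumption, and its use by watchmanager is an inference rather than something this diff shows.

package main

import (
	"context"
	"log"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// WithRequireLeader makes the server fail the watch stream when it loses
	// quorum, so leader loss surfaces as r.Err() instead of a silent stall.
	ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.Background()))
	defer cancel()

	for r := range cli.Watch(ctx, "foo") {
		if err := r.Err(); err != nil {
			log.Printf("watch error (for example, no leader): %v", err)
			continue
		}
		for _, ev := range r.Events {
			log.Printf("%s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}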
95 changes: 70 additions & 25 deletions src/cluster/etcd/watchmanager/manager_test.go
@@ -22,17 +22,18 @@ package watchmanager

 import (
 	"fmt"
+	"runtime"
 	"sync/atomic"
 	"testing"
 	"time"

 	"github.com/m3db/m3/src/cluster/mocks"

-	"go.etcd.io/etcd/clientv3"
-	"go.etcd.io/etcd/integration"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/uber-go/tally"
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/integration"
 	"golang.org/x/net/context"
 )

@@ -150,21 +151,32 @@ func TestWatchRecreate(t *testing.T) {
 }

 func TestWatchNoLeader(t *testing.T) {
+	const (
+		watchInitAndRetryDelay = 200 * time.Millisecond
+		watchCheckInterval     = 50 * time.Millisecond
+	)
+
 	ecluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer ecluster.Terminate(t)

-	ec := ecluster.Client(0)
-
 	var (
-		updateCalled int32
-		shouldStop   int32
+		ec              = ecluster.Client(0)
+		tickDuration    = 10 * time.Millisecond
+		electionTimeout = time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration
+		doneCh          = make(chan struct{}, 1)
+		eventLog        = []*clientv3.Event{}
+		updateCalled    int32
+		shouldStop      int32
 	)
-	doneCh := make(chan struct{}, 1)

 	opts := NewOptions().
 		SetWatcher(ec.Watcher).
 		SetUpdateFn(
-			func(string, []*clientv3.Event) error {
+			func(_ string, e []*clientv3.Event) error {
 				atomic.AddInt32(&updateCalled, 1)
+				if len(e) > 0 {
+					eventLog = append(eventLog, e...)
+				}
 				return nil
 			},
 		).
@@ -176,50 +188,83 @@ func TestWatchNoLeader(t *testing.T) {

 				close(doneCh)

-				// stopped = true
 				return true
 			},
 		).
-		SetWatchChanInitTimeout(200 * time.Millisecond)
+		SetWatchChanInitTimeout(watchInitAndRetryDelay).
+		SetWatchChanCheckInterval(watchCheckInterval)

 	wh, err := NewWatchManager(opts)
 	require.NoError(t, err)

 	go wh.Watch("foo")

-	time.Sleep(2 * time.Second)
+	runtime.Gosched()
+	time.Sleep(10 * time.Millisecond)

 	// there should be a valid watch now, trigger a notification
-	_, err = ec.Put(context.Background(), "foo", "v")
+	_, err = ec.Put(context.Background(), "foo", "bar")
 	require.NoError(t, err)

-	for {
-		if atomic.LoadInt32(&updateCalled) == int32(1) {
+	leaderIdx := ecluster.WaitLeader(t)
+	require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")
+
+	for i := 0; i < 10; i++ {
+		if atomic.LoadInt32(&updateCalled) == int32(3) {
 			break
 		}
-		time.Sleep(100 * time.Millisecond)
+		time.Sleep(10 * time.Millisecond)
 	}

 	// simulate quorum loss
 	ecluster.Members[1].Stop(t)
 	ecluster.Members[2].Stop(t)
-	ecluster.Client(1).Close()
-	ecluster.Client(2).Close()
-	ecluster.TakeClient(1)
-	ecluster.TakeClient(2)

 	// wait for election timeout, then member[0] will not have a leader.
-	tickDuration := 10 * time.Millisecond
-	time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration)
+	time.Sleep(electionTimeout)

-	for {
-		if atomic.LoadInt32(&updateCalled) == 2 {
+	for i := 0; i < 100; i++ { // 10ms * 100 = 1s
+		// test that leader loss is retried - even on error, we should attempt update.
+		// 10 is an arbitrary number greater than the number of actual updates
+		if atomic.LoadInt32(&updateCalled) >= 10 {
 			break
 		}
-		time.Sleep(50 * time.Millisecond)
+		time.Sleep(10 * time.Millisecond)
 	}

-	// clean up the background go routine
+	updates := atomic.LoadInt32(&updateCalled)
+	if updates < 10 {
+		require.Fail(t,
+			"insufficient update calls",
+			"expected at least 10 update attempts, got %d during a partition",
+			updates)
+	}
+
+	require.NoError(t, ecluster.Members[1].Restart(t))
+	require.NoError(t, ecluster.Members[2].Restart(t))
+	// wait for leader + election delay just in case
+	time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration)
+
+	leaderIdx = ecluster.WaitLeader(t)
+	require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader")
+
+	_, err = ec.Put(context.Background(), "foo", "baz")
+	require.NoError(t, err)
+
+	// give some time for watch to be updated
+	runtime.Gosched()
+	time.Sleep(watchInitAndRetryDelay)
+
 	atomic.AddInt32(&shouldStop, 1)
 	<-doneCh
+
+	require.Len(t, eventLog, 2)
+	require.NotNil(t, eventLog[0])
+	require.Equal(t, eventLog[0].Kv.Key, []byte("foo"))
+	require.Equal(t, eventLog[0].Kv.Value, []byte("bar"))
+	require.NotNil(t, eventLog[1])
+	require.Equal(t, eventLog[1].Kv.Key, []byte("foo"))
+	require.Equal(t, eventLog[1].Kv.Value, []byte("baz"))
 }

 func TestWatchCompactedRevision(t *testing.T) {
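To run the changed tests locally, the standard Go invocation applies from the repository root (the package path is the one in this diff); note that TestWatchNoLeader starts an embedded 3-member etcd cluster via the integration package, so it takes noticeably longer than a unit test:

go test -run 'TestWatchNoLeader|TestWatchRecreate' ./src/cluster/etcd/watchmanager/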
