Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix watch event loss #17555

Merged
merged 1 commit into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ func (sws *serverWatchStream) sendLoop() {
sws.mu.RUnlock()

var serr error
// gofail: var beforeSendWatchResponse struct{}
if !fragmented && !ok {
serr = sws.gRPCStream.Send(wr)
} else {
Expand Down
7 changes: 7 additions & 0 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
maxWatchersPerSync = 512
)

func ChanBufLen() int { return chanBufLen }

type watchable interface {
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
progress(w *watcher)
Expand Down Expand Up @@ -370,6 +372,11 @@ func (s *watchableStore) syncWatchers() int {
victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
continue
}
w.minRev = curRev + 1

eb, ok := wb[w]
Expand Down
58 changes: 58 additions & 0 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -250,6 +251,63 @@ func TestWatchCompacted(t *testing.T) {
}
}

func TestWatchNoEventLossOnCompact(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how relevant is maxWatchersPerSync to the issue and this test as len(watchers) < 4.

What about the case where len(watchers) > maxWatchersPerSync as pointed out in #17555 (comment) ? I haven't verified it, but I expect that unremoved watcher from s.unsynced will cause syncWatchers to return != 0, causing function to be called earlier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how relevant is maxWatchersPerSync to the issue and this test as len(watchers) < 4.

It seems to be true. Confirmed that it always runs into the if branch (see below), and confirmed that it has no any impact on the test case.

But it isn't a big deal, and also from another prospective it should be OK to explicitly set a value to ensure len(wg.watchers) < maxWatchers although it's already true by default.

if len(wg.watchers) < maxWatchers {
return wg, wg.chooseAll(curRev, compactRev)
}

What about the case where len(watchers) > maxWatchersPerSync as pointed out in #17555 (comment) ?

Suggest to discuss & fix it separately. We may want to do minor local code refactor. @chaochn47 are you able to continue to work on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confirmed that it has no any impact on the test case.

I mean that the even I don't change maxWatchersPerSync in the test case , the test case could also reproduce the issue without the fix and the issue disappeared after applying the patch in this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good


b, _ := betesting.NewDefaultTmpBackend(t)
lg := zaptest.NewLogger(t)
s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
cleanup(s, b)
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
}()

chanBufLen, maxWatchersPerSync = 1, 4
testKey, testValue := []byte("foo"), []byte("bar")

maxRev := 10
compactRev := int64(5)
for i := 0; i < maxRev; i++ {
s.Put(testKey, testValue, lease.NoLease)
}
_, err := s.Compact(traceutil.TODO(), compactRev)
require.NoErrorf(t, err, "failed to compact kv (%v)", err)

w := s.NewWatchStream()
defer w.Close()

watchers := map[WatchID]int64{
0: 1,
1: 1, // create unsyncd watchers with startRev < compactRev
2: 6, // create unsyncd watchers with compactRev < startRev < currentRev
}
for id, startRev := range watchers {
_, err := w.Watch(id, testKey, nil, startRev)
require.NoError(t, err)
}
// fill up w.Chan() with 1 buf via 2 compacted watch response
s.syncWatchers()

for len(watchers) > 0 {
resp := <-w.Chan()
if resp.CompactRevision != 0 {
require.Equal(t, resp.CompactRevision, compactRev)
require.Contains(t, watchers, resp.WatchID)
delete(watchers, resp.WatchID)
continue
}
nextRev := watchers[resp.WatchID]
for _, ev := range resp.Events {
require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
nextRev++
}
if nextRev == s.rev()+1 {
delete(watchers, resp.WatchID)
}
}
}

func TestWatchFutureRev(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
Expand Down
58 changes: 58 additions & 0 deletions tests/integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ package integration
import (
"bytes"
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/tests/v3/framework/integration"
gofail "go.etcd.io/gofail/runtime"
)

// TestV3WatchFromCurrentRevision tests Watch APIs from current revision.
Expand Down Expand Up @@ -1512,3 +1517,56 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
}
require.True(t, gotProgressNotification, "Expected to get progress notification")
}

// TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response
// if its next revision of events are compacted and no lost events sent to client.
func TestV3NoEventsLostOnCompact(t *testing.T) {
if integration.ThroughProxy {
t.Skip("grpc proxy currently does not support requesting progress notifications")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think this comment is relevant.

}
integration.BeforeTest(t)
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// sendLoop throughput is rate-limited to 1 event per second
require.NoError(t, gofail.Enable("beforeSendWatchResponse", `sleep("1s")`))
wch := client.Watch(ctx, "foo")

var rev int64
writeCount := mvcc.ChanBufLen() * 11 / 10
for i := 0; i < writeCount; i++ {
resp, err := client.Put(ctx, "foo", "bar")
require.NoError(t, err)
rev = resp.Header.Revision
}
_, err := client.Compact(ctx, rev)
require.NoError(t, err)

time.Sleep(time.Second)
require.NoError(t, gofail.Disable("beforeSendWatchResponse"))

eventCount := 0
compacted := false
for resp := range wch {
err = resp.Err()
if err != nil {
if !errors.Is(err, rpctypes.ErrCompacted) {
t.Fatalf("want watch response err %v but got %v", rpctypes.ErrCompacted, err)
}
compacted = true
break
}
eventCount += len(resp.Events)
if eventCount == writeCount {
break
}
}
assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", eventCount, writeCount)
}
4 changes: 2 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g

.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
chaochn47 marked this conversation as resolved.
Show resolved Hide resolved
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./tests && go get go.etcd.io/gofail@${GOFAIL_VERSION}

.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy
Expand Down
Loading