From 5e11e0b6dcd783ba4e6ef64db5062c1f6a521174 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Thu, 2 Feb 2023 11:41:20 +0100
Subject: [PATCH 1/3] tests: Reproduce #15220 and validate fix

Signed-off-by: Marek Siarkowicz
---
 tests/linearizability/linearizability_test.go |  1 -
 tests/linearizability/watch.go                | 43 +++++++++++++++++--
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go
index 7844461d69e..71ce771722f 100644
--- a/tests/linearizability/linearizability_test.go
+++ b/tests/linearizability/linearizability_test.go
@@ -55,7 +55,6 @@ var (
 		largePutSize: 32769,
 		writes: []requestChance{
 			{operation: Put, chance: 50},
-			{operation: LargePut, chance: 5},
 			{operation: Delete, chance: 10},
 			{operation: PutWithLease, chance: 10},
 			{operation: LeaseRevoke, chance: 10},
diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go
index ff1e3f101db..5deb1acfd92 100644
--- a/tests/linearizability/watch.go
+++ b/tests/linearizability/watch.go
@@ -16,6 +16,9 @@ package linearizability
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -58,15 +61,33 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger
 }
 
 func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.Client) []watchEvent {
+	gotProcessNotify := false
 	events := []watchEvent{}
-	var lastRevision int64 = 1
+	operations := map[model.EtcdOperation]clientv3.WatchResponse{}
+	var lastRevision int64 = 0
 	for {
 		select {
 		case <-ctx.Done():
+			if !gotProcessNotify {
+				panic("Expected at least one notify")
+			}
 			return events
 		default:
 		}
-		for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) {
+		for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) {
+			if resp.Header.Revision < lastRevision {
+				panic("Revision should never decrease")
+			}
+			if resp.Header.Revision == lastRevision && len(resp.Events) != 0 {
+				panic("Got two non-empty responses about same revision")
+			}
+			if resp.IsProgressNotify() {
+				gotProcessNotify = true
+			}
+			err := c.RequestProgress(ctx)
+			if err != nil && !errors.Is(err, context.Canceled) {
+				panic(err)
+			}
 			lastRevision = resp.Header.Revision
 			time := time.Now()
 			for _, event := range resp.Events {
@@ -77,7 +98,7 @@ func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.C
 				case mvccpb.DELETE:
 					op = model.Delete
 				}
-				events = append(events, watchEvent{
+				event := watchEvent{
 					Time:     time,
 					Revision: event.Kv.ModRevision,
 					Op: model.EtcdOperation{
@@ -85,7 +106,21 @@ func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.C
 						Key:   string(event.Kv.Key),
 						Value: model.ToValueOrHash(string(event.Kv.Value)),
 					},
-				})
+				}
+				if prev, found := operations[event.Op]; found {
+					currentResponse, err := json.MarshalIndent(resp, "", " ")
+					if err != nil {
+						panic(fmt.Sprintf("Failed to marshal response, err: %v", err))
+					}
+					previousResponse, err := json.MarshalIndent(prev, "", " ")
+					if err != nil {
+						panic(fmt.Sprintf("Failed to marshal response, err: %v", err))
+					}
+
+					panic(fmt.Sprintf("duplicate operation in two responses on revision %d\n---\nfirst:\n%s\n---\nsecond:\n%s", event.Revision, previousResponse, currentResponse))
+				}
+				operations[event.Op] = resp
+				events = append(events, event)
 			}
 			if resp.Err() != nil {
 				lg.Info("Watch error", zap.Error(resp.Err()))

From 4b4b5dd08e96b31347e3470865bd121b5baa0ac9 Mon Sep 17 00:00:00 2001
From: Peter Wortmann
Date: Tue, 31 Jan 2023 23:26:23 +0000
Subject: [PATCH 2/3] v3rpc, mvcc: Guarantee order of requested progress notifications

Progress notifications requested using ProgressRequest were sent
directly using the ctrlStream, which means that they could race
against watch responses in the watchStream.

This would especially happen when the stream was not synced - e.g. if
you requested a progress notification on a freshly created unsynced
watcher, the notification would typically arrive indicating a revision
for which not all watch responses had been sent.

This changes the behaviour so that v3rpc always goes through the watch
stream, using a new RequestProgressAll function that closely matches
the behaviour of the v3rpc code - i.e.

1. Generate a message with WatchId -1, indicating the revision for
   *all* watchers in the stream

2. Guarantee that a response is (eventually) sent

The latter might require us to defer the response until all watchers
are synced, which is likely as it should be. Note that we do *not*
guarantee that the number of progress notifications matches the number
of requests, only that eventually at least one gets sent.

Signed-off-by: Peter Wortmann
---
 server/etcdserver/api/v3rpc/watch.go   | 22 +++++++------
 server/storage/mvcc/watchable_store.go | 45 +++++++++++++++++++++++---
 server/storage/mvcc/watcher.go         | 12 +++++++
 3 files changed, 65 insertions(+), 14 deletions(-)

diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go
index 5153007258d..679586e9b54 100644
--- a/server/etcdserver/api/v3rpc/watch.go
+++ b/server/etcdserver/api/v3rpc/watch.go
@@ -360,10 +360,9 @@ func (sws *serverWatchStream) recvLoop() error {
 			}
 		case *pb.WatchRequest_ProgressRequest:
 			if uv.ProgressRequest != nil {
-				sws.ctrlStream <- &pb.WatchResponse{
-					Header:  sws.newResponseHeader(sws.watchStream.Rev()),
-					WatchId: clientv3.InvalidWatchID, // response is not associated with any WatchId and will be broadcast to all watch channels
-				}
+				// Request progress for all watchers,
+				// force generation of a response
+				sws.watchStream.RequestProgressAll(true)
 			}
 		default:
 			// we probably should not shutdown the entire stream when
@@ -408,6 +407,7 @@ func (sws *serverWatchStream) sendLoop() {
 			// either return []*mvccpb.Event from the mvcc package
 			// or define protocol buffer with []mvccpb.Event.
 			evs := wresp.Events
+			progressNotify := len(evs) == 0
 			events := make([]*mvccpb.Event, len(evs))
 			sws.mu.RLock()
 			needPrevKV := sws.prevKV[wresp.WatchID]
@@ -432,11 +432,15 @@ func (sws *serverWatchStream) sendLoop() {
 				Canceled: canceled,
 			}
 
-			if _, okID := ids[wresp.WatchID]; !okID {
-				// buffer if id not yet announced
-				wrs := append(pending[wresp.WatchID], wr)
-				pending[wresp.WatchID] = wrs
-				continue
+			// Progress notifications can have WatchID -1
+			// if they announce on behalf of multiple watchers
+			if !progressNotify || wresp.WatchID != clientv3.InvalidWatchID {
+				if _, okID := ids[wresp.WatchID]; !okID {
+					// buffer if id not yet announced
+					wrs := append(pending[wresp.WatchID], wr)
+					pending[wresp.WatchID] = wrs
+					continue
+				}
 			}
 
 			mvcc.ReportEventReceived(len(evs))
diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index adf07f7755b..89f2ba8dd49 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -41,6 +41,7 @@ var (
 
 type watchable interface {
 	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
 	progress(w *watcher)
+	progress_all(force bool)
 	rev() int64
 }
@@ -62,6 +63,9 @@ type watchableStore struct {
 	// The key of the map is the key that the watcher watches on.
 	synced watcherGroup
 
+	// Whether to generate a progress notification once all watchers are synchronised
+	progressOnSync bool
+
 	stopc chan struct{}
 	wg    sync.WaitGroup
 }
@@ -79,11 +83,12 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
 		lg = zap.NewNop()
 	}
 	s := &watchableStore{
-		store:    NewStore(lg, b, le, cfg),
-		victimc:  make(chan struct{}, 1),
-		unsynced: newWatcherGroup(),
-		synced:   newWatcherGroup(),
-		stopc:    make(chan struct{}),
+		store:          NewStore(lg, b, le, cfg),
+		victimc:        make(chan struct{}, 1),
+		unsynced:       newWatcherGroup(),
+		synced:         newWatcherGroup(),
+		stopc:          make(chan struct{}),
+		progressOnSync: false,
 	}
 	s.store.ReadView = &readView{s}
 	s.store.WriteView = &writeView{s}
@@ -407,6 +412,15 @@ func (s *watchableStore) syncWatchers() int {
 	}
 	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
 
+	// Deferred progress notification left to send when synced?
+	if s.progressOnSync && s.unsynced.size() == 0 {
+		for w, _ := range s.synced.watchers {
+			w.send(WatchResponse{WatchID: -1, Revision: s.rev()})
+			break
+		}
+		s.progressOnSync = false
+	}
+
 	return s.unsynced.size()
 }
 
@@ -482,6 +496,27 @@ func (s *watchableStore) progress(w *watcher) {
 	}
 }
 
+func (s *watchableStore) progress_all(force bool) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	// Any watcher unsynced?
+	if s.unsynced.size() > 0 {
+		// If forced: Defer progress until successfully synced
+		if force {
+			s.progressOnSync = true
+		}
+
+	} else {
+		// If all watchers are synchronised, send out progress
+		// watch response on first watcher (if any)
+		for w, _ := range s.synced.watchers {
+			w.send(WatchResponse{WatchID: -1, Revision: s.rev()})
+			break
+		}
+	}
+}
+
 type watcher struct {
 	// the watcher key
 	key []byte
diff --git a/server/storage/mvcc/watcher.go b/server/storage/mvcc/watcher.go
index 7d2490b1d6e..2505c64b15f 100644
--- a/server/storage/mvcc/watcher.go
+++ b/server/storage/mvcc/watcher.go
@@ -58,6 +58,14 @@ type WatchStream interface {
 	// of the watchers since the watcher is currently synced.
 	RequestProgress(id WatchID)
 
+	// RequestProgressAll requests a progress notification for the
+	// entire watcher group.
+	// The response will only be sent if all watchers are synced -
+	// or once they become synced, if forced. The responses will be
+	// sent through the WatchResponse Chan of the first watcher of
+	// this stream, if any.
+	RequestProgressAll(force bool)
+
 	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
 	// returned.
 	Cancel(id WatchID) error
@@ -188,3 +196,7 @@ func (ws *watchStream) RequestProgress(id WatchID) {
 	}
 	ws.watchable.progress(w)
 }
+
+func (ws *watchStream) RequestProgressAll(force bool) {
+	ws.watchable.progress_all(force)
+}

From c2eacb809a1133460a1aa77cf2763472b9e1c27c Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Wed, 8 Feb 2023 18:08:31 +0800
Subject: [PATCH 3/3] update test case to reproduce the duplicated event issue

Signed-off-by: Benjamin Wang
---
 tests/framework/e2e/cluster.go                |  1 +
 tests/linearizability/linearizability_test.go | 58 ++-----------------
 tests/linearizability/traffic.go              |  6 ++
 tests/linearizability/watch.go                | 13 +++--
 4 files changed, 18 insertions(+), 60 deletions(-)

diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go
index c383ca0120d..25c787aeab5 100644
--- a/tests/framework/e2e/cluster.go
+++ b/tests/framework/e2e/cluster.go
@@ -505,6 +505,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 		"--initial-cluster-token", cfg.InitialToken,
 		"--data-dir", dataDirPath,
 		"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
+		"--experimental-watch-progress-notify-interval", "100ms",
 	}
 
 	if cfg.ForceNewCluster {
diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go
index 71ce771722f..5afa9e4b432 100644
--- a/tests/linearizability/linearizability_test.go
+++ b/tests/linearizability/linearizability_test.go
@@ -79,7 +79,7 @@ var (
 	}
 	defaultTraffic = LowTraffic
 	trafficList    = []trafficConfig{
-		LowTraffic, HighTraffic,
+		LowTraffic,
 	}
 )
 
@@ -95,7 +95,7 @@ func TestLinearizability(t *testing.T) {
 	for _, traffic := range trafficList {
 		scenarios = append(scenarios, scenario{
 			name:      "ClusterOfSize1/" + traffic.name,
-			failpoint: RandomFailpoint,
+			failpoint: KillFailpoint,
 			traffic:   &traffic,
 			config: *e2e.NewConfig(
 				e2e.WithClusterSize(1),
@@ -104,57 +104,7 @@ func TestLinearizability(t *testing.T) {
 				e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
 			),
 		})
-		scenarios = append(scenarios, scenario{
-			name:      "ClusterOfSize3/" + traffic.name,
-			failpoint: RandomFailpoint,
-			traffic:   &traffic,
-			config: *e2e.NewConfig(
-				e2e.WithSnapshotCount(100),
-				e2e.WithPeerProxy(true),
-				e2e.WithGoFailEnabled(true),
-				e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
-			),
-		})
 	}
-	scenarios = append(scenarios, []scenario{
-		{
-			name:      "Issue14370",
-			failpoint: RaftBeforeSavePanic,
-			config: *e2e.NewConfig(
-				e2e.WithClusterSize(1),
-				e2e.WithGoFailEnabled(true),
-			),
-		},
-		{
-			name:      "Issue14685",
-			failpoint: DefragBeforeCopyPanic,
-			config: *e2e.NewConfig(
-				e2e.WithClusterSize(1),
-				e2e.WithGoFailEnabled(true),
-			),
-		},
-		{
-			name:      "Issue13766",
-			failpoint: KillFailpoint,
-			traffic:   &HighTraffic,
-			config: *e2e.NewConfig(
-				e2e.WithSnapshotCount(100),
-			),
-		},
-		// TODO: investigate periodic `Model is not linearizable` failures
-		// see https://github.com/etcd-io/etcd/pull/15104#issuecomment-1416371288
-		/*{
-			name:      "Snapshot",
-			failpoint: RandomSnapshotFailpoint,
-			traffic:   &HighTraffic,
-			config: *e2e.NewConfig(
-				e2e.WithGoFailEnabled(true),
-				e2e.WithSnapshotCount(100),
-				e2e.WithSnapshotCatchUpEntries(100),
-				e2e.WithPeerProxy(true),
-			),
-		},*/
-	}...)
 	for _, scenario := range scenarios {
 		if scenario.traffic == nil {
 			scenario.traffic = &defaultTraffic
@@ -171,7 +121,7 @@ func TestLinearizability(t *testing.T) {
 			defer clus.Close()
 			operations, events := testLinearizability(ctx, t, lg, clus, FailpointConfig{
 				failpoint:           scenario.failpoint,
-				count:               1,
+				count:               6,
 				retries:             3,
 				waitBetweenTriggers: waitBetweenFailpointTriggers,
 			}, *scenario.traffic)
@@ -197,7 +147,7 @@ func testLinearizability(ctx context.Context, t *testing.T, lg *zap.Logger, clus
 	watchCtx, watchCancel := context.WithCancel(ctx)
 	g.Go(func() error {
 		operations = simulateTraffic(trafficCtx, t, lg, clus, traffic)
-		time.Sleep(time.Second)
+		time.Sleep(10 * time.Second)
 		watchCancel()
 		return nil
 	})
diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go
index ed41305c84d..fe7f7b3604a 100644
--- a/tests/linearizability/traffic.go
+++ b/tests/linearizability/traffic.go
@@ -63,7 +63,13 @@ type requestChance struct {
 }
 
 func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
+	cnt := 0
 	for {
+		time.Sleep(100 * time.Millisecond)
+		cnt++
+		if cnt > 6 {
+			return
+		}
 		select {
 		case <-ctx.Done():
 			return
diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go
index 5deb1acfd92..15772d0192b 100644
--- a/tests/linearizability/watch.go
+++ b/tests/linearizability/watch.go
@@ -17,7 +17,6 @@ package linearizability
 import (
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -74,20 +73,20 @@ func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.C
 			return events
 		default:
 		}
-		for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) {
+		for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1), clientv3.WithProgressNotify()) {
 			if resp.Header.Revision < lastRevision {
 				panic("Revision should never decrease")
 			}
 			if resp.Header.Revision == lastRevision && len(resp.Events) != 0 {
-				panic("Got two non-empty responses about same revision")
+				//panic("Got two non-empty responses about same revision")
 			}
 			if resp.IsProgressNotify() {
 				gotProcessNotify = true
 			}
-			err := c.RequestProgress(ctx)
+			/*err := c.RequestProgress(ctx)
 			if err != nil && !errors.Is(err, context.Canceled) {
 				panic(err)
-			}
+			}*/
 			lastRevision = resp.Header.Revision
 			time := time.Now()
 			for _, event := range resp.Events {
@@ -117,7 +116,9 @@ func collectMemberWatchEvents(ctx context.Context, lg *zap.Logger, c *clientv3.C
 					panic(fmt.Sprintf("Failed to marshal response, err: %v", err))
 				}
 
-				panic(fmt.Sprintf("duplicate operation in two responses on revision %d\n---\nfirst:\n%s\n---\nsecond:\n%s", event.Revision, previousResponse, currentResponse))
+				if resp.Events[0].Type != mvccpb.DELETE && prev.Events[0].Type != mvccpb.DELETE {
+					panic(fmt.Sprintf("duplicate operation in two responses on revision %d\n---\nfirst:\n%s\n---\nsecond:\n%s", event.Revision, previousResponse, currentResponse))
+				}
 				}
 				operations[event.Op] = resp
 				events = append(events, event)
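
Reviewer note (not part of the patch series): the invariant these test changes assert can also be replayed against any running cluster with a small standalone client. The sketch below is an assumption-laden illustration, not the project's test code - the endpoint, key prefix, and timeout are made up - but every clientv3 call used (Watch, WithPrefix, WithProgressNotify, RequestProgress, IsProgressNotify) is a real API. It watches a prefix, keeps requesting progress notifications, and panics if a response reports a revision lower than one already seen or delivers the same (key, mod revision) pair twice, which is the ordering/duplication bug the patches address.

// ordercheck.go - minimal sketch under the assumptions stated above.
package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Assumed endpoint; adjust to the cluster under test.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	seen := map[string]int64{} // key -> last delivered mod revision
	var lastRev int64          // highest revision reported by any response so far

	// Watch everything under an assumed prefix; progress notifications are enabled.
	for resp := range cli.Watch(ctx, "/", clientv3.WithPrefix(), clientv3.WithProgressNotify()) {
		// Revisions reported by responses (including progress notifications) must never go backwards.
		if resp.Header.Revision < lastRev {
			panic(fmt.Sprintf("revision went backwards: %d < %d", resp.Header.Revision, lastRev))
		}
		for _, ev := range resp.Events {
			key := string(ev.Kv.Key)
			// The same event (key at a given mod revision) must not be delivered twice.
			if prev, ok := seen[key]; ok && prev == ev.Kv.ModRevision {
				panic(fmt.Sprintf("duplicate event for %q at revision %d", key, ev.Kv.ModRevision))
			}
			seen[key] = ev.Kv.ModRevision
		}
		lastRev = resp.Header.Revision
		// Keep asking for progress; before the fix such a notification could
		// overtake events still pending on an unsynced stream.
		if err := cli.RequestProgress(ctx); err != nil && ctx.Err() == nil {
			panic(err)
		}
	}
}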