Skip to content

Commit

Permalink
Merge pull request #17557 from serathius/progressrequest-new-watch
Browse files Browse the repository at this point in the history
Fix progress notification for watch that doesn't get any events
  • Loading branch information
serathius authored Mar 11, 2024
2 parents 3f79943 + 6103504 commit ddf5471
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 23 deletions.
20 changes: 1 addition & 19 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,6 @@ type serverWatchStream struct {
// records fragmented watch IDs
fragment map[mvcc.WatchID]bool

// indicates whether we have an outstanding global progress
// notification to send
deferredProgress bool

// closec indicates the stream is closed.
closec chan struct{}

Expand Down Expand Up @@ -178,8 +174,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
prevKV: make(map[mvcc.WatchID]bool),
fragment: make(map[mvcc.WatchID]bool),

deferredProgress: false,

closec: make(chan struct{}),
}

Expand Down Expand Up @@ -367,14 +361,7 @@ func (sws *serverWatchStream) recvLoop() error {
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.mu.Lock()
// Ignore if deferred progress notification is already in progress
if !sws.deferredProgress {
// Request progress for all watchers,
// force generation of a response
if !sws.watchStream.RequestProgressAll() {
sws.deferredProgress = true
}
}
sws.watchStream.RequestProgressAll()
sws.mu.Unlock()
}
default:
Expand Down Expand Up @@ -483,11 +470,6 @@ func (sws *serverWatchStream) sendLoop() {
// elide next progress update if sent a key update
sws.progress[wresp.WatchID] = false
}
if sws.deferredProgress {
if sws.watchStream.RequestProgressAll() {
sws.deferredProgress = false
}
}
sws.mu.Unlock()

case c, ok := <-sws.ctrlStream:
Expand Down
53 changes: 49 additions & 4 deletions tests/integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,8 +1433,8 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))

// Immediately request a progress notification. As the client
// is unsynchronised, the server will have to defer the
// notification internally.
// is unsynchronised, the server will not sent any notification,
//as client can infer progress from events.
err := client.RequestProgress(ctx)
require.NoError(t, err)

Expand All @@ -1454,8 +1454,9 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
}
event_count += len(wr.Events)
}

// ... followed by the requested progress notification
// client needs to request progress notification again
err = client.RequestProgress(ctx)
require.NoError(t, err)
wr2 := <-wch
if wr2.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
Expand All @@ -1467,3 +1468,47 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
t.Fatal("Wrong revision in progress notification!")
}
}

func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
if integration.ThroughProxy {
t.Skip("grpc proxy currently does not support requesting progress notifications")
}
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

resp, err := client.Put(ctx, "bar", "1")
require.NoError(t, err)

wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision))
// Request the progress notification on newly created watch that was not yet synced.
err = client.RequestProgress(ctx)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

require.NoError(t, err)
gotProgressNotification := false
for {
select {
case <-ticker.C:
err := client.RequestProgress(ctx)
require.NoError(t, err)
case resp := <-wch:
if resp.Err() != nil {
t.Fatal(fmt.Errorf("watch error: %w", resp.Err()))
}
if resp.IsProgressNotify() {
gotProgressNotification = true
}
}
if gotProgressNotification {
break
}
}
require.True(t, gotProgressNotification, "Expected to get progress notification")
}

0 comments on commit ddf5471

Please sign in to comment.