Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix watch validation assuming that client not requesting older watch revision #16695

Merged
merged 1 commit into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions tests/robustness/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/tests/v3/framework/testutils"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

Expand All @@ -47,3 +48,170 @@ func TestValidate(t *testing.T) {
})
}
}

func TestValidateWatch(t *testing.T) {
tcs := []struct {
name string
reports []report.ClientReport
}{
{
name: "earlier event after bookmark in separate request",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
Key: "a",
Revision: 100,
},
Responses: []model.WatchResponse{
{
IsProgressNotify: true,
Revision: 100,
},
},
},
{
Request: model.WatchRequest{
Key: "a",
Revision: 99,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
{
Event: model.Event{
Type: model.PutOperation,
Key: "a",
Value: model.ValueOrHash{
Value: "99",
},
},
Revision: 99,
},
},
Revision: 100,
},
},
},
},
},
},
},
{
name: "earlier event after in separate request",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
Key: "a",
Revision: 100,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
{
Event: model.Event{
Type: model.PutOperation,
Key: "a",
Value: model.ValueOrHash{
Value: "100",
},
},
Revision: 100,
},
},
Revision: 100,
},
},
},
{
Request: model.WatchRequest{
Key: "a",
Revision: 99,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
{
Event: model.Event{
Type: model.PutOperation,
Key: "a",
Value: model.ValueOrHash{
Value: "99",
},
},
Revision: 99,
},
},
Revision: 100,
},
},
},
},
},
},
},
{
name: "duplicated event between two separate requests",
reports: []report.ClientReport{
{
Watch: []model.WatchOperation{
{
Request: model.WatchRequest{
Key: "a",
Revision: 100,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
{
Event: model.Event{
Type: model.PutOperation,
Key: "a",
Value: model.ValueOrHash{
Value: "100",
},
},
Revision: 100,
},
},
Revision: 100,
},
},
},
{
Request: model.WatchRequest{
Key: "a",
Revision: 100,
},
Responses: []model.WatchResponse{
{
Events: []model.WatchEvent{
{
Event: model.Event{
Type: model.PutOperation,
Key: "a",
Value: model.ValueOrHash{
Value: "100",
},
},
Revision: 100,
},
},
Revision: 100,
},
},
},
},
},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
validateWatch(t, Config{ExpectRevisionUnique: true}, tc.reports)
})
}
}
9 changes: 4 additions & 5 deletions tests/robustness/validate/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func validateWatch(t *testing.T, cfg Config, reports []report.ClientReport) []mo
}

func validateBookmarkable(t *testing.T, report report.ClientReport) {
var lastProgressNotifyRevision int64
for _, op := range report.Watch {
var lastProgressNotifyRevision int64
for _, resp := range op.Responses {
for _, event := range resp.Events {
if event.Revision <= lastProgressNotifyRevision {
Expand All @@ -59,8 +59,8 @@ func validateBookmarkable(t *testing.T, report report.ClientReport) {
}

func validateOrdered(t *testing.T, report report.ClientReport) {
var lastEventRevision int64 = 1
for _, op := range report.Watch {
var lastEventRevision int64 = 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why change this. Aren't watch responses globally ordered for each client? In your test case TestValidateWatch, there is no clientId; do you intentionally verify that different clients may request an older revision?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, we expect client to get increasing revisions due to either fact that watch doesn't break or that users will usually want to reestablish watches on the following revision. However, from etcd perspective, those are independent watch request each providing its own revision to start watching from. It's not invalid that single client can start watching from rev 200, and after that decide to establish new watch from rev 100.

As in case #16693, for some unknown reason etcd went back from revision 301 to 192 in the KV store. So from clients perspective it behaved correctly, after watch was broken it established the new watch on revision 192, even though it has previously seen revision 301.

Goal of this issue is to remove assumption about sensible client behavior (not going back on watch), and just validate the watch responses. This should increase readability of robustness test reports as client misbehavior caused by etcd linearizability issue will no longer also report invalid watch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

etcd went back from revision 301 to 192 in the KV store. So from clients perspective it behaved correctly, after watch was broken it established the new watch on revision 192, even though it has previously seen revision 301

It's true. But in your test case, there is no watch establishment, so the revision shouldn't go back?

The test change is OK. But I'd suggest you to have a deep dive to figure out why the revision go back. Let me know if you need my assistance or I misunderstood anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for r := range c.client.Watch(ctx, request.Key, ops...) {

Copy link
Member Author

@serathius serathius Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's true. But in your test case, there is no watch establishment, so the revision shouldn't go back?

There is, but only in Kubernetes traffic. It runs a ListWatch loop with 100 ms timeout.

Look consists of Read and Watch from the Read revision.

g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-finish:
return nil
default:
}
rev, err := t.Read(ctx, kc, s, limiter, keyPrefix)
if err != nil {
continue
}
t.Watch(ctx, kc, s, limiter, keyPrefix, rev+1)
}
})

And watch breaks every 100ms to simulate client loosing connection

func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
defer cancel()
for e := range kc.client.Watch(watchCtx, keyPrefix, revision, true, true) {
s.Update(e)
}
limiter.Wait(ctx)
}

for _, resp := range op.Responses {
for _, event := range resp.Events {
if event.Revision < lastEventRevision {
Expand All @@ -73,9 +73,8 @@ func validateOrdered(t *testing.T, report report.ClientReport) {
}

func validateUnique(t *testing.T, expectUniqueRevision bool, report report.ClientReport) {
uniqueOperations := map[any]struct{}{}

for _, op := range report.Watch {
uniqueOperations := map[any]struct{}{}
for _, resp := range op.Responses {
for _, event := range resp.Events {
var key any
Expand All @@ -97,8 +96,8 @@ func validateUnique(t *testing.T, expectUniqueRevision bool, report report.Clien
}

func validateAtomic(t *testing.T, report report.ClientReport) {
var lastEventRevision int64 = 1
for _, op := range report.Watch {
var lastEventRevision int64 = 1
for _, resp := range op.Responses {
if len(resp.Events) > 0 {
if resp.Events[0].Revision == lastEventRevision {
Expand Down