Skip to content

Commit

Permalink
Merge pull request #15959 from serathius/robustness-watch-client
Browse files Browse the repository at this point in the history
tests/robustness: Use traffic.RecordingClient in watch
  • Loading branch information
serathius authored May 26, 2023
2 parents bf903e5 + 16bf0f6 commit 7cc98e6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 31 deletions.
15 changes: 13 additions & 2 deletions tests/robustness/traffic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,27 +216,38 @@ func (c *RecordingClient) Defragment(ctx context.Context) error {
return err
}

func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool) clientv3.WatchChan {
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan {
ops := []clientv3.OpOption{clientv3.WithProgressNotify()}
if withPrefix {
ops = append(ops, clientv3.WithPrefix())
}
if rev != 0 {
ops = append(ops, clientv3.WithRev(rev))
}
if withProgressNotify {
ops = append(ops, clientv3.WithProgressNotify())
}
respCh := make(chan clientv3.WatchResponse)
go func() {
defer close(respCh)
for r := range c.client.Watch(ctx, key, ops...) {
c.watchMux.Lock()
c.watchResponses = append(c.watchResponses, ToWatchResponse(r, c.baseTime))
c.watchMux.Unlock()
respCh <- r
select {
case respCh <- r:
case <-ctx.Done():
return
}
}
}()
return respCh
}

// RequestProgress requests a progress notification on the client's open
// watch streams by delegating directly to the underlying etcd client.
// The notification itself arrives (and is recorded) via the watch
// response channel, not through this call's return value.
func (c *RecordingClient) RequestProgress(ctx context.Context) error {
	return c.client.RequestProgress(ctx)
}

func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse {
// using time.Since time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter
s.Reset(resp)
limiter.Wait(ctx)
watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision+1, true) {
for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision+1, true, true) {
s.Update(e)
}
cancel()
Expand Down
41 changes: 13 additions & 28 deletions tests/robustness/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (

"github.com/anishathalye/porcupine"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
Expand All @@ -37,27 +35,19 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
reports := make([]traffic.ClientReport, len(clus.Procs))
memberMaxRevisionChans := make([]chan int64, len(clus.Procs))
for i, member := range clus.Procs {
c, err := clientv3.New(clientv3.Config{
Endpoints: member.EndpointsGRPC(),
Logger: zap.NewNop(),
DialKeepAliveTime: 10 * time.Second,
DialKeepAliveTimeout: 100 * time.Millisecond,
})
c, err := traffic.NewClient(member.EndpointsGRPC(), ids, baseTime)
if err != nil {
t.Fatal(err)
}
memberChan := make(chan int64, 1)
memberMaxRevisionChans[i] = memberChan
memberMaxRevisionChan := make(chan int64, 1)
memberMaxRevisionChans[i] = memberMaxRevisionChan
wg.Add(1)
go func(i int, c *clientv3.Client) {
go func(i int, c *traffic.RecordingClient) {
defer wg.Done()
defer c.Close()
responses := watchMember(ctx, t, c, memberChan, cfg, baseTime)
watchUntilRevision(ctx, t, c, memberMaxRevisionChan, cfg)
mux.Lock()
reports[i] = traffic.ClientReport{
ClientId: ids.NewClientId(),
Watch: responses,
}
reports[i] = c.Report()
mux.Unlock()
}(i, c)
}
Expand All @@ -78,25 +68,23 @@ type watchConfig struct {
expectUniqueRevision bool
}

// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
// TODO: Use traffic.RecordingClient instead of clientv3.Client
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []traffic.WatchResponse) {
// watchUntilRevision watches all changes until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
func watchUntilRevision(ctx context.Context, t *testing.T, c *traffic.RecordingClient, maxRevisionChan <-chan int64, cfg watchConfig) {
var maxRevision int64 = 0
var lastRevision int64 = 0
ctx, cancel := context.WithCancel(ctx)
defer cancel()
watch := c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(1), clientv3.WithProgressNotify())
watch := c.Watch(ctx, "", 1, true, true)
for {
select {
case <-ctx.Done():
revision := watchResponsesMaxRevision(resps)
if maxRevision == 0 {
t.Errorf("Client didn't collect all events, max revision not set")
}
if revision < maxRevision {
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", revision, maxRevision)
if lastRevision < maxRevision {
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", lastRevision, maxRevision)
}
return resps
return
case revision, ok := <-maxRevisionChan:
if ok {
maxRevision = revision
Expand All @@ -113,12 +101,9 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
if cfg.requestProgress {
c.RequestProgress(ctx)
}
if resp.Err() == nil {
resps = append(resps, traffic.ToWatchResponse(resp, baseTime))
} else if !resp.Canceled {
if resp.Err() != nil && !resp.Canceled {
t.Errorf("Watch stream received error, err %v", resp.Err())
}
// Assumes that we track all events as we watch all keys.
if len(resp.Events) > 0 {
lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision
}
Expand Down

0 comments on commit 7cc98e6

Please sign in to comment.