Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Mar 6, 2024
1 parent 5d89baa commit 19f938a
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 1 deletion.
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (t *Transport) send(ctx context.Context) {
// The xDS protocol only requires that we send the node proto in the first
// discovery request on every stream. Sending the node proto in every
// request message wastes CPU resources on the client and the server.
sentNodeProto := true
sentNodeProto := false
for {
select {
case <-ctx.Done():
Expand Down
191 changes: 191 additions & 0 deletions xds/internal/xdsclient/transport/transport_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package transport_test

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -217,3 +218,193 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) {
})
}
}

func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
OnRecvHandler: func(update transport.ResourceUpdate) error {
return nil
},
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: nodeProto,
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()

// Send a request for a listener resource.
const resource = "some-resource"
tr.SendRequest(version.V3ListenerURL, []string{resource})

// Ensure the proper request was sent.
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq := val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Remove the subscription by requesting an empty list.
tr.SendRequest(version.V3ListenerURL, []string{})

// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Cause the stream to restart.
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}

// Ensure no request is sent since there are no resources.
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer cancel()
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
}

tr.SendRequest(version.V3ListenerURL, []string{resource})

// Ensure the proper request was sent with the node proto.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

}

func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

mgmtServer, cleanup := startFakeManagementServer(t)
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
nodeProto := &v3corepb.Node{Id: uuid.New().String()}
tr, err := transport.New(transport.Options{
ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
OnRecvHandler: func(update transport.ResourceUpdate) error {
return nil
},
OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling.
OnErrorHandler: func(error) {}, // No stream error handling.
Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff.
NodeProto: nodeProto,
})
if err != nil {
t.Fatalf("Failed to create xDS transport: %v", err)
}
defer tr.Close()

// Send a request for a listener resource.
const resource = "some-resource"
tr.SendRequest(version.V3ListenerURL, []string{resource})

// Ensure the proper request was sent.
val, err := mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq := val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Send a request for a cluster resource.
tr.SendRequest(version.V3ClusterURL, []string{resource})

// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Remove the cluster subscription by requesting an empty list.
tr.SendRequest(version.V3ClusterURL, []string{})

// Ensure the proper request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
ResourceNames: []string{},
TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Cause the stream to restart.
mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")}

// Ensure the proper LDS request was sent.
val, err = mgmtServer.XDSRequestChan.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for mgmt server response: %v", err)
}
wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{
Node: nodeProto,
ResourceNames: []string{resource},
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
}}
gotReq = val.(*fakeserver.Request)
if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" {
t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq)
}

// Ensure no cluster request is sent since there are no cluster resources.
ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer cancel()
if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got)
}
}

0 comments on commit 19f938a

Please sign in to comment.