From 19f938aecacab4bdbd10f20b76dda58e33d540f5 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 6 Mar 2024 11:31:19 -0800 Subject: [PATCH] tests --- xds/internal/xdsclient/transport/transport.go | 2 +- .../transport/transport_resource_test.go | 191 ++++++++++++++++++ 2 files changed, 192 insertions(+), 1 deletion(-) diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index 47bc5f280f8a..64e23ecd9648 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -363,7 +363,7 @@ func (t *Transport) send(ctx context.Context) { // The xDS protocol only requires that we send the node proto in the first // discovery request on every stream. Sending the node proto in every // request message wastes CPU resources on the client and the server. - sentNodeProto := true + sentNodeProto := false for { select { case <-ctx.Done(): diff --git a/xds/internal/xdsclient/transport/transport_resource_test.go b/xds/internal/xdsclient/transport/transport_resource_test.go index 43ec82ae74ed..35b5219ae6ca 100644 --- a/xds/internal/xdsclient/transport/transport_resource_test.go +++ b/xds/internal/xdsclient/transport/transport_resource_test.go @@ -21,6 +21,7 @@ package transport_test import ( "context" + "errors" "testing" "time" @@ -217,3 +218,193 @@ func (s) TestHandleResponseFromManagementServer(t *testing.T) { }) } } + +func (s) TestEmptyListenerResourceOnStreamRestart(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + mgmtServer, cleanup := startFakeManagementServer(t) + defer cleanup() + t.Logf("Started xDS management server on %s", mgmtServer.Address) + nodeProto := &v3corepb.Node{Id: uuid.New().String()} + tr, err := transport.New(transport.Options{ + ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), + OnRecvHandler: func(update transport.ResourceUpdate) error { + return nil + }, + OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. + OnErrorHandler: func(error) {}, // No stream error handling. + Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. + NodeProto: nodeProto, + }) + if err != nil { + t.Fatalf("Failed to create xDS transport: %v", err) + } + defer tr.Close() + + // Send a request for a listener resource. + const resource = "some-resource" + tr.SendRequest(version.V3ListenerURL, []string{resource}) + + // Ensure the proper request was sent. + val, err := mgmtServer.XDSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for mgmt server response: %v", err) + } + wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ + Node: nodeProto, + ResourceNames: []string{resource}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + }} + gotReq := val.(*fakeserver.Request) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + } + + // Remove the subscription by requesting an empty list. + tr.SendRequest(version.V3ListenerURL, []string{}) + + // Ensure the proper request was sent. + val, err = mgmtServer.XDSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for mgmt server response: %v", err) + } + wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ + ResourceNames: []string{}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + }} + gotReq = val.(*fakeserver.Request) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + } + + // Cause the stream to restart. + mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")} + + // Ensure no request is sent since there are no resources. + ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer cancel() + if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got) + } + + tr.SendRequest(version.V3ListenerURL, []string{resource}) + + // Ensure the proper request was sent with the node proto. + val, err = mgmtServer.XDSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for mgmt server response: %v", err) + } + wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ + Node: nodeProto, + ResourceNames: []string{resource}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + }} + gotReq = val.(*fakeserver.Request) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + } + +} + +func (s) TestEmptyClusterResourceOnStreamRestartWithListener(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + mgmtServer, cleanup := startFakeManagementServer(t) + defer cleanup() + t.Logf("Started xDS management server on %s", mgmtServer.Address) + nodeProto := &v3corepb.Node{Id: uuid.New().String()} + tr, err := transport.New(transport.Options{ + ServerCfg: *xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), + OnRecvHandler: func(update transport.ResourceUpdate) error { + return nil + }, + OnSendHandler: func(*transport.ResourceSendInfo) {}, // No onSend handling. + OnErrorHandler: func(error) {}, // No stream error handling. + Backoff: func(int) time.Duration { return time.Duration(0) }, // No backoff. + NodeProto: nodeProto, + }) + if err != nil { + t.Fatalf("Failed to create xDS transport: %v", err) + } + defer tr.Close() + + // Send a request for a listener resource. + const resource = "some-resource" + tr.SendRequest(version.V3ListenerURL, []string{resource}) + + // Ensure the proper request was sent. + val, err := mgmtServer.XDSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for mgmt server response: %v", err) + } + wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ + Node: nodeProto, + ResourceNames: []string{resource}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + }} + gotReq := val.(*fakeserver.Request) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + } + + // Send a request for a cluster resource. + tr.SendRequest(version.V3ClusterURL, []string{resource}) + + // Ensure the proper request was sent. + val, err = mgmtServer.XDSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for mgmt server response: %v", err) + } + wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ + ResourceNames: []string{resource}, + TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", + }} + gotReq = val.(*fakeserver.Request) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + } + + // Remove the cluster subscription by requesting an empty list. + tr.SendRequest(version.V3ClusterURL, []string{}) + + // Ensure the proper request was sent. + val, err = mgmtServer.XDSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for mgmt server response: %v", err) + } + wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ + ResourceNames: []string{}, + TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", + }} + gotReq = val.(*fakeserver.Request) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + } + + // Cause the stream to restart. + mgmtServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("go away")} + + // Ensure the proper LDS request was sent. + val, err = mgmtServer.XDSRequestChan.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for mgmt server response: %v", err) + } + wantReq = &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ + Node: nodeProto, + ResourceNames: []string{resource}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + }} + gotReq = val.(*fakeserver.Request) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + } + + // Ensure no cluster request is sent since there are no cluster resources. + ctxShort, cancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer cancel() + if got, err := mgmtServer.XDSRequestChan.Receive(ctxShort); !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("mgmt server received request: %v; wanted DeadlineExceeded error", got) + } +}