Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/bootstrap: add testing support to generate config #7326

Merged
merged 5 commits into from
Jun 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
review comments pass #2
easwars committed Jun 20, 2024
commit 3f9f060b1b3dbcfe5f43f612e217535fe5e22dba
17 changes: 9 additions & 8 deletions internal/testutils/xds/e2e/bootstrap.go
Original file line number Diff line number Diff line change
@@ -52,13 +52,13 @@ func DefaultBootstrapContents(t *testing.T, nodeID, serverURI string) []byte {
t.Helper()

// Create a directory to hold certs and key files used on the server side.
serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
serverDir, err := createTmpDirWithCerts("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
if err != nil {
t.Fatalf("Failed to create bootstrap configuration: %v", err)
}

// Create a directory to hold certs and key files used on the client side.
clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
clientDir, err := createTmpDirWithCerts("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
if err != nil {
t.Fatalf("Failed to create bootstrap configuration: %v", err)
}
@@ -104,14 +104,15 @@ func createTmpFile(src, dst string) error {
return nil
}

// createTmpDirWithFiles creates a temporary directory under the system default
// tempDir with the given dirSuffix. It also reads from certSrc, keySrc and
// rootSrc files are creates appropriate files under the newly create tempDir.
// Returns the name of the created tempDir.
func createTmpDirWithFiles(dirSuffix, certSrc, keySrc, rootSrc string) (string, error) {
// createTmpDirWithCerts creates a temporary directory under the system default
// tempDir with the given dirPattern. It also reads from certSrc, keySrc and
// rootSrc files and creates appropriate files under the newly create tempDir.
// Returns the path of the created tempDir if successful, and an error
// otherwise.
func createTmpDirWithCerts(dirPattern, certSrc, keySrc, rootSrc string) (string, error) {
// Create a temp directory. Passing an empty string for the first argument
// uses the system temp directory.
dir, err := os.MkdirTemp("", dirSuffix)
dir, err := os.MkdirTemp("", dirPattern)
if err != nil {
return "", fmt.Errorf("os.MkdirTemp() failed: %v", err)
}
26 changes: 9 additions & 17 deletions internal/testutils/xds/e2e/logging.go
Original file line number Diff line number Diff line change
@@ -18,31 +18,23 @@

package e2e

import (
"fmt"

"google.golang.org/grpc/grpclog"
)

var logger = grpclog.Component("xds-e2e")

// serverLogger implements the Logger interface defined at
// envoyproxy/go-control-plane/pkg/log. This is passed to the Snapshot cache.
type serverLogger struct{}
type serverLogger struct {
logger interface {
Logf(format string, args ...any)
}
}

func (l serverLogger) Debugf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
logger.InfoDepth(1, msg)
l.logger.Logf(format, args)
}
func (l serverLogger) Infof(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
logger.InfoDepth(1, msg)
l.logger.Logf(format, args)
}
func (l serverLogger) Warnf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
logger.WarningDepth(1, msg)
l.logger.Logf(format, args)
}
func (l serverLogger) Errorf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
logger.ErrorDepth(1, msg)
l.logger.Logf(format, args)
}
22 changes: 15 additions & 7 deletions internal/testutils/xds/e2e/server.go
Original file line number Diff line number Diff line change
@@ -61,6 +61,11 @@ type ManagementServer struct {
gs *grpc.Server // gRPC server which exports the ADS service.
cache v3cache.SnapshotCache // Resource snapshot.
version int // Version of resource snapshot.

// A logging interface, usually supplied from *testing.T.
logger interface {
Logf(format string, args ...any)
}
}

// ManagementServerOptions contains options to be passed to the management
@@ -125,12 +130,14 @@ type ManagementServerOptions struct {
//
// Registers a cleanup function on t to stop the management server.
func StartManagementServer(t *testing.T, opts ManagementServerOptions) *ManagementServer {
t.Helper()

// Create a snapshot cache. The first parameter to NewSnapshotCache()
// controls whether the server should wait for all resources to be
// explicitly named in the request before responding to any of them.
wait := !opts.AllowResourceSubset
cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{})
logger.Infof("Created new snapshot cache...")
cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{t})
t.Logf("Created new snapshot cache...")

lis := opts.Listener
if lis == nil {
@@ -156,7 +163,7 @@ func StartManagementServer(t *testing.T, opts ManagementServerOptions) *Manageme
xs := v3server.NewServer(ctx, cache, callbacks)
gs := grpc.NewServer()
v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs)
logger.Infof("Registered Aggregated Discovery Service (ADS)...")
t.Logf("Registered Aggregated Discovery Service (ADS)...")

mgmtServer := &ManagementServer{
Address: lis.Addr().String(),
@@ -165,17 +172,18 @@ func StartManagementServer(t *testing.T, opts ManagementServerOptions) *Manageme
gs: gs,
xs: xs,
cache: cache,
logger: t,
}
if opts.SupportLoadReportingService {
lrs := fakeserver.NewServer(lis.Addr().String())
v3lrsgrpc.RegisterLoadReportingServiceServer(gs, lrs)
mgmtServer.LRSServer = lrs
logger.Infof("Registered Load Reporting Service (LRS)...")
t.Logf("Registered Load Reporting Service (LRS)...")
}

// Start serving.
go gs.Serve(lis)
logger.Infof("xDS management server serving at: %v...", lis.Addr().String())
t.Logf("xDS management server serving at: %v...", lis.Addr().String())
t.Cleanup(mgmtServer.Stop)
return mgmtServer
}
@@ -218,13 +226,13 @@ func (s *ManagementServer) Update(ctx context.Context, opts UpdateOptions) error
return fmt.Errorf("failed to create new resource snapshot: %v", err)
}
}
logger.Infof("Created new resource snapshot...")
s.logger.Logf("Created new resource snapshot...")

// Update the cache with the new resource snapshot.
if err := s.cache.SetSnapshot(ctx, opts.NodeID, snapshot); err != nil {
return fmt.Errorf("failed to update resource snapshot in management server: %v", err)
}
logger.Infof("Updated snapshot cache with resource snapshot...")
s.logger.Logf("Updated snapshot cache with resource snapshot...")
return nil
}

17 changes: 1 addition & 16 deletions test/xds/xds_security_config_nack_test.go
Original file line number Diff line number Diff line change
@@ -147,22 +147,7 @@ func (s) TestUnmarshalListener_WithUpdateValidatorFunc(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})

// Create bootstrap configuration pointing to the above management
// server with certificate provider configuration.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)

// Create an xDS resolver with the above bootstrap configuration.
var xdsResolver resolver.Builder
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
var err error
xdsResolver, err = newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}
managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t)

lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
defer cleanup2()
16 changes: 1 addition & 15 deletions test/xds/xds_server_integration_test.go
Original file line number Diff line number Diff line change
@@ -224,21 +224,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
managementServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})

// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, managementServer.Address)

// Create an xDS resolver with the above bootstrap configuration.
var xdsResolver resolver.Builder
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
var err error
xdsResolver, err = newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}
managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t)
lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
defer cleanup2()

1 change: 0 additions & 1 deletion xds/internal/httpfilter/fault/fault_test.go
Original file line number Diff line number Diff line change
@@ -123,7 +123,6 @@ func clientSetup(t *testing.T) (*e2e.ManagementServer, string, uint32, func()) {
}()

return managementServer, nodeID, uint32(lis.Addr().(*net.TCPAddr).Port), func() {
managementServer.Stop()
server.Stop()
}
}
5 changes: 0 additions & 5 deletions xds/internal/test/e2e/controlplane.go
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@ type controlPlane struct {
func newControlPlane(t *testing.T) (*controlPlane, error) {
// Spin up an xDS management server on a local port.
server := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
t.Cleanup(server.Stop)

nodeID := uuid.New().String()
bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, server.Address)
@@ -44,7 +43,3 @@ func newControlPlane(t *testing.T) (*controlPlane, error) {
bootstrapContent: string(bootstrapContents),
}, nil
}

func (cp *controlPlane) stop() {
cp.server.Stop()
}
1 change: 0 additions & 1 deletion xds/internal/test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
@@ -63,7 +63,6 @@ func setup(t *testing.T, opts testOpts) (*controlPlane, *client, []*server) {
if err != nil {
t.Fatalf("failed to start control-plane: %v", err)
}
t.Cleanup(cp.stop)

var clientLog bytes.Buffer
c, err := newClient(fmt.Sprintf("xds:///%s", opts.testName), *clientPath, cp.bootstrapContent, &clientLog, opts.clientFlags...)
2 changes: 0 additions & 2 deletions xds/internal/xdsclient/authority_test.go
Original file line number Diff line number Diff line change
@@ -91,7 +91,6 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
a, ms, nodeID := setupTest(ctx, t, emptyServerOpts, defaultTestTimeout)
defer ms.Stop()
defer a.close()

rn := "xdsclient-test-lds-resource"
@@ -199,7 +198,6 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
}

a, ms, nodeID := setupTest(ctx, t, serverOpt, defaultTestTimeout)
defer ms.Stop()
defer a.close()

nameA := "xdsclient-test-lds-resourceA"
31 changes: 17 additions & 14 deletions xds/internal/xdsclient/tests/cds_watchers_test.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
"testing"
"time"
@@ -427,18 +426,19 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

nodeID := uuid.New().String()
authority := makeAuthorityName(t.Name())
bc, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []json.RawMessage{[]byte(fmt.Sprintf(`{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}`, mgmtServer.Address))},
NodeID: nodeID,
Authorities: map[string]json.RawMessage{
// Xdstp style resource names used in this test use a url escaped
// Xdstp style resource names used in this test use a slash removed
// version of t.Name as their authority, and the empty config
// results in the top-level xds server configuration being used for
// this authority.
url.PathEscape(t.Name()): []byte(`{}`),
authority: []byte(`{}`),
},
})
if err != nil {
@@ -464,7 +464,7 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {

// Register the third watch for a different cluster resource, and push the
// received updates onto a channel.
cdsNameNewStyle := makeNewStyleCDSName(t.Name())
cdsNameNewStyle := makeNewStyleCDSName(authority)
cw3 := newClusterWatcher()
cdsCancel3 := xdsresource.WatchCluster(client, cdsNameNewStyle, cw3)
defer cdsCancel3()
@@ -705,18 +705,19 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

nodeID := uuid.New().String()
authority := makeAuthorityName(t.Name())
bc, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []json.RawMessage{[]byte(fmt.Sprintf(`{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}`, mgmtServer.Address))},
NodeID: nodeID,
Authorities: map[string]json.RawMessage{
// Xdstp style resource names used in this test use a url escaped
// Xdstp style resource names used in this test use a slash removed
// version of t.Name as their authority, and the empty config
// results in the top-level xds server configuration being used for
// this authority.
url.PathEscape(t.Name()): []byte(`{}`),
authority: []byte(`{}`),
},
})
if err != nil {
@@ -738,14 +739,14 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) {
cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1)
defer cdsCancel1()

resourceName2 := makeNewStyleCDSName(t.Name())
resourceName2 := makeNewStyleCDSName(authority)
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, resourceName2, cw2)
defer cdsCancel2()

// Configure the management server to return two cluster resources,
// corresponding to the registered watches.
edsNameNewStyle := makeNewStyleEDSName(t.Name())
edsNameNewStyle := makeNewStyleEDSName(authority)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
@@ -879,18 +880,19 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

nodeID := uuid.New().String()
authority := makeAuthorityName(t.Name())
bc, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []json.RawMessage{[]byte(fmt.Sprintf(`{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}`, mgmtServer.Address))},
NodeID: nodeID,
Authorities: map[string]json.RawMessage{
// Xdstp style resource names used in this test use a url escaped
// Xdstp style resource names used in this test use a slash removed
// version of t.Name as their authority, and the empty config
// results in the top-level xds server configuration being used for
// this authority.
url.PathEscape(t.Name()): []byte(`{}`),
authority: []byte(`{}`),
},
})
if err != nil {
@@ -912,7 +914,7 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) {
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, badResourceName, cw1)
defer cdsCancel1()
goodResourceName := makeNewStyleCDSName(t.Name())
goodResourceName := makeNewStyleCDSName(authority)
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, goodResourceName, cw2)
defer cdsCancel2()
@@ -967,18 +969,19 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

nodeID := uuid.New().String()
authority := makeAuthorityName(t.Name())
bc, err := bootstrap.NewContentsForTesting(bootstrap.ConfigOptionsForTesting{
Servers: []json.RawMessage{[]byte(fmt.Sprintf(`{
"server_uri": %q,
"channel_creds": [{"type": "insecure"}]
}`, mgmtServer.Address))},
NodeID: nodeID,
Authorities: map[string]json.RawMessage{
// Xdstp style resource names used in this test use a url escaped
// Xdstp style resource names used in this test use a slash removed
// version of t.Name as their authority, and the empty config
// results in the top-level xds server configuration being used for
// this authority.
url.PathEscape(t.Name()): []byte(`{}`),
authority: []byte(`{}`),
},
})
if err != nil {
@@ -999,7 +1002,7 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
cw1 := newClusterWatcher()
cdsCancel1 := xdsresource.WatchCluster(client, resourceName1, cw1)
defer cdsCancel1()
resourceName2 := makeNewStyleCDSName(t.Name())
resourceName2 := makeNewStyleCDSName(authority)
cw2 := newClusterWatcher()
cdsCancel2 := xdsresource.WatchCluster(client, resourceName2, cw2)
defer cdsCancel2()
Loading