Skip to content

Commit

Permalink
xdsclient: switch xdsclient watch deadlock test to e2e style
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Oct 12, 2022
1 parent e81d0a2 commit c4e48b1
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 46 deletions.
18 changes: 16 additions & 2 deletions internal/testutils/xds/e2e/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ type ManagementServerOptions struct {
// will be created and used.
Listener net.Listener

// AllowResourceSubSet allows the management server to respond to requests
// before all configured resources are explicitly named in the request. The
// default behavior that we want is for the management server to wait for
// all configured resources to be requested before responding to any of
// them, since this is how we have run our tests historically, and should be
// set to true only for tests which explicitly require the other behavior.
AllowResourceSubset bool

// The callbacks defined below correspond to the state of the world (sotw)
// version of the xDS API on the management server.

Expand Down Expand Up @@ -97,8 +105,14 @@ type ManagementServerOptions struct {
// logic. When the test is done, it should call the Stop() method to cleanup
// resources allocated by the management server.
func StartManagementServer(opts *ManagementServerOptions) (*ManagementServer, error) {
// Create a snapshot cache.
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, serverLogger{})
// Create a snapshot cache. The first parameter controls whether the server
// will wait for all resources to explicitly named in the request before
// responding to any of them.
wait := true
if opts != nil {
wait = !opts.AllowResourceSubset
}
cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{})
logger.Infof("Created new snapshot cache...")

var lis net.Listener
Expand Down
44 changes: 0 additions & 44 deletions xds/internal/xdsclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/anypb"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -148,49 +147,6 @@ func (c *testController) Close() {
c.done.Fire()
}

// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
// callback. It makes sure it doesn't cause a deadlock.
func (s) TestWatchCallAnotherWatch(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start a watch for some resource, so that the controller and update
// handlers are built for this authority. The test needs these to make an
// inline watch in a callback.
client, ctrlCh := testClientSetup(t, false)
newWatch(t, client, xdsresource.ClusterResource, "doesnot-matter")
controller, updateHandler := getControllerAndPubsub(ctx, t, client, ctrlCh, xdsresource.ClusterResource, "doesnot-matter")

clusterUpdateCh := testutils.NewChannel()
firstTime := true
client.WatchCluster(testCDSName, func(update xdsresource.ClusterUpdate, err error) {
clusterUpdateCh.Send(xdsresource.ClusterUpdateErrTuple{Update: update, Err: err})
// Calls another watch inline, to ensure there's deadlock.
client.WatchCluster("another-random-name", func(xdsresource.ClusterUpdate, error) {})

if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
if _, err := controller.addWatches[xdsresource.ClusterResource].Receive(ctx); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate := xdsresource.ClusterUpdate{ClusterName: testEDSName}
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate}}, xdsresource.UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate, nil); err != nil {
t.Fatal(err)
}

// The second update needs to be different in the underlying resource proto
// for the watch callback to be invoked.
wantUpdate2 := xdsresource.ClusterUpdate{ClusterName: testEDSName + "2", Raw: &anypb.Any{}}
updateHandler.NewClusters(map[string]xdsresource.ClusterUpdateErrTuple{testCDSName: {Update: wantUpdate2}}, xdsresource.UpdateMetadata{})
if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate2, nil); err != nil {
t.Fatal(err)
}
}

func verifyListenerUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.ListenerUpdate, wantErr error) error {
u, err := updateCh.Receive(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/e2e_test/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const (

ldsName = "xdsclient-test-lds-resource"
rdsName = "xdsclient-test-rds-resource"
cdsName = "xdsclient-test-cds-resource"
ldsNameNewStyle = "xdstp:///envoy.config.listener.v3.Listener/xdsclient-test-lds-resource"
rdsNameNewStyle = "xdstp:///envoy.config.route.v3.RouteConfiguration/xdsclient-test-rds-resource"
)
Expand Down
168 changes: 168 additions & 0 deletions xds/internal/xdsclient/e2e_test/misc_watchers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package e2e_test

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

// verifyRouteConfigUpdate waits for an update to be received on the provided
// update channel and verifies that it matches the expected update.
//
// Returns an error if no update is received before the context deadline expires
// or the received update does not match the expected one.
func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.RouteConfigUpdateErrTuple) error {
u, err := updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("timeout when waiting for a route configuration resource from the management server: %v", err)
}
got := u.(xdsresource.RouteConfigUpdateErrTuple)
if wantUpdate.Err != nil {
if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType {
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw")}
if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" {
return fmt.Errorf("received unepected diff in the route configuration resource update: (-want, got):\n%s", diff)
}
return nil
}

// TestWatchCallAnotherWatch covers the case where watch() is called inline by a
// callback. It makes sure it doesn't cause a deadlock.
func (s) TestWatchCallAnotherWatch(t *testing.T) {
overrideFedEnvVar(t)

// Start an xDS management server and set the option to allow it to respond
// to request which only specify a subset of the configured resources.
mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup()

// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

// Configure the management server to with route configuration resources.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Routes: []*v3routepb.RouteConfiguration{
e2e.DefaultRouteConfig(rdsName, ldsName, cdsName),
e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsName),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}

// Start a watch for one route configuration resource. From the watch
// callback of the first resource, register two more watches (one for the
// same resource name, which would be satisfied from the cache, and another
// for a different resource name, which would be satisfied from the server).
updateCh1 := testutils.NewChannel()
updateCh2 := testutils.NewChannel()
updateCh3 := testutils.NewChannel()
var rdsCancel2, rdsCancel3 func()
rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
// Watch for the same resource name.
rdsCancel2 = client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
// Watch for a different resource name.
rdsCancel3 = client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) {
updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err})
})
})
defer rdsCancel1()
defer func() {
if rdsCancel2 != nil {
rdsCancel2()
}
if rdsCancel3 != nil {
rdsCancel3()
}
}()

// Verify the contents of the received update for the all watchers.
wantUpdate12 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsName},
Routes: []*xdsresource.Route{
{
Prefix: newStringP("/"),
ActionType: xdsresource.RouteActionRoute,
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
},
},
},
},
},
}
wantUpdate3 := xdsresource.RouteConfigUpdateErrTuple{
Update: xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{ldsNameNewStyle},
Routes: []*xdsresource.Route{
{
Prefix: newStringP("/"),
ActionType: xdsresource.RouteActionRoute,
WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}},
},
},
},
},
},
}
if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate12); err != nil {
t.Fatal(err)
}
if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate3); err != nil {
t.Fatal(err)
}
rdsCancel2()
rdsCancel3()
}

func newStringP(s string) *string {
return &s
}

0 comments on commit c4e48b1

Please sign in to comment.