Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/server: fix RDS handling for non-inline route configs #6915

Merged
merged 6 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ var (
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
CanonicalString any // func (codes.Code) string
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports any // func(*grpc.Server, string)
// IsRegisteredMethod returns whether the passed in method is registered as
// a method on the server.
IsRegisteredMethod any // func(*grpc.Server, string) bool
Expand Down Expand Up @@ -188,6 +183,15 @@ var (
ExitIdleModeForTesting any // func(*grpc.ClientConn) error

ChannelzTurnOffForTesting func()

// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found
// error for a given resource type and name. This is usually triggered when
// the associated watch timer fires. For testing purposes, having this
// function makes events more predictable than relying on timer events.
TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error
// TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton
// to invoke resource not found for a resource type name and resource name.
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
136 changes: 128 additions & 8 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,72 @@ func marshalAny(m proto.Message) *anypb.Any {
return a
}

// filterChainWontMatch returns a filter chain that won't match if running the
// test locally.
func filterChainWontMatch(routeName string, addressPrefix string, srcPorts []uint32) *v3listenerpb.FilterChain {
hcm := &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: routeName,
},
},
HttpFilters: []*v3httppb.HttpFilter{RouterHTTPFilter},
}
return &v3listenerpb.FilterChain{
Name: routeName + "-wont-match",
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: addressPrefix,
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(0),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePorts: srcPorts,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: addressPrefix,
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(0),
},
},
},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: marshalAny(hcm)},
},
},
}
}

// ListenerResourceThreeRouteResources returns a listener resource that points
// to three route configurations. Only the filter chain that points to the first
// route config can be matched to.
func ListenerResourceThreeRouteResources(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
lis := defaultServerListenerCommon(host, port, secLevel, routeName, false)
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName2", "1.1.1.1", []uint32{1}))
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName3", "2.2.2.2", []uint32{2}))
return lis
}

// ListenerResourceFallbackToDefault returns a listener resource that contains a
// filter chain that will never get chosen to process traffic and a default
// filter chain. The default filter chain points to routeName2.
func ListenerResourceFallbackToDefault(host string, port uint32, secLevel SecurityLevel) *v3listenerpb.Listener {
lis := defaultServerListenerCommon(host, port, secLevel, "", false)
lis.FilterChains = nil
lis.FilterChains = append(lis.FilterChains, filterChainWontMatch("routeName", "1.1.1.1", []uint32{1}))
lis.DefaultFilterChain = filterChainWontMatch("routeName2", "2.2.2.2", []uint32{2})
return lis
}

// DefaultServerListener returns a basic xds Listener resource to be used on the
// server side. The returned Listener resource contains an inline route
// configuration with the name of routeName.
Expand Down Expand Up @@ -290,13 +356,6 @@ func defaultServerListenerCommon(host string, port uint32, secLevel SecurityLeve
}
}

// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteCongiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}

// HTTPFilter constructs an xds HttpFilter with the provided name and config.
func HTTPFilter(name string, config proto.Message) *v3httppb.HttpFilter {
return &v3httppb.HttpFilter{
Expand Down Expand Up @@ -356,7 +415,6 @@ type RouteConfigOptions struct {
ListenerName string
// ClusterSpecifierType determines the cluster specifier type.
ClusterSpecifierType RouteConfigClusterSpecifierType

// ClusterName is name of the cluster resource used when the cluster
// specifier type is set to RouteConfigClusterSpecifierTypeCluster.
//
Expand Down Expand Up @@ -722,3 +780,65 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
}
return cla
}

// DefaultServerListenerWithRouteConfigName returns a basic xds Listener
// resource to be used on the server side. The returned Listener resource
// contains a RouteCongiguration resource name that needs to be resolved.
func DefaultServerListenerWithRouteConfigName(host string, port uint32, secLevel SecurityLevel, routeName string) *v3listenerpb.Listener {
return defaultServerListenerCommon(host, port, secLevel, routeName, false)
}

// RouteConfigNoRouteMatch returns an xDS RouteConfig resource which a route
// with no route match. This will be NACKed by the xDS Client.
func RouteConfigNoRouteMatch(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}

// RouteConfigNonForwardingAction returns an xDS RouteConfig resource which
// specifies to route to a route specifying non forwarding action. This is
// intended to be used on the server side for RDS requests, and corresponds to
// the inline route configuration in DefaultServerListener.
func RouteConfigNonForwardingAction(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to ensure any
// incoming RPC matches to Route_NonForwardingAction and will proceed as
// normal.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}}
}

// RouteConfigFilterAction returns an xDS RouteConfig resource which specifies
// to route to a route specifying route filter action. Since this is not type
// non forwarding action, this should fail requests that match to this server
// side.
func RouteConfigFilterAction(routeName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
// This "*" string matches on any incoming authority. This is to
// ensure any incoming RPC matches to Route_Route and will fail with
// UNAVAILABLE.
Domains: []string{"*"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_FilterAction{},
}}}}}
}
18 changes: 6 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ func init() {
return srv.isRegisteredMethod(method)
}
internal.ServerFromContext = serverFromContext
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
globalServerOptions = append(globalServerOptions, opt...)
}
Expand Down Expand Up @@ -932,6 +929,12 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
return
}

if cc, ok := rawConn.(interface {
PassServerTransport(transport.ServerTransport)
}); ok {
cc.PassServerTransport(st)
}

if !s.addConn(lisAddr, st) {
return
}
Expand All @@ -941,15 +944,6 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}()
}

func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain("")
}
s.mu.Unlock()
}

// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Test(t *testing.T) {

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
defaultTestShortTimeout = 10 * time.Millisecond // For events expdefaultTected to *not* happen.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
)

func (s) TestClientSideXDS(t *testing.T) {
Expand Down
16 changes: 2 additions & 14 deletions test/xds/xds_server_certificate_providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ import (

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds"
"google.golang.org/protobuf/types/known/wrapperspb"

Expand Down Expand Up @@ -233,12 +231,7 @@ func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Failure(t *test
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() failed: %v, wantCode: %s", err, codes.DeadlineExceeded)
}
waitForFailedRPCWithStatusCode(ctx, t, cc, errAcceptAndClose...)
}

// Tests the case where the bootstrap configuration contains one certificate
Expand Down Expand Up @@ -484,10 +477,5 @@ func (s) TestServerSideXDS_WithValidAndInvalidSecurityConfiguration(t *testing.T
}
defer cc2.Close()

client2 := testgrpc.NewTestServiceClient(cc2)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client2.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() failed: %v, wantCode: %s", err, codes.DeadlineExceeded)
}
waitForFailedRPCWithStatusCode(ctx, t, cc2, errAcceptAndClose...)
}
10 changes: 10 additions & 0 deletions test/xds/xds_server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package xds_test
import (
"context"
"fmt"
"io"
"net"
"strconv"
"testing"
Expand Down Expand Up @@ -50,6 +51,15 @@ func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.S
return &testpb.SimpleResponse{}, nil
}

func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors
if err == io.EOF {
return nil
}
}
}

func testModeChangeServerOption(t *testing.T) grpc.ServerOption {
// Create a server option to get notified about serving mode changes. We don't
// do anything other than throwing a log entry here. But this is required,
Expand Down
2 changes: 1 addition & 1 deletion test/xds/xds_server_serving_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
return
}

ticker := time.NewTimer(1 * time.Second)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
Expand Down
Loading
Loading