Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
feat(pkg/catalog): gateway is configured for SSL passthrough
Browse files Browse the repository at this point in the history
Add cluster SNI passthrough for the multicluster gateway

Signed-off-by: Sean Teeling <[email protected]>
  • Loading branch information
steeling committed Jun 18, 2021
1 parent cd0bf12 commit c31e6fc
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 33 deletions.
51 changes: 39 additions & 12 deletions pkg/envoy/cds/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,31 @@ const (
clusterConnectTimeout = 1 * time.Second
)

type clusterOptions struct {
permissive bool
withTLS bool
}

// clusterOption is type of function that edits the defaults of the options struct.
type clusterOption func(o *clusterOptions)

// withTLS is an option to disable TLS.
func withTLS(o *clusterOptions) {
o.withTLS = true
}

// permissive is an option to not generate permissive endpoints discovery for clusters.
func permissive(o *clusterOptions) {
o.permissive = true
}

// getUpstreamServiceCluster returns an Envoy Cluster corresponding to the given upstream service
// Note: ServiceIdentity must be in the format "name.namespace" [https://github.com/openservicemesh/osm/issues/3188]
func getUpstreamServiceCluster(downstreamIdentity identity.ServiceIdentity, upstreamSvc service.MeshService, cfg configurator.Configurator) (*xds_cluster.Cluster, error) {
clusterName := upstreamSvc.String()
marshalledUpstreamTLSContext, err := ptypes.MarshalAny(
envoy.GetUpstreamTLSContext(downstreamIdentity, upstreamSvc))
if err != nil {
return nil, err
// The defaults are non-permissive, and *no tls*.
func getUpstreamServiceCluster(downstreamIdentity identity.ServiceIdentity, upstreamSvc service.MeshService, cfg configurator.Configurator, opts ...clusterOption) (*xds_cluster.Cluster, error) {
o := &clusterOptions{}
for _, opt := range opts {
opt(o)
}

HTTP2ProtocolOptions, err := envoy.GetHTTP2ProtocolOptions()
Expand All @@ -41,19 +58,29 @@ func getUpstreamServiceCluster(downstreamIdentity identity.ServiceIdentity, upst
}

remoteCluster := &xds_cluster.Cluster{
Name: clusterName,
ConnectTimeout: ptypes.DurationProto(clusterConnectTimeout),
TransportSocket: &xds_core.TransportSocket{
Name: upstreamSvc.String(),
ConnectTimeout: ptypes.DurationProto(clusterConnectTimeout),
TypedExtensionProtocolOptions: HTTP2ProtocolOptions,
}

if o.withTLS {
marshalledUpstreamTLSContext, err := ptypes.MarshalAny(
envoy.GetUpstreamTLSContext(downstreamIdentity, upstreamSvc))
if err != nil {
return nil, err
}
remoteCluster.TransportSocket = &xds_core.TransportSocket{
Name: wellknown.TransportSocketTls,
ConfigType: &xds_core.TransportSocket_TypedConfig{
TypedConfig: marshalledUpstreamTLSContext,
},
},
TypedExtensionProtocolOptions: HTTP2ProtocolOptions,
}
}

if cfg.IsPermissiveTrafficPolicyMode() {
if o.permissive {
// Since no traffic policies exist with permissive mode, rely on cluster provided service discovery.
// The gateway's services are referenced via <name>.<namespace>.cluster.<cluster-domain>, which doesn't have
// a cluster service discovery mechanism, so need to be explicit.
remoteCluster.ClusterDiscoveryType = &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_ORIGINAL_DST}
remoteCluster.LbPolicy = xds_cluster.Cluster_CLUSTER_PROVIDED
} else {
Expand Down
7 changes: 5 additions & 2 deletions pkg/envoy/cds/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ func TestGetUpstreamServiceCluster(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(tc.permissiveMode).Times(1)
remoteCluster, err := getUpstreamServiceCluster(downstreamSvcAccount, upstreamSvc, mockConfigurator)
opts := []clusterOption{withTLS}
if tc.permissiveMode {
opts = append(opts, permissive)
}
remoteCluster, err := getUpstreamServiceCluster(downstreamSvcAccount, upstreamSvc, mockConfigurator, opts...)
assert.Nil(err)
assert.Equal(tc.expectedClusterType, remoteCluster.GetType())
assert.Equal(tc.expectedLbPolicy, remoteCluster.LbPolicy)
Expand Down
46 changes: 32 additions & 14 deletions pkg/envoy/cds/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@ import (

// NewResponse creates a new Cluster Discovery Response.
func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, _ *xds_discovery.DiscoveryRequest, cfg configurator.Configurator, _ certificate.Manager, proxyRegistry *registry.ProxyRegistry) ([]types.Resource, error) {
if proxy.Kind() == envoy.KindGateway {
// TODO: Configure gateway
return nil, nil
}

svcList, err := proxyRegistry.ListProxyServices(proxy)
if err != nil {
log.Error().Err(err).Msgf("Error looking up MeshService for proxy %s", proxy.String())
return nil, err
}

var clusters []*xds_cluster.Cluster

proxyIdentity, err := envoy.GetServiceAccountFromProxyCertificate(proxy.GetCertificateCommonName())
Expand All @@ -34,9 +23,28 @@ func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, _ *xds_d
return nil, err
}

if proxy.Kind() == envoy.KindGateway {
// Build remote clusters based on allowed outbound services
for _, dstService := range meshCatalog.ListAllowedOutboundServicesForIdentity(proxyIdentity.ToServiceIdentity()) {
cluster, err := getUpstreamServiceCluster(proxyIdentity.ToServiceIdentity(), dstService, cfg)
if err != nil {
log.Error().Err(err).Msgf("Failed to construct service cluster for service %s for proxy with XDS Certificate SerialNumber=%s on Pod with UID=%s",
dstService.Name, proxy.GetCertificateSerialNumber(), proxy.String())
return nil, err
}

clusters = append(clusters, cluster)
}
return removeDups(clusters), nil
}

// Build remote clusters based on allowed outbound services
for _, dstService := range meshCatalog.ListAllowedOutboundServicesForIdentity(proxyIdentity.ToServiceIdentity()) {
cluster, err := getUpstreamServiceCluster(proxyIdentity.ToServiceIdentity(), dstService, cfg)
opts := []clusterOption{withTLS}
if cfg.IsPermissiveTrafficPolicyMode() {
opts = append(opts, permissive)
}
cluster, err := getUpstreamServiceCluster(proxyIdentity.ToServiceIdentity(), dstService, cfg, opts...)
if err != nil {
log.Error().Err(err).Msgf("Failed to construct service cluster for service %s for proxy %s", dstService.Name, proxy.String())
return nil, err
Expand All @@ -45,6 +53,12 @@ func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, _ *xds_d
clusters = append(clusters, cluster)
}

svcList, err := proxyRegistry.ListProxyServices(proxy)
if err != nil {
log.Error().Err(err).Msgf("Error looking up MeshService for proxy %s", proxy.String())
return nil, err
}

// Create a local cluster for each service behind the proxy.
// The local cluster will be used to handle incoming traffic.
for _, proxyService := range svcList {
Expand Down Expand Up @@ -89,16 +103,20 @@ func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, _ *xds_d
clusters = append(clusters, getTracingCluster(cfg))
}

return removeDups(clusters), nil
}

func removeDups(clusters []*xds_cluster.Cluster) []types.Resource {
alreadyAdded := mapset.NewSet()
var cdsResources []types.Resource
for _, cluster := range clusters {
if alreadyAdded.Contains(cluster.Name) {
log.Error().Msgf("Found duplicate clusters with name %s; duplicate will not be sent to proxy %s", cluster.Name, proxy.String())
log.Error().Msgf("Found duplicate clusters with name %s; duplicate will not be sent to proxy.", cluster.Name)
continue
}
alreadyAdded.Add(cluster.Name)
cdsResources = append(cdsResources, cluster)
}

return cdsResources, nil
return cdsResources
}
79 changes: 74 additions & 5 deletions pkg/envoy/cds/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
xds_auth "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes"
Expand Down Expand Up @@ -380,15 +381,28 @@ func TestNewResponseListServicesError(t *testing.T) {
proxyRegistry := registry.NewProxyRegistry(registry.ExplicitProxyServiceMapper(func(*envoy.Proxy) ([]service.MeshService, error) {
return nil, errors.New("some error")
}))
proxy, err := envoy.NewProxy(certificate.CommonName(fmt.Sprintf("%s.%s.%s.%s", uuid.New(), envoy.KindSidecar, tests.BookbuyerServiceAccountName, tests.Namespace)), "", nil)
cn := certificate.CommonName(fmt.Sprintf("%s.%s.%s.%s", uuid.New(), envoy.KindSidecar, tests.BookbuyerServiceAccountName, tests.Namespace))
proxy, err := envoy.NewProxy(cn, "", nil)
tassert.Nil(t, err)

resp, err := NewResponse(nil, proxy, nil, nil, nil, proxyRegistry)
proxyIdentity, err := envoy.GetServiceAccountFromProxyCertificate(cn)
tassert.NoError(t, err)

ctrl := gomock.NewController(t)
meshCatalog := catalog.NewMockMeshCataloger(ctrl)
meshCatalog.EXPECT().ListAllowedOutboundServicesForIdentity(proxyIdentity.ToServiceIdentity()).Return(nil).AnyTimes()

resp, err := NewResponse(meshCatalog, proxy, nil, nil, nil, proxyRegistry)
tassert.Error(t, err)
tassert.Nil(t, resp)
}

func TestNewResponseGetLocalServiceClusterError(t *testing.T) {
ctrl := gomock.NewController(t)
mockKubeController := k8s.NewMockController(ctrl)
cfg := configurator.NewMockConfigurator(ctrl)
meshCatalog := catalog.NewMockMeshCataloger(ctrl)

svc := service.MeshService{Namespace: "ns", Name: "svc"}
proxyIdentity := identity.K8sServiceAccount{Name: "svcacc", Namespace: "ns"}.ToServiceIdentity()
proxyRegistry := registry.NewProxyRegistry(registry.ExplicitProxyServiceMapper(func(*envoy.Proxy) ([]service.MeshService, error) {
Expand All @@ -398,12 +412,13 @@ func TestNewResponseGetLocalServiceClusterError(t *testing.T) {
proxy, err := envoy.NewProxy(cn, "", nil)
tassert.Nil(t, err)

ctrl := gomock.NewController(t)
meshCatalog := catalog.NewMockMeshCataloger(ctrl)
meshCatalog.EXPECT().ListAllowedOutboundServicesForIdentity(proxyIdentity).Return(nil).Times(1)
meshCatalog.EXPECT().GetTargetPortToProtocolMappingForService(svc).Return(nil, errors.New("some error")).Times(1)
meshCatalog.EXPECT().GetKubeController().Return(mockKubeController).AnyTimes()
mockKubeController.EXPECT().ListPods().Return([]*v1.Pod{})
cfg.EXPECT().IsTracingEnabled().Return(false).Times(1)

resp, err := NewResponse(meshCatalog, proxy, nil, nil, nil, proxyRegistry)
resp, err := NewResponse(meshCatalog, proxy, nil, cfg, nil, proxyRegistry)
tassert.Error(t, err)
tassert.Nil(t, resp)
}
Expand Down Expand Up @@ -464,3 +479,57 @@ func TestNewResponseGetEgressTrafficPolicyNotEmpty(t *testing.T) {
tassert.Len(t, resp, 1)
tassert.Equal(t, resp[0].(*xds_cluster.Cluster).Name, "my-cluster")
}

func TestNewResponseForGateway(t *testing.T) {
proxyIdentity := identity.K8sServiceAccount{Name: "gateway", Namespace: "osm-system"}.ToServiceIdentity()
proxyRegistry := registry.NewProxyRegistry(registry.ExplicitProxyServiceMapper(func(*envoy.Proxy) ([]service.MeshService, error) {
return nil, nil
}))
cn := envoy.NewCertCommonName(uuid.New(), envoy.KindGateway, "gateway", "osm-system")
proxy, err := envoy.NewProxy(cn, "", nil)
tassert.Nil(t, err)

ctrl := gomock.NewController(t)
meshCatalog := catalog.NewMockMeshCataloger(ctrl)
mockKubeController := k8s.NewMockController(ctrl)
cfg := configurator.NewMockConfigurator(ctrl)
meshCatalog.EXPECT().ListAllowedOutboundServicesForIdentity(proxyIdentity).Return([]service.MeshService{
tests.BookbuyerService,
tests.BookwarehouseService,
}).AnyTimes()
meshCatalog.EXPECT().GetKubeController().Return(mockKubeController).AnyTimes()
mockKubeController.EXPECT().ListPods().Return([]*v1.Pod{})
cfg.EXPECT().IsEgressEnabled().Return(false).Times(1)
cfg.EXPECT().IsTracingEnabled().Return(false).Times(1)
cfg.EXPECT().IsPermissiveTrafficPolicyMode().Return(true).AnyTimes()

resp, err := NewResponse(meshCatalog, proxy, nil, cfg, nil, proxyRegistry)
tassert.NoError(t, err)
tassert.Len(t, resp, 2)
tassert.Equal(t, "default/bookbuyer", resp[0].(*xds_cluster.Cluster).Name)
tassert.Equal(t, "default/bookwarehouse", resp[1].(*xds_cluster.Cluster).Name)
}

func TestRemoveDups(t *testing.T) {
assert := tassert.New(t)

orig := []*xds_cluster.Cluster{
{
Name: "c-1",
},
{
Name: "c-2",
},
{
Name: "c-1",
},
}
assert.ElementsMatch([]types.Resource{
&xds_cluster.Cluster{
Name: "c-1",
},
&xds_cluster.Cluster{
Name: "c-2",
},
}, removeDups(orig))
}

0 comments on commit c31e6fc

Please sign in to comment.