Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
envoy/eds: only respond to requested clusters
Browse files Browse the repository at this point in the history
Optimizes EDS to only respond with the endpoints for
requested clusters. Per the XDS protocol, additional
responses for resources that have not been requested
will be ignored.

Signed-off-by: Shashank Ram <[email protected]>
  • Loading branch information
shashankram committed Jun 4, 2021
1 parent 4c390a5 commit 419ff9d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 197 deletions.
50 changes: 27 additions & 23 deletions pkg/envoy/eds/response.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,59 @@
package eds

import (
"strings"

xds_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/pkg/errors"

"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/envoy/registry"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/service"
)

const (
namespacedNameDelimiter = "/"
)

// NewResponse creates a new Endpoint Discovery Response.
func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, _ *xds_discovery.DiscoveryRequest, _ configurator.Configurator, _ certificate.Manager, _ *registry.ProxyRegistry) ([]types.Resource, error) {
func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, request *xds_discovery.DiscoveryRequest, _ configurator.Configurator, _ certificate.Manager, _ *registry.ProxyRegistry) ([]types.Resource, error) {
proxyIdentity, err := envoy.GetServiceAccountFromProxyCertificate(proxy.GetCertificateCommonName())
if err != nil {
log.Error().Err(err).Msgf("Error looking up proxy identity for proxy with SerialNumber=%s on Pod with UID=%s", proxy.GetCertificateSerialNumber(), proxy.GetPodUID())
return nil, err
}

allowedEndpoints, err := getEndpointsForProxy(meshCatalog, proxyIdentity.ToServiceIdentity())
if err != nil {
log.Error().Err(err).Msgf("Error looking up endpoints for proxy with SerialNumber=%s on Pod with UID=%s", proxy.GetCertificateSerialNumber(), proxy.GetPodUID())
return nil, err
if request == nil {
return nil, errors.Errorf("Endpoint discovery request for proxy %s cannot be nil", proxyIdentity)
}

var rdsResources []types.Resource
for svc, endpoints := range allowedEndpoints {
loadAssignment := newClusterLoadAssignment(svc, endpoints)
for _, cluster := range request.ResourceNames {
meshSvc, err := clusterToMeshSvc(cluster)
if err != nil {
log.Error().Err(err).Msgf("Error retrieving MeshService from Cluster %s", cluster)
continue
}
endpoints, err := meshCatalog.ListAllowedEndpointsForService(proxyIdentity.ToServiceIdentity(), meshSvc)
if err != nil {
log.Error().Err(err).Msgf("Failed listing allowed endpoints for service %s, for proxy identity %s", meshSvc, proxyIdentity)
continue
}
loadAssignment := newClusterLoadAssignment(meshSvc, endpoints)
rdsResources = append(rdsResources, loadAssignment)
}

return rdsResources, nil
}

// getEndpointsForProxy returns only those service endpoints that belong to the allowed outbound service accounts for the proxy
// Note: ServiceIdentity must be in the format "name.namespace" [https://github.com/openservicemesh/osm/issues/3188]
func getEndpointsForProxy(meshCatalog catalog.MeshCataloger, proxyIdentity identity.ServiceIdentity) (map[service.MeshService][]endpoint.Endpoint, error) {
allowedServicesEndpoints := make(map[service.MeshService][]endpoint.Endpoint)

for _, dstSvc := range meshCatalog.ListAllowedOutboundServicesForIdentity(proxyIdentity) {
endpoints, err := meshCatalog.ListAllowedEndpointsForService(proxyIdentity, dstSvc)
if err != nil {
log.Error().Err(err).Msgf("Failed listing allowed endpoints for service %s for proxy identity %s", dstSvc, proxyIdentity)
continue
}
allowedServicesEndpoints[dstSvc] = endpoints
func clusterToMeshSvc(cluster string) (service.MeshService, error) {
chunks := strings.Split(cluster, namespacedNameDelimiter)
if len(chunks) != 2 {
return service.MeshService{}, errors.Errorf("Invalid cluster name. Expected: <namespace>/<name>, Got: %s", cluster)
}
log.Trace().Msgf("Allowed outbound service endpoints for proxy with identity %s: %v", proxyIdentity, allowedServicesEndpoints)
return allowedServicesEndpoints, nil
return service.MeshService{Namespace: chunks[0], Name: chunks[1]}, nil
}
202 changes: 28 additions & 174 deletions pkg/envoy/eds/response_test.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,24 @@
package eds

import (
"context"
"fmt"
"net"
"testing"

xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
xds_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
access "github.com/servicemeshinterface/smi-sdk-go/pkg/apis/access/v1alpha3"
tassert "github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
testclient "k8s.io/client-go/kubernetes/fake"

configFake "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/service"

"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/identity"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/tests"
)

Expand Down Expand Up @@ -76,15 +67,18 @@ func TestEndpointConfiguration(t *testing.T) {
assert.NotNil(meshCatalog)
assert.NotNil(proxy)

resources, err := NewResponse(meshCatalog, proxy, nil, mockConfigurator, nil, nil)
request := &xds_discovery.DiscoveryRequest{
ResourceNames: []string{"default/bookstore-v1"},
}
resources, err := NewResponse(meshCatalog, proxy, request, mockConfigurator, nil, nil)
assert.Nil(err)
assert.NotNil(resources)

// There are 3 endpoints configured based on the configuration:
// 1. Bookstore
// 2. Bookstore-v1
// 3. Bookstore-v2
assert.Len(resources, 3)
assert.Len(resources, 1)

loadAssignment, ok := resources[0].(*xds_endpoint.ClusterLoadAssignment)

Expand All @@ -93,180 +87,40 @@ func TestEndpointConfiguration(t *testing.T) {
assert.Len(loadAssignment.Endpoints, 1)
}

func TestGetEndpointsForProxy(t *testing.T) {
func TestClusterToMeshSvc(t *testing.T) {
assert := tassert.New(t)

testCases := []struct {
name string
proxyIdentity identity.ServiceIdentity
trafficTargets []*access.TrafficTarget
allowedServiceIdentities []identity.ServiceIdentity
services []service.MeshService
outboundServices map[identity.ServiceIdentity][]service.MeshService
outboundServiceEndpoints map[service.MeshService][]endpoint.Endpoint
outboundServiceAccountEndpoints map[identity.ServiceIdentity]map[service.MeshService][]endpoint.Endpoint
expectedEndpoints map[service.MeshService][]endpoint.Endpoint
name string
cluster string
expectedMeshSvc service.MeshService
expectError bool
}{
{
name: `Traffic target defined for bookstore ServiceAccount.
This service account has bookstore-v1 service which has one endpoint.
Hence one endpoint for bookstore-v1 should be in the expected list`,
proxyIdentity: tests.BookbuyerServiceIdentity,
trafficTargets: []*access.TrafficTarget{&tests.TrafficTarget},
allowedServiceIdentities: []identity.ServiceIdentity{tests.BookstoreServiceIdentity},
services: []service.MeshService{tests.BookstoreV1Service},
outboundServices: map[identity.ServiceIdentity][]service.MeshService{
tests.BookstoreServiceIdentity: {tests.BookstoreV1Service},
},
outboundServiceEndpoints: map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreV1Service: {tests.Endpoint},
},
outboundServiceAccountEndpoints: map[identity.ServiceIdentity]map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreServiceIdentity: {tests.BookstoreV1Service: {tests.Endpoint}},
},
expectedEndpoints: map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreV1Service: {tests.Endpoint},
},
name: "valid cluster name",
cluster: "foo/bar",
expectedMeshSvc: service.MeshService{Namespace: "foo", Name: "bar"},
expectError: false,
},
{
name: `Traffic target defined for bookstore ServiceAccount.
This service account has bookstore-v1 service which has two endpoints,
but endpoint 9.9.9.9 is associated with a pod having service account bookstore-v2.
Hence this endpoint (9.9.9.9) shouldn't be in bookstore-v1's expected list`,
proxyIdentity: tests.BookbuyerServiceIdentity,
trafficTargets: []*access.TrafficTarget{&tests.TrafficTarget},
allowedServiceIdentities: []identity.ServiceIdentity{tests.BookstoreServiceIdentity},
services: []service.MeshService{tests.BookstoreV1Service},
outboundServices: map[identity.ServiceIdentity][]service.MeshService{
tests.BookstoreServiceIdentity: {tests.BookstoreV1Service},
},
outboundServiceEndpoints: map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreV1Service: {tests.Endpoint, {
IP: net.ParseIP("9.9.9.9"),
Port: endpoint.Port(tests.ServicePort),
}},
},
outboundServiceAccountEndpoints: map[identity.ServiceIdentity]map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreServiceIdentity: {tests.BookstoreV1Service: {tests.Endpoint}},
},
expectedEndpoints: map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreV1Service: {tests.Endpoint},
},
name: "invalid cluster name",
cluster: "foobar",
expectedMeshSvc: service.MeshService{},
expectError: true,
},
{
name: `Traffic target defined for bookstore and bookstore-v2 ServiceAccount.
Hence one endpoint should be in bookstore-v1's and bookstore-v2's expected list`,
proxyIdentity: tests.BookbuyerServiceIdentity,
trafficTargets: []*access.TrafficTarget{&tests.TrafficTarget, &tests.BookstoreV2TrafficTarget},
allowedServiceIdentities: []identity.ServiceIdentity{tests.BookstoreServiceIdentity, tests.BookstoreV2ServiceIdentity},
services: []service.MeshService{tests.BookstoreV1Service, tests.BookstoreV2Service},
outboundServices: map[identity.ServiceIdentity][]service.MeshService{
tests.BookstoreServiceIdentity: {tests.BookstoreV1Service},
tests.BookstoreV2ServiceIdentity: {tests.BookstoreV2Service},
},
outboundServiceEndpoints: map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreV1Service: {tests.Endpoint},
tests.BookstoreV2Service: {endpoint.Endpoint{
IP: net.ParseIP("9.9.9.9"),
Port: endpoint.Port(tests.ServicePort),
}},
},
outboundServiceAccountEndpoints: map[identity.ServiceIdentity]map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreServiceIdentity: {tests.BookstoreV1Service: {tests.Endpoint}},
tests.BookstoreV2ServiceIdentity: {tests.BookstoreV2Service: {endpoint.Endpoint{
IP: net.ParseIP("9.9.9.9"),
Port: endpoint.Port(tests.ServicePort),
}}},
},
expectedEndpoints: map[service.MeshService][]endpoint.Endpoint{
tests.BookstoreV1Service: {tests.Endpoint},
tests.BookstoreV2Service: {endpoint.Endpoint{
IP: net.ParseIP("9.9.9.9"),
Port: endpoint.Port(tests.ServicePort),
}},
},
name: "invalid cluster name",
cluster: "foo/bar/baz",
expectedMeshSvc: service.MeshService{},
expectError: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
kubeClient := testclient.NewSimpleClientset()
defer mockCtrl.Finish()

mockCatalog := catalog.NewMockMeshCataloger(mockCtrl)
mockConfigurator := configurator.NewMockConfigurator(mockCtrl)
mockKubeController := k8s.NewMockController(mockCtrl)
meshSpec := smi.NewMockMeshSpec(mockCtrl)
mockEndpointProvider := endpoint.NewMockProvider(mockCtrl)

mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(false).AnyTimes()
meshSpec.EXPECT().ListTrafficTargets().Return(tc.trafficTargets).AnyTimes()

proxy, err := getProxy(kubeClient)
assert.Empty(err)
assert.NotNil(mockCatalog)
assert.NotNil(proxy)

mockEndpointProvider.EXPECT().GetID().Return("fake").AnyTimes()

for sa, services := range tc.outboundServices {
for _, svc := range services {
k8sService := tests.NewServiceFixture(svc.Name, svc.Namespace, map[string]string{})
mockKubeController.EXPECT().GetService(svc).Return(k8sService).AnyTimes()
}
mockEndpointProvider.EXPECT().GetServicesForServiceAccount(sa).Return(services, nil).AnyTimes()
}

mockCatalog.EXPECT().ListAllowedOutboundServicesForIdentity(tc.proxyIdentity).Return(tc.services).AnyTimes()

for svc, endpoints := range tc.outboundServiceEndpoints {
mockEndpointProvider.EXPECT().ListEndpointsForService(svc).Return(endpoints).AnyTimes()
}

mockCatalog.EXPECT().ListAllowedOutboundServiceIdentities(tc.proxyIdentity).Return(tc.allowedServiceIdentities, nil).AnyTimes()

var pods []*v1.Pod
for serviceIdentity, services := range tc.outboundServices {
sa := serviceIdentity.ToK8sServiceAccount()
for _, svc := range services {
podlabels := map[string]string{
tests.SelectorKey: tests.SelectorValue,
constants.EnvoyUniqueIDLabelName: uuid.New().String(),
}
pod := tests.NewPodFixture(tests.Namespace, svc.Name, sa.Name, podlabels)
svcPodEndpoints := tc.outboundServiceAccountEndpoints[serviceIdentity]
var podIps []v1.PodIP
for _, podEndpoints := range svcPodEndpoints {
for _, ep := range podEndpoints {
podIps = append(podIps, v1.PodIP{IP: ep.IP.String()})
}
}
pod.Status.PodIPs = podIps
pod.Spec.ServiceAccountName = sa.Name
_, err = kubeClient.CoreV1().Pods(tests.Namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
assert.Nil(err)
pods = append(pods, &pod)
}
}
mockKubeController.EXPECT().ListPods().Return(pods).AnyTimes()

for sa, svcEndpoints := range tc.outboundServiceAccountEndpoints {
for svc, endpoints := range svcEndpoints {
mockEndpointProvider.EXPECT().ListEndpointsForIdentity(sa).Return(endpoints).AnyTimes()
mockCatalog.EXPECT().ListAllowedEndpointsForService(tc.proxyIdentity, svc).Return(endpoints, nil).AnyTimes()
}
}

actual, err := getEndpointsForProxy(mockCatalog, tc.proxyIdentity)
assert.Nil(err)
assert.NotNil(actual)

assert.Len(actual, len(tc.expectedEndpoints))
for svc, endpoints := range tc.expectedEndpoints {
_, ok := actual[svc]
assert.True(ok)
assert.ElementsMatch(actual[svc], endpoints)
}
meshSvc, err := clusterToMeshSvc(tc.cluster)
assert.Equal(tc.expectError, err != nil)
assert.Equal(tc.expectedMeshSvc, meshSvc)
})
}
}

0 comments on commit 419ff9d

Please sign in to comment.