From c998e2e2d13ab4f4f17b3c1d7de3b3001d169dbc Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Mon, 14 Aug 2023 09:18:38 -0600 Subject: [PATCH] sidecar-proxy controller: Add support for transparent proxy This currently does not support inferring destinations from intentions. --- agent/consul/server.go | 1 + .../{cache.go => destinations_cache.go} | 22 +- ...che_test.go => destinations_cache_test.go} | 10 +- .../proxy_configuration_cache.go | 41 +++ .../proxy_configuration_cache_test.go | 68 ++++ .../mesh/internal/controllers/register.go | 5 +- .../sidecarproxy/builder/builder.go | 16 +- .../builder/destination_builder.go | 81 ++++- .../builder/destination_builder_test.go | 90 ++++- .../sidecarproxy/builder/local_app_test.go | 3 +- ...it-and-explicit-destinations-tproxy.golden | 137 ++++++++ .../testdata/l4-multi-destination.golden | 148 ++++----- ...ltiple-implicit-destinations-tproxy.golden | 135 ++++++++ ...kload-addresses-with-specific-ports.golden | 80 ++--- ...le-workload-addresses-without-ports.golden | 80 ++--- ...le-destination-ip-port-bind-address.golden | 84 ++--- ...estination-unix-socket-bind-address.golden | 84 ++--- ...-single-implicit-destination-tproxy.golden | 81 +++++ ...ngle-workload-address-without-ports.golden | 80 ++--- .../controllers/sidecarproxy/controller.go | 63 +++- .../sidecarproxy/controller_test.go | 309 +++++++++++++----- .../sidecarproxy/fetcher/data_fetcher.go | 182 ++++++++++- .../sidecarproxy/fetcher/data_fetcher_test.go | 199 ++++++++++- .../sidecarproxymapper/destinations_mapper.go | 41 +-- .../destinations_mapper_test.go | 4 +- .../mappers/sidecarproxymapper/mapper.go | 67 ++++ .../mappers/sidecarproxymapper/mapper_test.go | 71 ++++ .../proxy_configuration_mapper.go | 28 ++ .../proxy_configuration_mapper_test.go | 73 +++++ .../service_endpoints_mapper.go | 34 +- .../service_endpoints_mapper_test.go | 9 +- .../mesh/internal/types/intermediate/types.go | 6 + 32 files changed, 1854 insertions(+), 478 deletions(-) rename 
internal/mesh/internal/cache/sidecarproxycache/{cache.go => destinations_cache.go} (85%) rename internal/mesh/internal/cache/sidecarproxycache/{cache_test.go => destinations_cache_test.go} (97%) create mode 100644 internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache.go create mode 100644 internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache_test.go create mode 100644 internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-implicit-and-explicit-destinations-tproxy.golden create mode 100644 internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-implicit-destinations-tproxy.golden create mode 100644 internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-implicit-destination-tproxy.golden create mode 100644 internal/mesh/internal/mappers/sidecarproxymapper/mapper.go create mode 100644 internal/mesh/internal/mappers/sidecarproxymapper/mapper_test.go create mode 100644 internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper.go create mode 100644 internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper_test.go diff --git a/agent/consul/server.go b/agent/consul/server.go index fc01e83fecc1..778ccb557669 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -886,6 +886,7 @@ func (s *Server) registerControllers(deps Deps) { return s.getTrustDomain(caConfig) }, + LocalDatacenter: s.config.Datacenter, }) } diff --git a/internal/mesh/internal/cache/sidecarproxycache/cache.go b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache.go similarity index 85% rename from internal/mesh/internal/cache/sidecarproxycache/cache.go rename to internal/mesh/internal/cache/sidecarproxycache/destinations_cache.go index edebcbf029ce..e2b04e0aa499 100644 --- a/internal/mesh/internal/cache/sidecarproxycache/cache.go +++ b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache.go @@ -10,13 +10,13 @@ import ( 
"github.com/hashicorp/consul/proto-public/pbresource" ) -// Cache stores information needed for the sidecar-proxy controller to reconcile efficiently. +// DestinationsCache stores information needed for the sidecar-proxy controller to reconcile efficiently. // This currently means storing a list of all destinations for easy look up // as well as indices of source proxies where those destinations are referenced. // // It is the responsibility of the controller and its subcomponents (like mapper and data fetcher) // to keep this cache up-to-date as we're observing new data. -type Cache struct { +type DestinationsCache struct { lock sync.RWMutex // store is a map from destination service reference and port as a reference key @@ -30,8 +30,8 @@ type Cache struct { type storeKeys map[ReferenceKeyWithPort]struct{} -func New() *Cache { - return &Cache{ +func NewDestinationsCache() *DestinationsCache { + return &DestinationsCache{ store: make(map[ReferenceKeyWithPort]intermediate.CombinedDestinationRef), sourceProxiesIndex: make(map[resource.ReferenceKey]storeKeys), } @@ -48,7 +48,7 @@ func KeyFromRefAndPort(ref *pbresource.Reference, port string) ReferenceKeyWithP } // WriteDestination adds destination reference to the cache. -func (c *Cache) WriteDestination(d intermediate.CombinedDestinationRef) { +func (c *DestinationsCache) WriteDestination(d intermediate.CombinedDestinationRef) { // Check that reference is a catalog.Service type. if !resource.EqualType(catalog.ServiceType, d.ServiceRef.Type) { panic("ref must of type catalog.Service") @@ -68,7 +68,7 @@ func (c *Cache) WriteDestination(d intermediate.CombinedDestinationRef) { } // DeleteDestination deletes a given destination reference and port from cache. -func (c *Cache) DeleteDestination(ref *pbresource.Reference, port string) { +func (c *DestinationsCache) DeleteDestination(ref *pbresource.Reference, port string) { // Check that reference is a catalog.Service type. 
if !resource.EqualType(catalog.ServiceType, ref.Type) { panic("ref must of type catalog.Service") @@ -80,7 +80,7 @@ func (c *Cache) DeleteDestination(ref *pbresource.Reference, port string) { c.deleteLocked(ref, port) } -func (c *Cache) addLocked(d intermediate.CombinedDestinationRef) { +func (c *DestinationsCache) addLocked(d intermediate.CombinedDestinationRef) { key := KeyFromRefAndPort(d.ServiceRef, d.Port) c.store[key] = d @@ -96,7 +96,7 @@ func (c *Cache) addLocked(d intermediate.CombinedDestinationRef) { } } -func (c *Cache) deleteLocked(ref *pbresource.Reference, port string) { +func (c *DestinationsCache) deleteLocked(ref *pbresource.Reference, port string) { key := KeyFromRefAndPort(ref, port) // First get it from the store. @@ -117,7 +117,7 @@ func (c *Cache) deleteLocked(ref *pbresource.Reference, port string) { } // DeleteSourceProxy deletes the source proxy given by id from the cache. -func (c *Cache) DeleteSourceProxy(id *pbresource.ID) { +func (c *DestinationsCache) DeleteSourceProxy(id *pbresource.ID) { // Check that id is the ProxyStateTemplate type. if !resource.EqualType(types.ProxyStateTemplateType, id.Type) { panic("id must of type mesh.ProxyStateTemplate") @@ -148,7 +148,7 @@ func (c *Cache) DeleteSourceProxy(id *pbresource.ID) { } // ReadDestination returns a destination reference for the given service reference and port. -func (c *Cache) ReadDestination(ref *pbresource.Reference, port string) (intermediate.CombinedDestinationRef, bool) { +func (c *DestinationsCache) ReadDestination(ref *pbresource.Reference, port string) (intermediate.CombinedDestinationRef, bool) { // Check that reference is a catalog.Service type. if !resource.EqualType(catalog.ServiceType, ref.Type) { panic("ref must of type catalog.Service") @@ -164,7 +164,7 @@ func (c *Cache) ReadDestination(ref *pbresource.Reference, port string) (interme } // DestinationsBySourceProxy returns all destinations that are a referenced by the given source proxy id. 
-func (c *Cache) DestinationsBySourceProxy(id *pbresource.ID) []intermediate.CombinedDestinationRef { +func (c *DestinationsCache) DestinationsBySourceProxy(id *pbresource.ID) []intermediate.CombinedDestinationRef { // Check that id is the ProxyStateTemplate type. if !resource.EqualType(types.ProxyStateTemplateType, id.Type) { panic("id must of type mesh.ProxyStateTemplate") diff --git a/internal/mesh/internal/cache/sidecarproxycache/cache_test.go b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache_test.go similarity index 97% rename from internal/mesh/internal/cache/sidecarproxycache/cache_test.go rename to internal/mesh/internal/cache/sidecarproxycache/destinations_cache_test.go index e2d634c98e22..8db061ee3ae1 100644 --- a/internal/mesh/internal/cache/sidecarproxycache/cache_test.go +++ b/internal/mesh/internal/cache/sidecarproxycache/destinations_cache_test.go @@ -13,7 +13,7 @@ import ( ) func TestWrite_Create(t *testing.T) { - cache := New() + cache := NewDestinationsCache() proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() destination := testDestination(proxyID) @@ -34,7 +34,7 @@ func TestWrite_Create(t *testing.T) { } func TestWrite_Update(t *testing.T) { - cache := New() + cache := NewDestinationsCache() proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() destination1 := testDestination(proxyID) @@ -91,7 +91,7 @@ func TestWrite_Update(t *testing.T) { } func TestWrite_Delete(t *testing.T) { - cache := New() + cache := NewDestinationsCache() proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() destination1 := testDestination(proxyID) @@ -125,7 +125,7 @@ func TestWrite_Delete(t *testing.T) { } func TestDeleteSourceProxy(t *testing.T) { - cache := New() + cache := NewDestinationsCache() proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() destination1 := testDestination(proxyID) @@ -160,7 
+160,7 @@ func TestDeleteSourceProxy(t *testing.T) { } func TestDestinationsBySourceProxy(t *testing.T) { - cache := New() + cache := NewDestinationsCache() proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() destination1 := testDestination(proxyID) diff --git a/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache.go b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache.go new file mode 100644 index 000000000000..4e45486abb02 --- /dev/null +++ b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache.go @@ -0,0 +1,41 @@ +package sidecarproxycache + +import ( + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/mappers/bimapper" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +// ProxyConfigurationCache tracks mappings between proxy configurations and proxy IDs +// that a configuration applies to. It is the responsibility of the controller to +// keep this cache up-to-date. +type ProxyConfigurationCache struct { + mapper *bimapper.Mapper +} + +func NewProxyConfigurationCache() *ProxyConfigurationCache { + return &ProxyConfigurationCache{ + mapper: bimapper.New(types.ProxyConfigurationType, types.ProxyStateTemplateType), + } +} + +// ProxyConfigurationsByProxyID returns proxy configuration IDs given the id of the proxy state template. +func (c *ProxyConfigurationCache) ProxyConfigurationsByProxyID(id *pbresource.ID) []*pbresource.ID { + return c.mapper.ItemIDsForLink(id) +} + +// TrackProxyConfiguration tracks given proxy configuration ID and the linked proxy state template IDs. 
+func (c *ProxyConfigurationCache) TrackProxyConfiguration(proxyCfgID *pbresource.ID, proxyIDs []resource.ReferenceOrID) { + c.mapper.TrackItem(proxyCfgID, proxyIDs) +} + +// UntrackProxyConfiguration removes tracking for the given proxy configuration ID. +func (c *ProxyConfigurationCache) UntrackProxyConfiguration(proxyCfgID *pbresource.ID) { + c.mapper.UntrackItem(proxyCfgID) +} + +// UntrackProxyID removes tracking for the given proxy state template ID. +func (c *ProxyConfigurationCache) UntrackProxyID(proxyID *pbresource.ID) { + c.mapper.UntrackLink(proxyID) +} diff --git a/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache_test.go b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache_test.go new file mode 100644 index 000000000000..669d379f7801 --- /dev/null +++ b/internal/mesh/internal/cache/sidecarproxycache/proxy_configuration_cache_test.go @@ -0,0 +1,68 @@ +package sidecarproxycache + +import ( + "testing" + + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/stretchr/testify/require" +) + +func TestProxyConfigurationCache(t *testing.T) { + cache := NewProxyConfigurationCache() + + // Create some proxy configurations. + proxyCfg1 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-1").ID() + proxyCfg2 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-2").ID() + proxyCfg3 := resourcetest.Resource(types.ProxyConfigurationType, "test-cfg-3").ID() + + // Create some proxy state templates. 
+ p1 := resourcetest.Resource(types.ProxyStateTemplateType, "w-111").ID() + p2 := resourcetest.Resource(types.ProxyStateTemplateType, "w-222").ID() + p3 := resourcetest.Resource(types.ProxyStateTemplateType, "w-333").ID() + p4 := resourcetest.Resource(types.ProxyStateTemplateType, "w-444").ID() + p5 := resourcetest.Resource(types.ProxyStateTemplateType, "w-555").ID() + + // Track these and make sure there's some overlap. + cache.TrackProxyConfiguration(proxyCfg1, []resource.ReferenceOrID{p1, p2, p4}) + cache.TrackProxyConfiguration(proxyCfg2, []resource.ReferenceOrID{p3, p4, p5}) + cache.TrackProxyConfiguration(proxyCfg3, []resource.ReferenceOrID{p1, p3}) + + // Read proxy configurations by proxy. + requireProxyConfigurations(t, cache, p1, proxyCfg1, proxyCfg3) + requireProxyConfigurations(t, cache, p2, proxyCfg1) + requireProxyConfigurations(t, cache, p3, proxyCfg2, proxyCfg3) + requireProxyConfigurations(t, cache, p4, proxyCfg1, proxyCfg2) + requireProxyConfigurations(t, cache, p5, proxyCfg2) + + // Untrack some proxy IDs. + cache.UntrackProxyID(p1) + + requireProxyConfigurations(t, cache, p1) + + // Untrack some proxy IDs. + cache.UntrackProxyID(p3) + + requireProxyConfigurations(t, cache, p3) + + // Untrack proxy cfg. 
+ cache.UntrackProxyConfiguration(proxyCfg1) + + requireProxyConfigurations(t, cache, p1) // no-op because we untracked it earlier + requireProxyConfigurations(t, cache, p2) + requireProxyConfigurations(t, cache, p3) // no-op because we untracked it earlier + requireProxyConfigurations(t, cache, p4, proxyCfg2) + requireProxyConfigurations(t, cache, p5, proxyCfg2) +} + +func requireProxyConfigurations(t *testing.T, cache *ProxyConfigurationCache, proxyID *pbresource.ID, proxyCfgs ...*pbresource.ID) { + t.Helper() + + actualProxyCfgs := cache.ProxyConfigurationsByProxyID(proxyID) + + require.Len(t, actualProxyCfgs, len(proxyCfgs)) + prototest.AssertElementsMatch(t, proxyCfgs, actualProxyCfgs) +} diff --git a/internal/mesh/internal/controllers/register.go b/internal/mesh/internal/controllers/register.go index 24a02722e87d..11cf20cbebda 100644 --- a/internal/mesh/internal/controllers/register.go +++ b/internal/mesh/internal/controllers/register.go @@ -12,10 +12,11 @@ import ( type Dependencies struct { TrustDomainFetcher sidecarproxy.TrustDomainFetcher + LocalDatacenter string } func Register(mgr *controller.Manager, deps Dependencies) { - c := sidecarproxycache.New() + c := sidecarproxycache.NewDestinationsCache() m := sidecarproxymapper.New(c) - mgr.Register(sidecarproxy.Controller(c, m, deps.TrustDomainFetcher)) + mgr.Register(sidecarproxy.Controller(c, m, deps.TrustDomainFetcher, deps.LocalDatacenter)) } diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go b/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go index d675dbec7d6d..4c9016a3b020 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/builder.go @@ -10,13 +10,23 @@ import ( type Builder struct { id *pbresource.ID proxyStateTemplate *pbmesh.ProxyStateTemplate + proxyCfg *pbmesh.ProxyConfiguration trustDomain string + localDatacenter string } -func New(id *pbresource.ID, identity 
*pbresource.Reference, trustDomain string) *Builder { +func New(id *pbresource.ID, + identity *pbresource.Reference, + trustDomain string, + dc string, + proxyCfg *pbmesh.ProxyConfiguration, +) *Builder { + return &Builder{ - id: id, - trustDomain: trustDomain, + id: id, + trustDomain: trustDomain, + localDatacenter: dc, + proxyCfg: proxyCfg, proxyStateTemplate: &pbmesh.ProxyStateTemplate{ ProxyState: &pbmesh.ProxyState{ Identity: identity, diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go index 0b46a48c4e61..c7344c54525b 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder.go @@ -2,17 +2,28 @@ package builder import ( "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" + "github.com/hashicorp/consul/internal/resource" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbresource" + "google.golang.org/protobuf/types/known/wrapperspb" ) func (b *Builder) BuildDestinations(destinations []*intermediate.Destination) *Builder { + if b.proxyCfg.GetDynamicConfig() != nil && + b.proxyCfg.DynamicConfig.Mode == pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT { + + b.addOutboundListener(b.proxyCfg.DynamicConfig.TransparentProxy.OutboundListenerPort) + } + for _, destination := range destinations { if destination.Explicit != nil { b.buildExplicitDestination(destination) + } else { + b.buildImplicitDestination(destination) } } @@ -45,6 +56,29 @@ func (b *Builder) 
buildExplicitDestination(destination *intermediate.Destination return b } +func (b *Builder) buildImplicitDestination(destination *intermediate.Destination) *Builder { + serviceRef := resource.Reference(destination.ServiceEndpoints.Resource.Owner, "") + clusterName := DestinationClusterName(serviceRef, b.localDatacenter, b.trustDomain) + statPrefix := DestinationStatPrefix(serviceRef, b.localDatacenter) + + // We assume that all endpoints have the same port protocol and name, and so it's sufficient + // to check ports just from the first endpoint. + if len(destination.ServiceEndpoints.Endpoints.Endpoints) > 0 { + // Find the destination proxy's port. + // Endpoints refs will need to route to mesh port instead of the destination port as that + // is the port of the destination's proxy. + meshPortName := findMeshPort(destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports) + + for _, port := range destination.ServiceEndpoints.Endpoints.Endpoints[0].Ports { + b.addRouterWithIPMatch(clusterName, statPrefix, port.Protocol, destination.VirtualIPs). + addCluster(clusterName, destination.Identities). 
+ addEndpointsRef(clusterName, destination.ServiceEndpoints.Resource.Id, meshPortName) + } + } + + return b +} + func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *Builder { listener := &pbproxystate.Listener{ Direction: pbproxystate.Direction_DIRECTION_OUTBOUND, @@ -75,22 +109,55 @@ func (b *Builder) addOutboundDestinationListener(explicit *pbmesh.Upstream) *Bui return b.addListener(listener) } +func (b *Builder) addOutboundListener(port uint32) *Builder { + listener := &pbproxystate.Listener{ + Name: xdscommon.OutboundListenerName, + Direction: pbproxystate.Direction_DIRECTION_OUTBOUND, + BindAddress: &pbproxystate.Listener_HostPort{ + HostPort: &pbproxystate.HostPortAddress{ + Host: "127.0.0.1", + Port: port, + }, + }, + Capabilities: []pbproxystate.Capability{pbproxystate.Capability_CAPABILITY_TRANSPARENT}, + } + + return b.addListener(listener) +} + func (b *Builder) addRouter(clusterName, statPrefix string, protocol pbcatalog.Protocol) *Builder { + return b.addRouterWithIPMatch(clusterName, statPrefix, protocol, nil) +} + +func (b *Builder) addRouterWithIPMatch(clusterName, statPrefix string, protocol pbcatalog.Protocol, vips []string) *Builder { listener := b.getLastBuiltListener() // For explicit destinations, we have no filter chain match, and filters are based on port protocol. 
+ router := &pbproxystate.Router{} switch protocol { case pbcatalog.Protocol_PROTOCOL_TCP: - router := &pbproxystate.Router{ - Destination: &pbproxystate.Router_L4{ - L4: &pbproxystate.L4Destination{ - Name: clusterName, - StatPrefix: statPrefix, - }, + router.Destination = &pbproxystate.Router_L4{ + L4: &pbproxystate.L4Destination{ + Name: clusterName, + StatPrefix: statPrefix, }, } + } + + if router.Destination != nil { + for _, vip := range vips { + if router.Match == nil { + router.Match = &pbproxystate.Match{} + } + + router.Match.PrefixRanges = append(router.Match.PrefixRanges, &pbproxystate.CidrRange{ + AddressPrefix: vip, + PrefixLen: &wrapperspb.UInt32Value{Value: 32}, + }) + } listener.Routers = append(listener.Routers, router) } + return b } @@ -100,7 +167,7 @@ func (b *Builder) addCluster(clusterName string, destinationIdentities []*pbreso spiffeIDs = append(spiffeIDs, connect.SpiffeIDFromIdentityRef(b.trustDomain, identity)) } - // Create destination cluster + // Create destination cluster. cluster := &pbproxystate.Cluster{ Group: &pbproxystate.Cluster_EndpointGroup{ EndpointGroup: &pbproxystate.EndpointGroup{ diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go index cbe1a1a43797..33ea69a55a68 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/destination_builder_test.go @@ -94,7 +94,95 @@ func TestBuildExplicitDestinations(t *testing.T) { } for name, c := range cases { - proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul"). + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", nil). + BuildDestinations(c.destinations). 
+ Build() + + actual := protoToJSON(t, proxyTmpl) + expected := goldenValue(t, name, actual, *update) + + require.Equal(t, expected, actual) + } +} + +func TestBuildImplicitDestinations(t *testing.T) { + api1Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-1"). + WithOwner(resourcetest.Resource(catalog.ServiceType, "api-1").ID()). + WithData(t, endpointsData).Build() + + api2Endpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-2"). + WithOwner(resourcetest.Resource(catalog.ServiceType, "api-2").ID()). + WithData(t, endpointsData).Build() + + api1Identity := &pbresource.Reference{ + Name: "api1-identity", + Tenancy: api1Endpoints.Id.Tenancy, + } + + api2Identity := &pbresource.Reference{ + Name: "api2-identity", + Tenancy: api2Endpoints.Id.Tenancy, + } + + proxyCfg := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + TransparentProxy: &pbmesh.TransparentProxy{ + OutboundListenerPort: 15001, + }, + }, + } + + destination1 := &intermediate.Destination{ + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api1Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api1Identity}, + VirtualIPs: []string{"1.1.1.1"}, + } + + destination2 := &intermediate.Destination{ + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api2Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api2Identity}, + VirtualIPs: []string{"2.2.2.2", "3.3.3.3"}, + } + + destination3 := &intermediate.Destination{ + Explicit: &pbmesh.Upstream{ + DestinationRef: resource.Reference(api1Endpoints.Id, ""), + DestinationPort: "tcp", + Datacenter: "dc1", + ListenAddr: &pbmesh.Upstream_IpPort{ + IpPort: &pbmesh.IPPortAddress{Ip: "1.1.1.1", Port: 1234}, + }, + }, + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api1Endpoints, + Endpoints: endpointsData, + }, + Identities: []*pbresource.Reference{api1Identity}, + } 
+ + cases := map[string]struct { + destinations []*intermediate.Destination + }{ + "l4-single-implicit-destination-tproxy": { + destinations: []*intermediate.Destination{destination1}, + }, + "l4-multiple-implicit-destinations-tproxy": { + destinations: []*intermediate.Destination{destination1, destination2}, + }, + "l4-implicit-and-explicit-destinations-tproxy": { + destinations: []*intermediate.Destination{destination2, destination3}, + }, + } + + for name, c := range cases { + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", proxyCfg). BuildDestinations(c.destinations). Build() diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go index cbd76954ffaa..1a070f067784 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/local_app_test.go @@ -65,7 +65,8 @@ func TestBuildLocalApp(t *testing.T) { for name, c := range cases { t.Run(name, func(t *testing.T) { - proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul").BuildLocalApp(c.workload). + proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", nil). + BuildLocalApp(c.workload). 
Build() actual := protoToJSON(t, proxyTmpl) expected := goldenValue(t, name, actual, *update) diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-implicit-and-explicit-destinations-tproxy.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-implicit-and-explicit-destinations-tproxy.golden new file mode 100644 index 000000000000..e8f9e1680a78 --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-implicit-and-explicit-destinations-tproxy.golden @@ -0,0 +1,137 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "outbound_listener", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "routers": [ + { + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + }, + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + } + } + ], + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ] + }, + { + "name": "api-1:tcp:1.1.1.1:1234", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", + "port": 1234 + }, + "routers": [ + { + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + } + ] + } + ], + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + }, + "sni": "api-1.default.dc1.internal.foo.consul" + } + } + } + } + }, + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { 
+ "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + }, + "sni": "api-2.default.dc1.internal.foo.consul" + } + } + } + } + } + } + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "mesh" + }, + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multi-destination.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multi-destination.golden index 36310c806f0c..7298777d8700 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multi-destination.golden +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multi-destination.golden @@ -1,83 +1,83 @@ { - "proxyState": { - "identity": { - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" }, - "name": "test-identity" + "name": "test-identity" }, - "listeners": [ + "listeners": [ { - "name": "api-1:tcp:1.1.1.1:1234", - "direction": "DIRECTION_OUTBOUND", - "hostPort": { - "host": "1.1.1.1", - "port": 1234 + "name": "api-1:tcp:1.1.1.1:1234", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", 
+ "port": 1234 }, - "routers": [ + "routers": [ { - "l4": { - "name": "api-1.default.dc1.internal.foo.consul", - "statPrefix": "upstream.api-1.default.default.dc1" + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" } } ] }, { - "name": "api-2:tcp:/path/to/socket", - "direction": "DIRECTION_OUTBOUND", - "unixSocket": { - "path": "/path/to/socket", - "mode": "0666" + "name": "api-2:tcp:/path/to/socket", + "direction": "DIRECTION_OUTBOUND", + "unixSocket": { + "path": "/path/to/socket", + "mode": "0666" }, - "routers": [ + "routers": [ { - "l4": { - "name": "api-2.default.dc1.internal.foo.consul", - "statPrefix": "upstream.api-2.default.default.dc1" + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" } } ] } ], - "clusters": { - "api-1.default.dc1.internal.foo.consul": { - "endpointGroup": { - "dynamic": { - "config": { - "disablePanicThreshold": true + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true }, - "outboundTls": { - "outboundMesh": { - "identityKey": "test-identity", - "validationContext": { - "spiffeIds": [ + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" ] }, - "sni": "api-1.default.dc1.internal.foo.consul" + "sni": "api-1.default.dc1.internal.foo.consul" } } } } }, - "api-2.default.dc1.internal.foo.consul": { - "endpointGroup": { - "dynamic": { - "config": { - "disablePanicThreshold": true + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true }, - "outboundTls": { - "outboundMesh": { - "identityKey": "test-identity", - "validationContext": { - "spiffeIds": [ + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + 
"validationContext": { + "spiffeIds": [ "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" ] }, - "sni": "api-2.default.dc1.internal.foo.consul" + "sni": "api-2.default.dc1.internal.foo.consul" } } } @@ -85,38 +85,38 @@ } } }, - "requiredEndpoints": { - "api-1.default.dc1.internal.foo.consul": { - "id": { - "name": "api-1", - "type": { - "group": "catalog", - "groupVersion": "v1alpha1", - "kind": "ServiceEndpoints" + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" }, - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" } }, - "port": "mesh" + "port": "mesh" }, - "api-2.default.dc1.internal.foo.consul": { - "id": { - "name": "api-2", - "type": { - "group": "catalog", - "groupVersion": "v1alpha1", - "kind": "ServiceEndpoints" + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" }, - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" } }, - "port": "mesh" + "port": "mesh" } } } \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-implicit-destinations-tproxy.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-implicit-destinations-tproxy.golden new file mode 100644 index 000000000000..f8ea8b0347c3 --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-implicit-destinations-tproxy.golden @@ -0,0 +1,135 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + 
"peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "outbound_listener", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "routers": [ + { + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + }, + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + }, + { + "match": { + "prefixRanges": [ + { + "addressPrefix": "2.2.2.2", + "prefixLen": 32 + }, + { + "addressPrefix": "3.3.3.3", + "prefixLen": 32 + } + ] + }, + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" + } + } + ], + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ] + } + ], + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + }, + "sni": "api-1.default.dc1.internal.foo.consul" + } + } + } + } + }, + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" + ] + }, + "sni": "api-2.default.dc1.internal.foo.consul" + } + } + } + } + } + } + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "mesh" + }, + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + 
"type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-with-specific-ports.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-with-specific-ports.golden index 834672743cc9..ff3d8ef0c072 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-with-specific-ports.golden +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-with-specific-ports.golden @@ -1,32 +1,32 @@ { - "proxyState": { - "identity": { - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" }, - "name": "test-identity" + "name": "test-identity" }, - "listeners": [ + "listeners": [ { - "name": "public_listener", - "direction": "DIRECTION_INBOUND", - "hostPort": { - "host": "10.0.0.2", - "port": 20000 + "name": "public_listener", + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.2", + "port": 20000 }, - "routers": [ + "routers": [ { - "l4": { - "name": "local_app:port1", - "statPrefix": "public_listener" + "l4": { + "name": "local_app:port1", + "statPrefix": "public_listener" }, - "inboundTls": { - "inboundMesh": { - "identityKey": "test-identity", - "validationContext": { - "trustBundlePeerNameKeys": [ + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ "local" ] } @@ -36,36 +36,36 @@ ] } ], - "clusters": { - "local_app:port1": { - "endpointGroup": { - "static": {} + "clusters": { + "local_app:port1": { + 
"endpointGroup": { + "static": {} } } }, - "endpoints": { - "local_app:port1": { - "endpoints": [ + "endpoints": { + "local_app:port1": { + "endpoints": [ { - "hostPort": { - "host": "127.0.0.1", - "port": 8080 + "hostPort": { + "host": "127.0.0.1", + "port": 8080 } } ] } } }, - "requiredLeafCertificates": { - "test-identity": { - "name": "test-identity", - "namespace": "default", - "partition": "default" + "requiredLeafCertificates": { + "test-identity": { + "name": "test-identity", + "namespace": "default", + "partition": "default" } }, - "requiredTrustBundles": { - "local": { - "peer": "local" + "requiredTrustBundles": { + "local": { + "peer": "local" } } } \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-without-ports.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-without-ports.golden index 39c532d7376f..9c22e94d5974 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-without-ports.golden +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-multiple-workload-addresses-without-ports.golden @@ -1,32 +1,32 @@ { - "proxyState": { - "identity": { - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" }, - "name": "test-identity" + "name": "test-identity" }, - "listeners": [ + "listeners": [ { - "name": "public_listener", - "direction": "DIRECTION_INBOUND", - "hostPort": { - "host": "10.0.0.1", - "port": 20000 + "name": "public_listener", + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.1", + "port": 20000 }, - "routers": [ + "routers": [ { - "l4": { - "name": "local_app:port1", - "statPrefix": "public_listener" + "l4": { + "name": "local_app:port1", + "statPrefix": 
"public_listener" }, - "inboundTls": { - "inboundMesh": { - "identityKey": "test-identity", - "validationContext": { - "trustBundlePeerNameKeys": [ + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ "local" ] } @@ -36,36 +36,36 @@ ] } ], - "clusters": { - "local_app:port1": { - "endpointGroup": { - "static": {} + "clusters": { + "local_app:port1": { + "endpointGroup": { + "static": {} } } }, - "endpoints": { - "local_app:port1": { - "endpoints": [ + "endpoints": { + "local_app:port1": { + "endpoints": [ { - "hostPort": { - "host": "127.0.0.1", - "port": 8080 + "hostPort": { + "host": "127.0.0.1", + "port": 8080 } } ] } } }, - "requiredLeafCertificates": { - "test-identity": { - "name": "test-identity", - "namespace": "default", - "partition": "default" + "requiredLeafCertificates": { + "test-identity": { + "name": "test-identity", + "namespace": "default", + "partition": "default" } }, - "requiredTrustBundles": { - "local": { - "peer": "local" + "requiredTrustBundles": { + "local": { + "peer": "local" } } } \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-ip-port-bind-address.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-ip-port-bind-address.golden index e1865f7fe2ce..b0e113ec93ac 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-ip-port-bind-address.golden +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-ip-port-bind-address.golden @@ -1,47 +1,47 @@ { - "proxyState": { - "identity": { - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" }, - "name": "test-identity" + "name": "test-identity" }, - "listeners": [ + 
"listeners": [ { - "name": "api-1:tcp:1.1.1.1:1234", - "direction": "DIRECTION_OUTBOUND", - "hostPort": { - "host": "1.1.1.1", - "port": 1234 + "name": "api-1:tcp:1.1.1.1:1234", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "1.1.1.1", + "port": 1234 }, - "routers": [ + "routers": [ { - "l4": { - "name": "api-1.default.dc1.internal.foo.consul", - "statPrefix": "upstream.api-1.default.default.dc1" + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" } } ] } ], - "clusters": { - "api-1.default.dc1.internal.foo.consul": { - "endpointGroup": { - "dynamic": { - "config": { - "disablePanicThreshold": true + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true }, - "outboundTls": { - "outboundMesh": { - "identityKey": "test-identity", - "validationContext": { - "spiffeIds": [ + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" ] }, - "sni": "api-1.default.dc1.internal.foo.consul" + "sni": "api-1.default.dc1.internal.foo.consul" } } } @@ -49,22 +49,22 @@ } } }, - "requiredEndpoints": { - "api-1.default.dc1.internal.foo.consul": { - "id": { - "name": "api-1", - "type": { - "group": "catalog", - "groupVersion": "v1alpha1", - "kind": "ServiceEndpoints" + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" }, - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" } }, - "port": "mesh" + "port": "mesh" } } } \ No newline at end of file diff --git 
a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-unix-socket-bind-address.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-unix-socket-bind-address.golden index a67ba1c22cc0..aa21472ad578 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-unix-socket-bind-address.golden +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-destination-unix-socket-bind-address.golden @@ -1,47 +1,47 @@ { - "proxyState": { - "identity": { - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" }, - "name": "test-identity" + "name": "test-identity" }, - "listeners": [ + "listeners": [ { - "name": "api-2:tcp:/path/to/socket", - "direction": "DIRECTION_OUTBOUND", - "unixSocket": { - "path": "/path/to/socket", - "mode": "0666" + "name": "api-2:tcp:/path/to/socket", + "direction": "DIRECTION_OUTBOUND", + "unixSocket": { + "path": "/path/to/socket", + "mode": "0666" }, - "routers": [ + "routers": [ { - "l4": { - "name": "api-2.default.dc1.internal.foo.consul", - "statPrefix": "upstream.api-2.default.default.dc1" + "l4": { + "name": "api-2.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-2.default.default.dc1" } } ] } ], - "clusters": { - "api-2.default.dc1.internal.foo.consul": { - "endpointGroup": { - "dynamic": { - "config": { - "disablePanicThreshold": true + "clusters": { + "api-2.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + "config": { + "disablePanicThreshold": true }, - "outboundTls": { - "outboundMesh": { - "identityKey": "test-identity", - "validationContext": { - "spiffeIds": [ + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ 
"spiffe://foo.consul/ap/default/ns/default/identity/api2-identity" ] }, - "sni": "api-2.default.dc1.internal.foo.consul" + "sni": "api-2.default.dc1.internal.foo.consul" } } } @@ -49,22 +49,22 @@ } } }, - "requiredEndpoints": { - "api-2.default.dc1.internal.foo.consul": { - "id": { - "name": "api-2", - "type": { - "group": "catalog", - "groupVersion": "v1alpha1", - "kind": "ServiceEndpoints" + "requiredEndpoints": { + "api-2.default.dc1.internal.foo.consul": { + "id": { + "name": "api-2", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" }, - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" } }, - "port": "mesh" + "port": "mesh" } } } \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-implicit-destination-tproxy.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-implicit-destination-tproxy.golden new file mode 100644 index 000000000000..db1dd088ddd6 --- /dev/null +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-implicit-destination-tproxy.golden @@ -0,0 +1,81 @@ +{ + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + }, + "name": "test-identity" + }, + "listeners": [ + { + "name": "outbound_listener", + "direction": "DIRECTION_OUTBOUND", + "hostPort": { + "host": "127.0.0.1", + "port": 15001 + }, + "routers": [ + { + "match": { + "prefixRanges": [ + { + "addressPrefix": "1.1.1.1", + "prefixLen": 32 + } + ] + }, + "l4": { + "name": "api-1.default.dc1.internal.foo.consul", + "statPrefix": "upstream.api-1.default.default.dc1" + } + } + ], + "capabilities": [ + "CAPABILITY_TRANSPARENT" + ] + } + ], + "clusters": { + "api-1.default.dc1.internal.foo.consul": { + "endpointGroup": { + "dynamic": { + 
"config": { + "disablePanicThreshold": true + }, + "outboundTls": { + "outboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "spiffeIds": [ + "spiffe://foo.consul/ap/default/ns/default/identity/api1-identity" + ] + }, + "sni": "api-1.default.dc1.internal.foo.consul" + } + } + } + } + } + } + }, + "requiredEndpoints": { + "api-1.default.dc1.internal.foo.consul": { + "id": { + "name": "api-1", + "type": { + "group": "catalog", + "groupVersion": "v1alpha1", + "kind": "ServiceEndpoints" + }, + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" + } + }, + "port": "mesh" + } + } +} \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-workload-address-without-ports.golden b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-workload-address-without-ports.golden index 39c532d7376f..9c22e94d5974 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-workload-address-without-ports.golden +++ b/internal/mesh/internal/controllers/sidecarproxy/builder/testdata/l4-single-workload-address-without-ports.golden @@ -1,32 +1,32 @@ { - "proxyState": { - "identity": { - "tenancy": { - "partition": "default", - "namespace": "default", - "peerName": "local" + "proxyState": { + "identity": { + "tenancy": { + "partition": "default", + "namespace": "default", + "peerName": "local" }, - "name": "test-identity" + "name": "test-identity" }, - "listeners": [ + "listeners": [ { - "name": "public_listener", - "direction": "DIRECTION_INBOUND", - "hostPort": { - "host": "10.0.0.1", - "port": 20000 + "name": "public_listener", + "direction": "DIRECTION_INBOUND", + "hostPort": { + "host": "10.0.0.1", + "port": 20000 }, - "routers": [ + "routers": [ { - "l4": { - "name": "local_app:port1", - "statPrefix": "public_listener" + "l4": { + "name": "local_app:port1", + "statPrefix": "public_listener" }, - "inboundTls": { - 
"inboundMesh": { - "identityKey": "test-identity", - "validationContext": { - "trustBundlePeerNameKeys": [ + "inboundTls": { + "inboundMesh": { + "identityKey": "test-identity", + "validationContext": { + "trustBundlePeerNameKeys": [ "local" ] } @@ -36,36 +36,36 @@ ] } ], - "clusters": { - "local_app:port1": { - "endpointGroup": { - "static": {} + "clusters": { + "local_app:port1": { + "endpointGroup": { + "static": {} } } }, - "endpoints": { - "local_app:port1": { - "endpoints": [ + "endpoints": { + "local_app:port1": { + "endpoints": [ { - "hostPort": { - "host": "127.0.0.1", - "port": 8080 + "hostPort": { + "host": "127.0.0.1", + "port": 8080 } } ] } } }, - "requiredLeafCertificates": { - "test-identity": { - "name": "test-identity", - "namespace": "default", - "partition": "default" + "requiredLeafCertificates": { + "test-identity": { + "name": "test-identity", + "namespace": "default", + "partition": "default" } }, - "requiredTrustBundles": { - "local": { - "peer": "local" + "requiredTrustBundles": { + "local": { + "peer": "local" } } } \ No newline at end of file diff --git a/internal/mesh/internal/controllers/sidecarproxy/controller.go b/internal/mesh/internal/controllers/sidecarproxy/controller.go index 51d7f848dacc..61a5a0a5b6df 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/controller.go +++ b/internal/mesh/internal/controllers/sidecarproxy/controller.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" "github.com/hashicorp/consul/internal/resource" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -25,20 +26,33 @@ const ControllerName = "consul.io/sidecar-proxy-controller" type TrustDomainFetcher func() (string, error) -func 
Controller(cache *sidecarproxycache.Cache, mapper *sidecarproxymapper.Mapper, trustDomainFetcher TrustDomainFetcher) controller.Controller { - if cache == nil || mapper == nil || trustDomainFetcher == nil { - panic("cache, mapper and trust domain fetcher are required") +func Controller(destinationsCache *sidecarproxycache.DestinationsCache, + proxyCfgCache *sidecarproxycache.ProxyConfigurationCache, + mapper *sidecarproxymapper.Mapper, + trustDomainFetcher TrustDomainFetcher, + dc string) controller.Controller { + + if destinationsCache == nil || proxyCfgCache == nil || mapper == nil || trustDomainFetcher == nil { + panic("destinations cache, proxy configuration cache, mapper and trust domain fetcher are required") } return controller.ForType(types.ProxyStateTemplateType). WithWatch(catalog.ServiceEndpointsType, mapper.MapServiceEndpointsToProxyStateTemplate). WithWatch(types.UpstreamsType, mapper.MapDestinationsToProxyStateTemplate). - WithReconciler(&reconciler{cache: cache, getTrustDomain: trustDomainFetcher}) + WithWatch(types.ProxyConfigurationType, mapper.MapProxyConfigurationToProxyStateTemplate). + WithReconciler(&reconciler{ + destinationsCache: destinationsCache, + proxyCfgCache: proxyCfgCache, + getTrustDomain: trustDomainFetcher, + dc: dc, + }) } type reconciler struct { - cache *sidecarproxycache.Cache - getTrustDomain TrustDomainFetcher + destinationsCache *sidecarproxycache.DestinationsCache + proxyCfgCache *sidecarproxycache.ProxyConfigurationCache + getTrustDomain TrustDomainFetcher + dc string } func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error { @@ -47,7 +61,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c rt.Logger.Trace("reconciling proxy state template", "id", req.ID) // Instantiate a data fetcher to fetch all reconciliation data. 
- dataFetcher := fetcher.Fetcher{Client: rt.Client, Cache: r.cache} + dataFetcher := fetcher.New(rt.Client, r.destinationsCache, r.proxyCfgCache) // Check if the workload exists. workloadID := resource.ReplaceType(catalog.WorkloadType, req.ID) @@ -86,8 +100,8 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return err } - // Remove it from cache. - r.cache.DeleteSourceProxy(req.ID) + // Remove it from destinationsCache. + r.destinationsCache.DeleteSourceProxy(req.ID) } rt.Logger.Trace("skipping proxy state template generation because workload is not on the mesh", "workload", workload.Resource.Id) return nil @@ -100,22 +114,41 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return err } - b := builder.New(req.ID, workloadIdentityRefFromWorkload(workload), trustDomain). + // Fetch proxy configuration. + proxyCfg, err := dataFetcher.FetchAndMergeProxyConfigurations(ctx, req.ID) + if err != nil { + rt.Logger.Error("error fetching proxy and merging proxy configurations", "error", err) + return err + } + b := builder.New(req.ID, identityRefFromWorkload(workload), trustDomain, r.dc, proxyCfg). BuildLocalApp(workload.Workload) // Get all destinationsData. 
- destinationsRefs := r.cache.DestinationsBySourceProxy(req.ID) - destinationsData, statuses, err := dataFetcher.FetchDestinationsData(ctx, destinationsRefs) + destinationsRefs := r.destinationsCache.DestinationsBySourceProxy(req.ID) + destinationsData, statuses, err := dataFetcher.FetchExplicitDestinationsData(ctx, destinationsRefs) if err != nil { - rt.Logger.Error("error fetching destinations for this proxy", "id", req.ID, "error", err) + rt.Logger.Error("error fetching explicit destinations for this proxy", "error", err) return err } + if proxyCfg.GetDynamicConfig() != nil && + proxyCfg.DynamicConfig.Mode == pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT { + + destinationsData, err = dataFetcher.FetchImplicitDestinationsData(ctx, req.ID, destinationsData) + if err != nil { + rt.Logger.Error("error fetching implicit destinations for this proxy", "error", err) + return err + } + } + b.BuildDestinations(destinationsData) newProxyTemplate := b.Build() if proxyStateTemplate == nil || !proto.Equal(proxyStateTemplate.Tmpl, newProxyTemplate) { + if proxyStateTemplate == nil { + req.ID.Uid = "" + } proxyTemplateData, err := anypb.New(newProxyTemplate) if err != nil { rt.Logger.Error("error creating proxy state template data", "error", err) @@ -134,7 +167,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return err } } else { - rt.Logger.Trace("proxy state template data has not changed, skipping update", "id", req.ID) + rt.Logger.Trace("proxy state template data has not changed, skipping update") } // Update any statuses. 
@@ -160,7 +193,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c return nil } -func workloadIdentityRefFromWorkload(w *intermediate.Workload) *pbresource.Reference { +func identityRefFromWorkload(w *intermediate.Workload) *pbresource.Reference { return &pbresource.Reference{ Name: w.Workload.Identity, Tenancy: w.Resource.Id.Tenancy, diff --git a/internal/mesh/internal/controllers/sidecarproxy/controller_test.go b/internal/mesh/internal/controllers/sidecarproxy/controller_test.go index fbebd244c87f..106b2af06063 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/controller_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/controller_test.go @@ -5,9 +5,11 @@ package sidecarproxy import ( "context" + "strings" "testing" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/envoyextensions/xdscommon" "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" @@ -53,7 +55,8 @@ func (suite *meshControllerTestSuite) SetupTest() { suite.ctx = testutil.TestContext(suite.T()) suite.ctl = &reconciler{ - cache: sidecarproxycache.New(), + destinationsCache: sidecarproxycache.NewDestinationsCache(), + proxyCfgCache: sidecarproxycache.NewProxyConfigurationCache(), getTrustDomain: func() (string, error) { return "test.consul", nil }, @@ -78,7 +81,8 @@ func (suite *meshControllerTestSuite) SetupTest() { suite.apiService = resourcetest.Resource(catalog.ServiceType, "api-service"). WithData(suite.T(), &pbcatalog.Service{ - Workloads: &pbcatalog.WorkloadSelector{Names: []string{"api-abc"}}, + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"api-abc"}}, + VirtualIps: []string{"1.1.1.1"}, Ports: []*pbcatalog.ServicePort{ {TargetPort: "tcp", Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, }}). 
@@ -140,7 +144,7 @@ func (suite *meshControllerTestSuite) SetupTest() { Tenancy: suite.apiWorkloadID.Tenancy, } - suite.proxyStateTemplate = builder.New(suite.apiWorkloadID, identityRef, "test.consul"). + suite.proxyStateTemplate = builder.New(suite.apiWorkloadID, identityRef, "test.consul", "dc1", nil). BuildLocalApp(suite.apiWorkload). Build() } @@ -197,7 +201,7 @@ func (suite *meshControllerTestSuite) TestReconcile_NoExistingProxyStateTemplate func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_WithUpdates() { // This test ensures that we write a new proxy state template when there are changes. - // Write the original. + // Write the original. resourcetest.Resource(types.ProxyStateTemplateType, "api-abc"). WithData(suite.T(), suite.proxyStateTemplate). WithOwner(suite.apiWorkloadID). @@ -232,7 +236,7 @@ func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_W func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_NoUpdates() { // This test ensures that we skip writing of the proxy state template when there are no changes to it. - // Write the original. + // Write the original. originalProxyState := resourcetest.Resource(types.ProxyStateTemplateType, "api-abc"). WithData(suite.T(), suite.proxyStateTemplate). WithOwner(suite.apiWorkloadID). @@ -255,17 +259,19 @@ func (suite *meshControllerTestSuite) TestReconcile_ExistingProxyStateTemplate_N func (suite *meshControllerTestSuite) TestController() { // This is a comprehensive test that checks the overall controller behavior as various resources change state. - // This should test interactions between the reconciler, the mappers, and the cache to ensure they work + // This should test interactions between the reconciler, the mappers, and the destinationsCache to ensure they work // together and produce expected result.
// Run the controller manager mgr := controller.NewManager(suite.client, suite.runtime.Logger) - c := sidecarproxycache.New() - m := sidecarproxymapper.New(c) - mgr.Register(Controller(c, m, func() (string, error) { - return "test.consul", nil - })) + // Initialize controller dependencies. + destinationsCache := sidecarproxycache.NewDestinationsCache() + proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache() + m := sidecarproxymapper.New(destinationsCache, proxyCfgCache) + trustDomainFetcher := func() (string, error) { return "test.consul", nil } + + mgr.Register(Controller(destinationsCache, proxyCfgCache, m, trustDomainFetcher, "dc1")) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -273,96 +279,239 @@ func (suite *meshControllerTestSuite) TestController() { apiProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "api-abc").ID() webProxyStateTemplateID := resourcetest.Resource(types.ProxyStateTemplateType, "web-def").ID() - // Check that proxy state template resource is generated for both the api and web workloads. - var webProxyStateTemplate *pbresource.Resource - retry.Run(suite.T(), func(r *retry.R) { - suite.client.RequireResourceExists(r, apiProxyStateTemplateID) - webProxyStateTemplate = suite.client.RequireResourceExists(r, webProxyStateTemplateID) + var ( + webProxyStateTemplate *pbresource.Resource + webDestinations *pbresource.Resource + ) + + testutil.RunStep(suite.T(), "proxy state template generation", func(t *testing.T) { + // Check that proxy state template resource is generated for both the api and web workloads. + retry.Run(t, func(r *retry.R) { + suite.client.RequireResourceExists(r, apiProxyStateTemplateID) + webProxyStateTemplate = suite.client.RequireResourceExists(r, webProxyStateTemplateID) + }) }) - // Add a source service and check that a new proxy state is generated. - webDestinations := resourcetest.Resource(types.UpstreamsType, "web-destinations"). 
- WithData(suite.T(), &pbmesh.Upstreams{ - Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}}, - Upstreams: []*pbmesh.Upstream{ - { - DestinationRef: resource.Reference(suite.apiService.Id, ""), - DestinationPort: "tcp", + testutil.RunStep(suite.T(), "add explicit destinations and check that new proxy state is generated", func(t *testing.T) { + // Add a source service and check that a new proxy state is generated. + webDestinations = resourcetest.Resource(types.UpstreamsType, "web-destinations"). + WithData(suite.T(), &pbmesh.Upstreams{ + Workloads: &pbcatalog.WorkloadSelector{Names: []string{"web-def"}}, + Upstreams: []*pbmesh.Upstream{ + { + DestinationRef: resource.Reference(suite.apiService.Id, ""), + DestinationPort: "tcp", + ListenAddr: &pbmesh.Upstream_IpPort{ + IpPort: &pbmesh.IPPortAddress{ + Ip: "127.0.0.1", + Port: 1234, + }, + }, + }, }, - }, - }).Write(suite.T(), suite.client) - webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) - - // Update destination's service endpoints and workload to be non-mesh - // and check that: - // * api's proxy state template is deleted - // * we get a new web proxy resource re-generated - // * the status on Upstreams resource is updated with a validation error - nonMeshPorts := map[string]*pbcatalog.WorkloadPort{ - "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, - } + }).Write(suite.T(), suite.client) + webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version) - // Note: the order matters here because in reality service endpoints will only - // be reconciled after the workload has been updated, and so we need to write the - // workload before we write service endpoints. - suite.runtime.Logger.Trace("test: updating api-abc workload to be non-mesh") - resourcetest.Resource(catalog.WorkloadType, "api-abc"). 
- WithData(suite.T(), &pbcatalog.Workload{ - Identity: "api-identity", - Addresses: suite.apiWorkload.Addresses, - Ports: nonMeshPorts}). - Write(suite.T(), suite.client) + requireExplicitDestinationsFound(t, "api", webProxyStateTemplate) + }) - suite.runtime.Logger.Trace("test: updating api-service to be non-mesh") - resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). - WithData(suite.T(), &pbcatalog.ServiceEndpoints{ - Endpoints: []*pbcatalog.Endpoint{ - { - TargetRef: suite.apiWorkloadID, - Addresses: suite.apiWorkload.Addresses, - Ports: nonMeshPorts, - Identity: "api-identity", + testutil.RunStep(suite.T(), "update api's ports to be non-mesh", func(t *testing.T) { + // Update destination's service endpoints and workload to be non-mesh + // and check that: + // * api's proxy state template is deleted + // * we get a new web proxy resource re-generated + // * the status on Upstreams resource is updated with a validation error + nonMeshPorts := map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + } + + // Note: the order matters here because in reality service endpoints will only + // be reconciled after the workload has been updated, and so we need to write the + // workload before we write service endpoints. + resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), &pbcatalog.Workload{ + Identity: "api-identity", + Addresses: suite.apiWorkload.Addresses, + Ports: nonMeshPorts}). + Write(suite.T(), suite.client) + + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + TargetRef: suite.apiWorkloadID, + Addresses: suite.apiWorkload.Addresses, + Ports: nonMeshPorts, + Identity: "api-identity", + }, }, - }, - }). - Write(suite.T(), suite.client.ResourceServiceClient) + }). + Write(suite.T(), suite.client.ResourceServiceClient) + + // Check that api proxy template is gone. 
+ retry.Run(t, func(r *retry.R) { + suite.client.RequireResourceNotFound(r, apiProxyStateTemplateID) + }) - // Check that api proxy template is gone. - retry.Run(suite.T(), func(r *retry.R) { - suite.client.RequireResourceNotFound(r, apiProxyStateTemplateID) + // Check status on the pbmesh.Upstreams resource. + serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, "")) + suite.client.WaitForStatusCondition(t, webDestinations.Id, ControllerName, + status.ConditionMeshProtocolNotFound(serviceRef)) + + // We should get a new web proxy template resource because this destination should be removed. + webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version) + + requireExplicitDestinationsNotFound(t, "api", webProxyStateTemplate) }) - // Check status on the pbmesh.Upstreams resource. - serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, "")) - suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName, - status.ConditionMeshProtocolNotFound(serviceRef)) + testutil.RunStep(suite.T(), "update ports to be mesh again", func(t *testing.T) { + // Update destination's service endpoints back to mesh and check that we get a new web proxy resource re-generated + // and that the status on Upstreams resource is updated to be empty. + suite.runtime.Logger.Trace("updating ports to mesh") + resourcetest.Resource(catalog.WorkloadType, "api-abc"). + WithData(suite.T(), suite.apiWorkload). + Write(suite.T(), suite.client) - // We should get a new web proxy template resource because this destination should be removed. - webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). + WithData(suite.T(), suite.apiEndpointsData). 
+ Write(suite.T(), suite.client.ResourceServiceClient) - // Update destination's service apiEndpoints back to mesh and check that we get a new web proxy resource re-generated - // and that the status on Upstreams resource is updated to be empty. - resourcetest.Resource(catalog.ServiceEndpointsType, "api-service"). - WithData(suite.T(), suite.apiEndpointsData). - Write(suite.T(), suite.client.ResourceServiceClient) + serviceRef := resource.ReferenceToString(resource.Reference(suite.apiService.Id, "")) + suite.client.WaitForStatusCondition(t, webDestinations.Id, ControllerName, + status.ConditionMeshProtocolFound(serviceRef)) + + // We should also get a new web proxy template resource as this destination should be added again. + webProxyStateTemplate = suite.client.WaitForNewVersion(t, webProxyStateTemplateID, webProxyStateTemplate.Version) - suite.client.WaitForStatusCondition(suite.T(), webDestinations.Id, ControllerName, - status.ConditionMeshProtocolFound(serviceRef)) + requireExplicitDestinationsFound(t, "api", webProxyStateTemplate) + }) - // We should also get a new web proxy template resource as this destination should be added again. - webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + testutil.RunStep(suite.T(), "delete the proxy state template and check re-generation", func(t *testing.T) { + // Delete the proxy state template resource and check that it gets regenerated. + suite.runtime.Logger.Trace("deleting web proxy") + _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webProxyStateTemplateID}) + require.NoError(suite.T(), err) - // Delete the proxy state template resource and check that it gets regenerated. 
- _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webProxyStateTemplateID}) - require.NoError(suite.T(), err) + webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + requireExplicitDestinationsFound(t, "api", webProxyStateTemplate) + }) + + testutil.RunStep(suite.T(), "add implicit upstream and enable tproxy", func(t *testing.T) { + // Delete explicit destinations resource. + suite.runtime.Logger.Trace("deleting web destinations") + _, err := suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: webDestinations.Id}) + require.NoError(t, err) + + webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) - suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + // Enable transparent proxy for the web proxy. + resourcetest.Resource(types.ProxyConfigurationType, "proxy-config"). + WithData(t, &pbmesh.ProxyConfiguration{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{"web"}, + }, + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + TransparentProxy: &pbmesh.TransparentProxy{ + OutboundListenerPort: 15001, + }, + }, + }).Write(t, suite.client) + + webProxyStateTemplate = suite.client.WaitForNewVersion(suite.T(), webProxyStateTemplateID, webProxyStateTemplate.Version) + + requireImplicitDestinationsFound(t, "api", webProxyStateTemplate) + }) } func TestMeshController(t *testing.T) { suite.Run(t, new(meshControllerTestSuite)) } +func requireExplicitDestinationsFound(t *testing.T, name string, tmplResource *pbresource.Resource) { + requireExplicitDestinations(t, name, tmplResource, true) +} + +func requireExplicitDestinationsNotFound(t *testing.T, name string, tmplResource *pbresource.Resource) { + requireExplicitDestinations(t, name, tmplResource, false) +} + +func requireExplicitDestinations(t *testing.T, name 
string, tmplResource *pbresource.Resource, found bool) { + t.Helper() + + var tmpl pbmesh.ProxyStateTemplate + err := tmplResource.Data.UnmarshalTo(&tmpl) + require.NoError(t, err) + + // Check outbound listener. + var foundListener bool + for _, l := range tmpl.ProxyState.Listeners { + if strings.Contains(l.Name, name) && l.Direction == pbproxystate.Direction_DIRECTION_OUTBOUND { + foundListener = true + break + } + } + + require.Equal(t, found, foundListener) + + requireClustersAndEndpoints(t, name, &tmpl, found) +} + +func requireImplicitDestinationsFound(t *testing.T, name string, tmplResource *pbresource.Resource) { + t.Helper() + + var tmpl pbmesh.ProxyStateTemplate + err := tmplResource.Data.UnmarshalTo(&tmpl) + require.NoError(t, err) + + // Check outbound listener. + var foundListener bool + for _, l := range tmpl.ProxyState.Listeners { + if strings.Contains(l.Name, xdscommon.OutboundListenerName) && l.Direction == pbproxystate.Direction_DIRECTION_OUTBOUND { + foundListener = true + + // Check the listener filter chain + for _, r := range l.Routers { + destName := r.Destination.(*pbproxystate.Router_L4).L4.Name + if strings.Contains(destName, name) { + // We expect that there is a filter chain match for transparent proxy destinations. 
+ require.NotNil(t, r.Match) + require.NotEmpty(t, r.Match.PrefixRanges) + break + } + } + break + } + } + require.True(t, foundListener) + + requireClustersAndEndpoints(t, name, &tmpl, true) +} + +func requireClustersAndEndpoints(t *testing.T, name string, tmpl *pbmesh.ProxyStateTemplate, found bool) { + t.Helper() + + var foundCluster bool + for c := range tmpl.ProxyState.Clusters { + if strings.Contains(c, name) { + foundCluster = true + break + } + } + + require.Equal(t, found, foundCluster) + + var foundEndpoints bool + for c := range tmpl.RequiredEndpoints { + if strings.Contains(c, name) { + foundEndpoints = true + break + } + } + + require.Equal(t, found, foundEndpoints) +} + func resourceID(rtype *pbresource.Type, name string) *pbresource.ID { return &pbresource.ID{ Type: rtype, diff --git a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go index 7755c600a15b..dad7f8886375 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go +++ b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher.go @@ -9,16 +9,30 @@ import ( "github.com/hashicorp/consul/internal/mesh/internal/types" intermediateTypes "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/storage" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) type Fetcher struct { - Client pbresource.ResourceServiceClient - Cache *sidecarproxycache.Cache + Client pbresource.ResourceServiceClient + DestinationsCache *sidecarproxycache.DestinationsCache + ProxyCfgCache 
*sidecarproxycache.ProxyConfigurationCache +} + +func New(client pbresource.ResourceServiceClient, + dCache *sidecarproxycache.DestinationsCache, + pcfgCache *sidecarproxycache.ProxyConfigurationCache) *Fetcher { + + return &Fetcher{ + Client: client, + DestinationsCache: dCache, + ProxyCfgCache: pcfgCache, + } } func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Workload, error) { @@ -28,7 +42,9 @@ func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*interm case status.Code(err) == codes.NotFound: // We also need to make sure to delete the associated proxy from cache. // We are ignoring errors from cache here as this deletion is best effort. - f.Cache.DeleteSourceProxy(resource.ReplaceType(types.ProxyStateTemplateType, id)) + proxyID := resource.ReplaceType(types.ProxyStateTemplateType, id) + f.DestinationsCache.DeleteSourceProxy(proxyID) + f.ProxyCfgCache.UntrackProxyID(proxyID) return nil, nil case err != nil: return nil, err @@ -96,6 +112,30 @@ func (f *Fetcher) FetchServiceEndpoints(ctx context.Context, id *pbresource.ID) return se, nil } +func (f *Fetcher) FetchService(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Service, error) { + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) + + switch { + case status.Code(err) == codes.NotFound: + return nil, nil + case err != nil: + return nil, err + } + + se := &intermediateTypes.Service{ + Resource: rsp.Resource, + } + + var service pbcatalog.Service + err = rsp.Resource.Data.UnmarshalTo(&service) + if err != nil { + return nil, resource.NewErrDataParse(&service, err) + } + + se.Service = &service + return se, nil +} + func (f *Fetcher) FetchDestinations(ctx context.Context, id *pbresource.ID) (*intermediateTypes.Destinations, error) { rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{Id: id}) @@ -120,14 +160,14 @@ func (f *Fetcher) FetchDestinations(ctx context.Context, id *pbresource.ID) (*in return u, nil } -func (f 
*Fetcher) FetchDestinationsData( +func (f *Fetcher) FetchExplicitDestinationsData( ctx context.Context, - destinationRefs []intermediateTypes.CombinedDestinationRef, + explDestRefs []intermediateTypes.CombinedDestinationRef, ) ([]*intermediateTypes.Destination, map[string]*intermediateTypes.Status, error) { var destinations []*intermediateTypes.Destination statuses := make(map[string]*intermediateTypes.Status) - for _, dest := range destinationRefs { + for _, dest := range explDestRefs { // Fetch Destinations resource if there is one. us, err := f.FetchDestinations(ctx, dest.ExplicitDestinationsID) if err != nil { @@ -138,11 +178,12 @@ func (f *Fetcher) FetchDestinationsData( if us == nil { // If the Destinations resource is not found, then we should delete it from cache and continue. - f.Cache.DeleteDestination(dest.ServiceRef, dest.Port) + f.DestinationsCache.DeleteDestination(dest.ServiceRef, dest.Port) continue } d := &intermediateTypes.Destination{} + // As Destinations resource contains a list of destinations, // we need to find the one that references our service and port. d.Explicit = findDestination(dest.ServiceRef, dest.Port, us.Destinations) @@ -214,6 +255,119 @@ func (f *Fetcher) FetchDestinationsData( return destinations, statuses, nil } +// FetchImplicitDestinationsData fetches all implicit destinations and adds them to existing destinations. +// If the implicit destination is already in addToDestinations, it will be skipped. +// todo (ishustava): this function will eventually need to fetch implicit destinations from the ImplicitDestinations resource instead. +func (f *Fetcher) FetchImplicitDestinationsData(ctx context.Context, proxyID *pbresource.ID, addToDestinations []*intermediateTypes.Destination) ([]*intermediateTypes.Destination, error) { + // First, convert existing destinations to a map so we can de-dup. 
+ destinations := make(map[resource.ReferenceKey]*intermediateTypes.Destination) + for _, d := range addToDestinations { + destinations[resource.NewReferenceKey(d.ServiceEndpoints.Resource.Id)] = d + } + + // For now, we need to look up all service endpoints within a partition. + rsp, err := f.Client.List(ctx, &pbresource.ListRequest{ + Type: catalog.ServiceEndpointsType, + Tenancy: &pbresource.Tenancy{ + Namespace: storage.Wildcard, + Partition: proxyID.Tenancy.Partition, + PeerName: proxyID.Tenancy.PeerName, + }, + }) + if err != nil { + return nil, err + } + + for _, r := range rsp.Resources { + // If it's already in destinations, ignore it. + if _, ok := destinations[resource.NewReferenceKey(r.Id)]; ok { + continue + } + + var endpoints pbcatalog.ServiceEndpoints + err = r.Data.UnmarshalTo(&endpoints) + if err != nil { + return nil, err + } + + // If this proxy is a part of this service, ignore it. + if isPartOfService(resource.ReplaceType(catalog.WorkloadType, proxyID), &endpoints) { + continue + } + + // Collect all identities. + var identities []*pbresource.Reference + for _, ep := range endpoints.Endpoints { + identities = append(identities, &pbresource.Reference{ + Name: ep.Identity, + Tenancy: r.Id.Tenancy, + }) + } + + // Fetch the service. + // todo (ishustava): this should eventually grab virtual IPs resource. + s, err := f.FetchService(ctx, resource.ReplaceType(catalog.ServiceType, r.Id)) + if err != nil { + return nil, err + } + if s == nil { + // If service no longer exists, skip. + continue + } + + d := &intermediateTypes.Destination{ + ServiceEndpoints: &intermediateTypes.ServiceEndpoints{ + Resource: r, + Endpoints: &endpoints, + }, + VirtualIPs: s.Service.VirtualIps, + Identities: identities, + } + addToDestinations = append(addToDestinations, d) + } + return addToDestinations, err +} + +// FetchAndMergeProxyConfigurations fetches proxy configurations for the proxy state template provided by id +// and merges them into one object. 
+func (f *Fetcher) FetchAndMergeProxyConfigurations(ctx context.Context, id *pbresource.ID) (*pbmesh.ProxyConfiguration, error) { + proxyCfgRefs := f.ProxyCfgCache.ProxyConfigurationsByProxyID(id) + + result := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{}, + } + for _, ref := range proxyCfgRefs { + proxyCfgID := &pbresource.ID{ + Name: ref.GetName(), + Type: ref.GetType(), + Tenancy: ref.GetTenancy(), + } + rsp, err := f.Client.Read(ctx, &pbresource.ReadRequest{ + Id: proxyCfgID, + }) + switch { + case status.Code(err) == codes.NotFound: + f.ProxyCfgCache.UntrackProxyConfiguration(proxyCfgID) + return nil, nil + case err != nil: + return nil, err + } + + var proxyCfg pbmesh.ProxyConfiguration + err = rsp.Resource.Data.UnmarshalTo(&proxyCfg) + if err != nil { + return nil, err + } + + // Note that we only care about dynamic config as bootstrap config + // will not be updated dynamically by this controller. + // todo (ishustava): do sorting etc. + proto.Merge(result.DynamicConfig, proxyCfg.DynamicConfig) + } + + return result, nil +} + // IsMeshEnabled returns true if the workload or service endpoints port // contain a port with the "mesh" protocol. func IsMeshEnabled(ports map[string]*pbcatalog.WorkloadPort) bool { @@ -253,3 +407,17 @@ func updateStatusCondition( } } } + +func isPartOfService(workloadID *pbresource.ID, endpoints *pbcatalog.ServiceEndpoints) bool { + // convert IDs to refs so that we can compare without UIDs. 
+ workloadRef := resource.Reference(workloadID, "") + for _, ep := range endpoints.Endpoints { + if ep.TargetRef != nil { + targetRef := resource.Reference(ep.TargetRef, "") + if resource.EqualReference(workloadRef, targetRef) { + return true + } + } + } + return false +} diff --git a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go index d2f7728891bb..90244f27626e 100644 --- a/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go +++ b/internal/mesh/internal/controllers/sidecarproxy/fetcher/data_fetcher_test.go @@ -175,7 +175,8 @@ func (suite *dataFetcherSuite) TestFetcher_FetchWorkload_WorkloadNotFound() { proxyID := resourcetest.Resource(types.ProxyStateTemplateType, "service-workload-abc").ID() // Create cache and pre-populate it. - c := sidecarproxycache.New() + destCache := sidecarproxycache.NewDestinationsCache() + proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache() dest1 := intermediate.CombinedDestinationRef{ ServiceRef: resourcetest.Resource(catalog.ServiceType, "test-service-1").ReferenceNoSection(), Port: "tcp", @@ -192,15 +193,19 @@ func (suite *dataFetcherSuite) TestFetcher_FetchWorkload_WorkloadNotFound() { resource.NewReferenceKey(proxyID): {}, }, } - c.WriteDestination(dest1) - c.WriteDestination(dest2) + destCache.WriteDestination(dest1) + destCache.WriteDestination(dest2) - f := Fetcher{Cache: c, Client: suite.client} + proxyCfgID := resourcetest.Resource(types.ProxyConfigurationType, "proxy-config").ID() + proxyCfgCache.TrackProxyConfiguration(proxyCfgID, []resource.ReferenceOrID{proxyID}) + + f := Fetcher{DestinationsCache: destCache, ProxyCfgCache: proxyCfgCache, Client: suite.client} _, err := f.FetchWorkload(context.Background(), proxyID) require.NoError(suite.T(), err) // Check that cache is updated to remove proxy id. 
- require.Nil(suite.T(), c.DestinationsBySourceProxy(proxyID)) + require.Nil(suite.T(), destCache.DestinationsBySourceProxy(proxyID)) + require.Nil(suite.T(), proxyCfgCache.ProxyConfigurationsByProxyID(proxyID)) } func (suite *dataFetcherSuite) TestFetcher_NotFound() { @@ -235,6 +240,13 @@ func (suite *dataFetcherSuite) TestFetcher_NotFound() { return err }, }, + "service": { + typ: catalog.ServiceType, + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchService(context.Background(), id) + return err + }, + }, } for name, c := range cases { @@ -282,6 +294,13 @@ func (suite *dataFetcherSuite) TestFetcher_FetchErrors() { return err }, }, + "service": { + name: "web-service", + fetchFunc: func(id *pbresource.ID) error { + _, err := f.FetchService(context.Background(), id) + return err + }, + }, } for name, c := range cases { @@ -312,7 +331,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchErrors() { } } -func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { +func (suite *dataFetcherSuite) TestFetcher_FetchExplicitDestinationsData() { destination1 := intermediate.CombinedDestinationRef{ ServiceRef: resource.Reference(suite.api1Service.Id, ""), Port: "tcp", @@ -338,14 +357,14 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }, } - c := sidecarproxycache.New() + c := sidecarproxycache.NewDestinationsCache() c.WriteDestination(destination1) c.WriteDestination(destination2) c.WriteDestination(destination3) f := Fetcher{ - Cache: c, - Client: suite.client, + DestinationsCache: c, + Client: suite.client, } suite.T().Run("destinations not found", func(t *testing.T) { @@ -360,7 +379,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationRefNoDestinations) destinationRefs := []intermediate.CombinedDestinationRef{destinationRefNoDestinations} - destinations, _, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, _, err := 
f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) require.Nil(t, destinations) _, foundDest := c.ReadDestination(destinationRefNoDestinations.ServiceRef, destinationRefNoDestinations.Port) @@ -380,7 +399,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationNoServiceEndpoints) destinationRefs := []intermediate.CombinedDestinationRef{destinationNoServiceEndpoints} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) require.Nil(t, destinations) @@ -420,7 +439,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationNonMeshServiceEndpoints) destinationRefs := []intermediate.CombinedDestinationRef{destinationNonMeshServiceEndpoints} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) require.Nil(t, destinations) @@ -455,7 +474,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { destinationRefs := []intermediate.CombinedDestinationRef{destination1} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) serviceRef := resource.ReferenceToString(destination1.ServiceRef) destinationRef := resource.IDToString(destination1.ExplicitDestinationsID) expectedStatus := &intermediate.Status{ @@ -493,7 +512,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }, } - _, statuses, err = f.FetchDestinationsData(suite.ctx, destinationRefs) + _, statuses, err = f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) prototest.AssertDeepEqual(t, expectedStatus, 
statuses[destinationRef]) }) @@ -511,7 +530,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { c.WriteDestination(destinationMeshDestinationPort) destinationRefs := []intermediate.CombinedDestinationRef{destinationMeshDestinationPort} - destinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + destinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) serviceRef := resource.ReferenceToString(destination1.ServiceRef) destinationRef := resource.IDToString(destination1.ExplicitDestinationsID) expectedStatus := &intermediate.Status{ @@ -550,7 +569,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }, } - _, statuses, err = f.FetchDestinationsData(suite.ctx, destinationRefs) + _, statuses, err = f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) prototest.AssertDeepEqual(t, expectedStatus, statuses[destinationRef]) }) @@ -607,7 +626,7 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { meshStatus.ConditionNonMeshProtocolDestinationPort(ref, d.Port)) } - actualDestinations, statuses, err := f.FetchDestinationsData(suite.ctx, destinationRefs) + actualDestinations, statuses, err := f.FetchExplicitDestinationsData(suite.ctx, destinationRefs) require.NoError(t, err) // Check that all statuses have "happy" conditions. 
@@ -619,6 +638,152 @@ func (suite *dataFetcherSuite) TestFetcher_FetchDestinationsData() { }) } +func (suite *dataFetcherSuite) TestFetcher_FetchImplicitDestinationsData() { + existingDestinations := []*intermediate.Destination{ + { + Explicit: suite.webDestinationsData.Upstreams[0], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api1ServiceEndpoints, + Endpoints: suite.api1ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-1-identity", + Tenancy: suite.api1Service.Id.Tenancy, + }, + }, + }, + { + Explicit: suite.webDestinationsData.Upstreams[1], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api2ServiceEndpoints, + Endpoints: suite.api2ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-2-identity", + Tenancy: suite.api2Service.Id.Tenancy, + }, + }, + }, + { + Explicit: suite.webDestinationsData.Upstreams[2], + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: suite.api2ServiceEndpoints, + Endpoints: suite.api2ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-2-identity", + Tenancy: suite.api2Service.Id.Tenancy, + }, + }, + }, + } + + // Create a few other services to be implicit upstreams. + api3Service := resourcetest.Resource(catalog.ServiceType, "api-3"). + WithData(suite.T(), &pbcatalog.Service{ + VirtualIps: []string{"192.1.1.1"}, + }). 
+ Write(suite.T(), suite.client) + + api3ServiceEndpointsData := &pbcatalog.ServiceEndpoints{ + Endpoints: []*pbcatalog.Endpoint{ + { + TargetRef: &pbresource.ID{ + Name: "api-3-abc", + Tenancy: api3Service.Id.Tenancy, + Type: catalog.WorkloadType, + }, + Addresses: []*pbcatalog.WorkloadAddress{{Host: "10.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{ + "tcp": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP}, + "mesh": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_MESH}, + }, + Identity: "api-3-identity", + }, + }, + } + api3ServiceEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "api-3"). + WithData(suite.T(), api3ServiceEndpointsData).Write(suite.T(), suite.client) + + f := Fetcher{ + Client: suite.client, + } + + expDestinations := append(existingDestinations, &intermediate.Destination{ + ServiceEndpoints: &intermediate.ServiceEndpoints{ + Resource: api3ServiceEndpoints, + Endpoints: api3ServiceEndpointsData, + }, + Identities: []*pbresource.Reference{ + { + Name: "api-3-identity", + Tenancy: api3Service.Id.Tenancy, + }, + }, + VirtualIPs: []string{"192.1.1.1"}, + }) + + actualDestinations, err := f.FetchImplicitDestinationsData(context.Background(), suite.webProxy.Id, existingDestinations) + require.NoError(suite.T(), err) + + prototest.AssertElementsMatch(suite.T(), expDestinations, actualDestinations) +} + +func (suite *dataFetcherSuite) TestFetcher_FetchAndMergeProxyConfigurations() { + // Create some proxy configurations. + proxyCfg1Data := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + }, + } + + proxyCfg2Data := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + MutualTlsMode: pbmesh.MutualTLSMode_MUTUAL_TLS_MODE_DEFAULT, + }, + } + + proxyCfg1 := resourcetest.Resource(types.ProxyConfigurationType, "config-1"). + WithData(suite.T(), proxyCfg1Data). 
+ Write(suite.T(), suite.client) + + proxyCfg2 := resourcetest.Resource(types.ProxyConfigurationType, "config-2"). + WithData(suite.T(), proxyCfg2Data). + Write(suite.T(), suite.client) + + proxyCfgCache := sidecarproxycache.NewProxyConfigurationCache() + proxyCfgCache.TrackProxyConfiguration(proxyCfg1.Id, []resource.ReferenceOrID{suite.webProxy.Id}) + proxyCfgCache.TrackProxyConfiguration(proxyCfg2.Id, []resource.ReferenceOrID{suite.webProxy.Id}) + + expectedProxyCfg := &pbmesh.ProxyConfiguration{ + DynamicConfig: &pbmesh.DynamicConfig{ + Mode: pbmesh.ProxyMode_PROXY_MODE_TRANSPARENT, + MutualTlsMode: pbmesh.MutualTLSMode_MUTUAL_TLS_MODE_DEFAULT, + }, + } + + fetcher := Fetcher{Client: suite.client, ProxyCfgCache: proxyCfgCache} + + actualProxyCfg, err := fetcher.FetchAndMergeProxyConfigurations(suite.ctx, suite.webProxy.Id) + require.NoError(suite.T(), err) + prototest.AssertDeepEqual(suite.T(), expectedProxyCfg, actualProxyCfg) + + // Delete proxy cfg and check that the cache gets updated. 
+ _, err = suite.client.Delete(suite.ctx, &pbresource.DeleteRequest{Id: proxyCfg1.Id}) + require.NoError(suite.T(), err) + + _, err = fetcher.FetchAndMergeProxyConfigurations(suite.ctx, suite.webProxy.Id) + require.NoError(suite.T(), err) + + proxyCfg2.Id.Uid = "" + prototest.AssertElementsMatch(suite.T(), + []*pbresource.ID{proxyCfg2.Id}, + fetcher.ProxyCfgCache.ProxyConfigurationsByProxyID(suite.webProxy.Id)) +} + func TestDataFetcher(t *testing.T) { suite.Run(t, new(dataFetcherSuite)) } diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go index a0a36a6f48f0..98d82d693541 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper.go @@ -3,9 +3,7 @@ package sidecarproxymapper import ( "context" - "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/mesh/internal/types/intermediate" "github.com/hashicorp/consul/internal/resource" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" @@ -21,38 +19,15 @@ func (m *Mapper) MapDestinationsToProxyStateTemplate(ctx context.Context, rt con // Look up workloads for this destinations. 
sourceProxyIDs := make(map[resource.ReferenceKey]struct{}) - var result []controller.Request - for _, prefix := range destinations.Workloads.Prefixes { - resp, err := rt.Client.List(ctx, &pbresource.ListRequest{ - Type: catalog.WorkloadType, - Tenancy: res.Id.Tenancy, - NamePrefix: prefix, - }) - if err != nil { - return nil, err - } - for _, r := range resp.Resources { - proxyID := resource.ReplaceType(types.ProxyStateTemplateType, r.Id) - sourceProxyIDs[resource.NewReferenceKey(proxyID)] = struct{}{} - result = append(result, controller.Request{ - ID: proxyID, - }) - } - } - for _, name := range destinations.Workloads.Names { - proxyID := &pbresource.ID{ - Name: name, - Tenancy: res.Id.Tenancy, - Type: types.ProxyStateTemplateType, - } - sourceProxyIDs[resource.NewReferenceKey(proxyID)] = struct{}{} - result = append(result, controller.Request{ - ID: proxyID, - }) + requests, err := mapWorkloadsBySelector(ctx, rt.Client, destinations.Workloads, res.Id.Tenancy, func(id *pbresource.ID) { + sourceProxyIDs[resource.NewReferenceKey(id)] = struct{}{} + }) + if err != nil { + return nil, err } - // Add this destination to cache. + // Add this destination to destinationsCache. 
for _, destination := range destinations.Upstreams { destinationRef := intermediate.CombinedDestinationRef{ ServiceRef: destination.DestinationRef, @@ -60,8 +35,8 @@ func (m *Mapper) MapDestinationsToProxyStateTemplate(ctx context.Context, rt con ExplicitDestinationsID: res.Id, SourceProxies: sourceProxyIDs, } - m.cache.WriteDestination(destinationRef) + m.destinationsCache.WriteDestination(destinationRef) } - return result, nil + return requests, nil } diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go index 07a7939404ab..f53071fe384e 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/destinations_mapper_test.go @@ -64,8 +64,8 @@ func TestMapDestinationsToProxyStateTemplate(t *testing.T) { WithData(t, webDestinationsData). Write(t, client) - c := sidecarproxycache.New() - mapper := &Mapper{cache: c} + c := sidecarproxycache.NewDestinationsCache() + mapper := &Mapper{destinationsCache: c} expRequests := []controller.Request{ {ID: resource.ReplaceType(types.ProxyStateTemplateType, webWorkload1.Id)}, diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/mapper.go new file mode 100644 index 000000000000..468968ae1587 --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/mapper.go @@ -0,0 +1,67 @@ +package sidecarproxymapper + +import ( + "context" + + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + 
"github.com/hashicorp/consul/proto-public/pbresource" +) + +type Mapper struct { + destinationsCache *sidecarproxycache.DestinationsCache + proxyCfgCache *sidecarproxycache.ProxyConfigurationCache +} + +func New(destinationsCache *sidecarproxycache.DestinationsCache, proxyCfgCache *sidecarproxycache.ProxyConfigurationCache) *Mapper { + return &Mapper{ + destinationsCache: destinationsCache, + proxyCfgCache: proxyCfgCache, + } +} + +// mapWorkloadsBySelector returns ProxyStateTemplate requests given a workload +// selector and tenancy. The cacheFunc can be called if the resulting ids need to be cached. +func mapWorkloadsBySelector(ctx context.Context, + client pbresource.ResourceServiceClient, + selector *pbcatalog.WorkloadSelector, + tenancy *pbresource.Tenancy, + cacheFunc func(id *pbresource.ID)) ([]controller.Request, error) { + var result []controller.Request + + for _, prefix := range selector.Prefixes { + resp, err := client.List(ctx, &pbresource.ListRequest{ + Type: catalog.WorkloadType, + Tenancy: tenancy, + NamePrefix: prefix, + }) + if err != nil { + return nil, err + } + for _, r := range resp.Resources { + id := resource.ReplaceType(types.ProxyStateTemplateType, r.Id) + result = append(result, controller.Request{ + ID: id, + }) + cacheFunc(id) + } + } + + for _, name := range selector.Names { + id := &pbresource.ID{ + Name: name, + Tenancy: tenancy, + Type: types.ProxyStateTemplateType, + } + result = append(result, controller.Request{ + ID: id, + }) + cacheFunc(id) + } + + return result, nil +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/mapper_test.go new file mode 100644 index 000000000000..24e4895c1693 --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/mapper_test.go @@ -0,0 +1,71 @@ +package sidecarproxymapper + +import ( + "context" + "testing" + + svctest 
"github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/stretchr/testify/require" +) + +func TestMapWorkloadsBySelector(t *testing.T) { + client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes) + + // Create some workloads. + // For this test, we don't care about the workload data, so we will re-use + // the same data for all workloads. + workloadData := &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"p1": {Port: 8080}}, + } + w1 := resourcetest.Resource(catalog.WorkloadType, "w1"). + WithData(t, workloadData). + Write(t, client).Id + w2 := resourcetest.Resource(catalog.WorkloadType, "w2"). + WithData(t, workloadData). + Write(t, client).Id + w3 := resourcetest.Resource(catalog.WorkloadType, "prefix-w3"). + WithData(t, workloadData). + Write(t, client).Id + w4 := resourcetest.Resource(catalog.WorkloadType, "prefix-w4"). + WithData(t, workloadData). 
+ Write(t, client).Id + + selector := &pbcatalog.WorkloadSelector{ + Names: []string{"w1", "w2"}, + Prefixes: []string{"prefix"}, + } + expReqs := []controller.Request{ + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w1)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w2)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w3)}, + {ID: resource.ReplaceType(types.ProxyStateTemplateType, w4)}, + } + + var cachedReqs []controller.Request + + reqs, err := mapWorkloadsBySelector(context.Background(), client, selector, defaultTenancy(), func(id *pbresource.ID) { + // save IDs to check that the cache func is called + cachedReqs = append(cachedReqs, controller.Request{ID: id}) + }) + require.NoError(t, err) + require.Len(t, reqs, len(expReqs)) + prototest.AssertElementsMatch(t, expReqs, reqs) + prototest.AssertElementsMatch(t, expReqs, cachedReqs) +} + +func defaultTenancy() *pbresource.Tenancy { + return &pbresource.Tenancy{ + Namespace: "default", + Partition: "default", + PeerName: "local", + } +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper.go new file mode 100644 index 000000000000..6a7411dc7ea5 --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper.go @@ -0,0 +1,28 @@ +package sidecarproxymapper + +import ( + "context" + + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/resource" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func (m *Mapper) MapProxyConfigurationToProxyStateTemplate(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { + var proxyConfig pbmesh.ProxyConfiguration + err := res.Data.UnmarshalTo(&proxyConfig) + if err != nil { + return nil, err + } + + var 
proxyIDs []resource.ReferenceOrID + + requests, err := mapWorkloadsBySelector(ctx, rt.Client, proxyConfig.Workloads, res.Id.Tenancy, func(id *pbresource.ID) { + proxyIDs = append(proxyIDs, id) + }) + + m.proxyCfgCache.TrackProxyConfiguration(res.Id, proxyIDs) + + return requests, nil +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper_test.go new file mode 100644 index 000000000000..b1e355c5cb05 --- /dev/null +++ b/internal/mesh/internal/mappers/sidecarproxymapper/proxy_configuration_mapper_test.go @@ -0,0 +1,73 @@ +package sidecarproxymapper + +import ( + "context" + "testing" + + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/internal/catalog" + "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" + "github.com/hashicorp/consul/internal/mesh/internal/types" + "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/resource/resourcetest" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/proto/private/prototest" + "github.com/stretchr/testify/require" +) + +func TestProxyConfigurationMapper(t *testing.T) { + client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes) + + // Create some workloads. + // For this test, we don't care about the workload data, so we will re-use + // the same data for all workloads. 
+ workloadData := &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{{Host: "127.0.0.1"}}, + Ports: map[string]*pbcatalog.WorkloadPort{"p1": {Port: 8080}}, + } + w1 := resourcetest.Resource(catalog.WorkloadType, "w1"). + WithData(t, workloadData). + Write(t, client).Id + w2 := resourcetest.Resource(catalog.WorkloadType, "w2"). + WithData(t, workloadData). + Write(t, client).Id + + // Create proxy configuration. + proxyCfgData := &pbmesh.ProxyConfiguration{ + Workloads: &pbcatalog.WorkloadSelector{ + Names: []string{"w1", "w2"}, + }, + } + pCfg := resourcetest.Resource(types.ProxyConfigurationType, "proxy-config"). + WithData(t, proxyCfgData). + Write(t, client) + + m := Mapper{proxyCfgCache: sidecarproxycache.NewProxyConfigurationCache()} + reqs, err := m.MapProxyConfigurationToProxyStateTemplate(context.Background(), controller.Runtime{ + Client: client, + }, pCfg) + require.NoError(t, err) + + p1 := resource.ReplaceType(types.ProxyStateTemplateType, w1) + p2 := resource.ReplaceType(types.ProxyStateTemplateType, w2) + expReqs := []controller.Request{ + {ID: p1}, + {ID: p2}, + } + prototest.AssertElementsMatch(t, expReqs, reqs) + + // Check that the cache is populated. + + // Clean out UID as we don't care about it in the cache. 
+ pCfg.Id.Uid = "" + prototest.AssertElementsMatch(t, + []*pbresource.ID{pCfg.Id}, + m.proxyCfgCache.ProxyConfigurationsByProxyID(p1)) + + prototest.AssertElementsMatch(t, + []*pbresource.ID{pCfg.Id}, + m.proxyCfgCache.ProxyConfigurationsByProxyID(p2)) +} diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go index 3ba78dbd3862..3ab51ee7969a 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper.go @@ -5,26 +5,16 @@ import ( "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" "github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/storage" pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" ) -type Mapper struct { - cache *sidecarproxycache.Cache -} - -func New(c *sidecarproxycache.Cache) *Mapper { - return &Mapper{ - cache: c, - } -} - // MapServiceEndpointsToProxyStateTemplate maps catalog.ServiceEndpoints objects to the IDs of // ProxyStateTemplate. -func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { +func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) { // This mapper needs to look up workload IDs from service endpoints and replace them with ProxyStateTemplate type. 
var serviceEndpoints pbcatalog.ServiceEndpoints err := res.Data.UnmarshalTo(&serviceEndpoints) @@ -68,7 +58,7 @@ func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ co continue } serviceRef := resource.Reference(serviceID, "") - if destination, ok := m.cache.ReadDestination(serviceRef, portName); ok { + if destination, ok := m.destinationsCache.ReadDestination(serviceRef, portName); ok { for refKey := range destination.SourceProxies { result = append(result, controller.Request{ID: refKey.ToID()}) } @@ -76,5 +66,23 @@ func (m *Mapper) MapServiceEndpointsToProxyStateTemplate(_ context.Context, _ co } } + // todo (ishustava): this is a stub for now until we implement implicit destinations. + // For tproxy, we generate requests for all proxy states in the cluster. + // This will generate duplicate events for proxies already added above, + // however, we expect that the controller runtime will de-dup for us. + rsp, err := rt.Client.List(ctx, &pbresource.ListRequest{ + Type: types.ProxyStateTemplateType, + Tenancy: &pbresource.Tenancy{ + Namespace: storage.Wildcard, + Partition: res.Id.Tenancy.Partition, + }, + }) + if err != nil { + return nil, err + } + for _, r := range rsp.Resources { + result = append(result, controller.Request{ID: r.Id}) + } + return result, err } diff --git a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go index b49df9737220..77a36ddd314f 100644 --- a/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go +++ b/internal/mesh/internal/mappers/sidecarproxymapper/service_endpoints_mapper_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" 
"github.com/hashicorp/consul/internal/mesh/internal/cache/sidecarproxycache" @@ -17,6 +18,8 @@ import ( ) func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { + client := svctest.RunResourceService(t, types.Register, catalog.RegisterTypes) + workload1 := resourcetest.Resource(catalog.WorkloadType, "workload-1").Build() workload2 := resourcetest.Resource(catalog.WorkloadType, "workload-2").Build() serviceEndpoints := resourcetest.Resource(catalog.ServiceEndpointsType, "service"). @@ -43,8 +46,8 @@ func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { proxyTmpl1ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-1").ID() proxyTmpl2ID := resourcetest.Resource(types.ProxyStateTemplateType, "workload-2").ID() - c := sidecarproxycache.New() - mapper := &Mapper{cache: c} + c := sidecarproxycache.NewDestinationsCache() + mapper := &Mapper{destinationsCache: c} sourceProxy1 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-3").ID() sourceProxy2 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-4").ID() sourceProxy3 := resourcetest.Resource(types.ProxyStateTemplateType, "workload-5").ID() @@ -85,7 +88,7 @@ func TestMapServiceEndpointsToProxyStateTemplate(t *testing.T) { {ID: sourceProxy3}, } - requests, err := mapper.MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{}, serviceEndpoints) + requests, err := mapper.MapServiceEndpointsToProxyStateTemplate(context.Background(), controller.Runtime{Client: client}, serviceEndpoints) require.NoError(t, err) prototest.AssertElementsMatch(t, expRequests, requests) } diff --git a/internal/mesh/internal/types/intermediate/types.go b/internal/mesh/internal/types/intermediate/types.go index 93ae9df4e928..24ac20ca9382 100644 --- a/internal/mesh/internal/types/intermediate/types.go +++ b/internal/mesh/internal/types/intermediate/types.go @@ -29,6 +29,11 @@ type ServiceEndpoints struct { Endpoints *pbcatalog.ServiceEndpoints } +type 
Service struct { + Resource *pbresource.Resource + Service *pbcatalog.Service +} + type Destinations struct { Resource *pbresource.Resource Destinations *pbmesh.Upstreams @@ -53,6 +58,7 @@ type Destination struct { Explicit *pbmesh.Upstream ServiceEndpoints *ServiceEndpoints Identities []*pbresource.Reference + VirtualIPs []string } type Status struct {