From 5b398958706a7c93f1cf175a763b3a3c72e2ad33 Mon Sep 17 00:00:00 2001 From: github-team-consul-core Date: Tue, 29 Aug 2023 08:45:39 -0600 Subject: [PATCH] NET-4944 - wire up controllers with proxy tracker --- agent/agent.go | 103 ++++++++++-------- agent/agent_test.go | 2 +- agent/consul/leader_connect_ca_test.go | 2 +- agent/consul/leader_test.go | 2 +- agent/consul/server.go | 20 +++- agent/consul/server_test.go | 10 +- .../proxycfg-sources/catalog/config_source.go | 10 +- .../catalog/config_source_test.go | 18 +-- .../catalog/mock_ConfigManager.go | 17 +-- .../catalog/mock_SessionLimiter.go | 23 ++-- .../proxycfg-sources/catalog/mock_Watcher.go | 21 ++-- agent/proxycfg-sources/local/config_source.go | 5 +- .../local/mock_ConfigManager.go | 17 +-- agent/proxycfg-sources/local/sync.go | 3 +- ...oxysnapshot.go => config_snapshot_glue.go} | 10 -- ...t_test.go => config_snapshot_glue_test.go} | 5 +- agent/proxycfg/manager.go | 17 ++- agent/proxycfg/manager_test.go | 9 +- agent/proxycfg_test.go | 10 +- agent/rpc/peering/service_test.go | 2 +- agent/xds/delta.go | 20 ++-- agent/xds/proxystateconverter/converter.go | 9 +- agent/xds/server.go | 14 +-- agent/xds/xds_protocol_helpers_test.go | 13 ++- agent/xdsv2/resources.go | 7 +- .../mesh/internal/controllers/register.go | 4 +- .../internal/controllers/xds/controller.go | 8 +- .../controllers/xds/controller_test.go | 2 +- .../internal/controllers/xds/mock_updater.go | 25 +++-- .../mesh/proxy-snapshot/proxy_snapshot.go | 17 +++ .../proxy-tracker/mock_SessionLimiter.go | 2 +- .../proxy_state_exports.go | 2 +- .../mesh}/proxy-tracker/proxy_tracker.go | 26 ++--- .../mesh}/proxy-tracker/proxy_tracker_test.go | 26 ++--- 34 files changed, 266 insertions(+), 215 deletions(-) rename agent/proxycfg/{proxysnapshot.go => config_snapshot_glue.go} (88%) rename agent/proxycfg/{proxysnapshot_test.go => config_snapshot_glue_test.go} (99%) create mode 100644 internal/mesh/proxy-snapshot/proxy_snapshot.go rename {agent => internal/mesh}/proxy-tracker/mock_SessionLimiter.go (94%) rename internal/mesh/{ => proxy-tracker}/proxy_state_exports.go (98%) rename {agent => internal/mesh}/proxy-tracker/proxy_tracker.go (92%) rename {agent => internal/mesh}/proxy-tracker/proxy_tracker_test.go (89%) diff --git a/agent/agent.go b/agent/agent.go index 5f2e57c718f3..e3af2f1c9826 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -9,8 +9,8 @@ import ( "encoding/json" "errors" "fmt" - proxytracker "github.com/hashicorp/consul/agent/proxy-tracker" catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" + proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" "github.com/hashicorp/consul/lib/stringslice" "io" "net" @@ -653,6 +653,47 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("failed to start Consul enterprise component: %v", err) } + // Create proxy config manager now because it is a dependency of creating the proxyWatcher + // which will be passed to consul.NewServer so that it is then passed to the + // controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode. + var intentionDefaultAllow bool + switch a.config.ACLResolverSettings.ACLDefaultPolicy { + case "allow": + intentionDefaultAllow = true + case "deny": + intentionDefaultAllow = false + default: + return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy) + } + + go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) + + // Start the proxy config manager. + a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ + DataSources: a.proxyDataSources(), + Logger: a.logger.Named(logging.ProxyConfig), + Source: &structs.QuerySource{ + Datacenter: a.config.Datacenter, + Segment: a.config.SegmentName, + Node: a.config.NodeName, + NodePartition: a.config.PartitionOrEmpty(), + }, + DNSConfig: proxycfg.DNSConfig{ + Domain: a.config.DNSDomain, + AltDomain: a.config.DNSAltDomain, + }, + TLSConfigurator: a.tlsConfigurator, + IntentionDefaultAllow: intentionDefaultAllow, + UpdateRateLimit: a.config.XDSUpdateRateLimit, + }) + if err != nil { + return err + } + + // proxyWatcher will be used in the creation of the XDS server and also + // in the registration of the xds controller. + proxyWatcher := a.getProxyWatcher() + // Setup either the client or the server. if c.ServerMode { serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer) @@ -686,7 +727,11 @@ func (a *Agent) Start(ctx context.Context) error { incomingRPCLimiter, ) - server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger) + var pt *proxytracker.ProxyTracker + if a.useV2Resources() { + pt = proxyWatcher.(*proxytracker.ProxyTracker) + } + server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } @@ -753,40 +798,6 @@ func (a *Agent) Start(ctx context.Context) error { return err } - var intentionDefaultAllow bool - switch a.config.ACLResolverSettings.ACLDefaultPolicy { - case "allow": - intentionDefaultAllow = true - case "deny": - intentionDefaultAllow = false - default: - return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy) - } - - go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) - - // Start the proxy config manager. - a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ - DataSources: a.proxyDataSources(), - Logger: a.logger.Named(logging.ProxyConfig), - Source: &structs.QuerySource{ - Datacenter: a.config.Datacenter, - Segment: a.config.SegmentName, - Node: a.config.NodeName, - NodePartition: a.config.PartitionOrEmpty(), - }, - DNSConfig: proxycfg.DNSConfig{ - Domain: a.config.DNSDomain, - AltDomain: a.config.DNSAltDomain, - }, - TLSConfigurator: a.tlsConfigurator, - IntentionDefaultAllow: intentionDefaultAllow, - UpdateRateLimit: a.config.XDSUpdateRateLimit, - }) - if err != nil { - return err - } - go localproxycfg.Sync( &lib.StopChannelContext{StopCh: a.shutdownCh}, localproxycfg.SyncConfig{ @@ -839,7 +850,7 @@ func (a *Agent) Start(ctx context.Context) error { } // Start grpc and grpc_tls servers. - if err := a.listenAndServeGRPC(); err != nil { + if err := a.listenAndServeGRPC(proxyWatcher); err != nil { return err } @@ -924,11 +935,13 @@ func (a *Agent) useV2Resources() bool { // it will return a ConfigSource. func (a *Agent) getProxyWatcher() xds.ProxyWatcher { if a.useV2Resources() { + a.logger.Trace("returning proxyTracker for getProxyWatcher") return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ - Logger: a.proxyConfig.Logger.Named("proxy-tracker"), + Logger: a.logger.Named("proxy-tracker"), SessionLimiter: a.baseDeps.XDSStreamLimiter, }) } else { + a.logger.Trace("returning configSource for getProxyWatcher") return localproxycfg.NewConfigSource(a.proxyConfig) } } @@ -936,16 +949,14 @@ func (a *Agent) getProxyWatcher() xds.ProxyWatcher { // configureXDSServer configures an XDS server with the proper implementation of // the PRoxyWatcher interface and registers the XDS server with Consul's // external facing GRPC server. -func (a *Agent) configureXDSServer() { - cfg := a.getProxyWatcher() - +func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) { // TODO(agentless): rather than asserting the concrete type of delegate, we // should add a method to the Delegate interface to build a ConfigSource. if server, ok := a.delegate.(*consul.Server); ok { catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{ NodeName: a.config.NodeName, LocalState: a.State, - LocalConfigSource: cfg, + LocalConfigSource: proxyWatcher, Manager: a.proxyConfig, GetStore: func() catalogproxycfg.Store { return server.FSM().State() }, Logger: a.proxyConfig.Logger.Named("server-catalog"), @@ -955,12 +966,12 @@ func (a *Agent) configureXDSServer() { <-a.shutdownCh catalogCfg.Shutdown() }() - cfg = catalogCfg + proxyWatcher = catalogCfg } a.xdsServer = xds.NewServer( a.config.NodeName, a.logger.Named(logging.Envoy), - cfg, + proxyWatcher, func(id string) (acl.Authorizer, error) { return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) }, @@ -969,12 +980,12 @@ func (a *Agent) configureXDSServer() { a.xdsServer.Register(a.externalGRPCServer) } -func (a *Agent) listenAndServeGRPC() error { +func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error { if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 { return nil } - a.configureXDSServer() + a.configureXDSServer(proxyWatcher) // Attempt to spawn listeners var listeners []net.Listener diff --git a/agent/agent_test.go b/agent/agent_test.go index 174e8c8cdfbb..c597f1d5d575 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -15,10 +15,10 @@ import ( "errors" "fmt" "github.com/hashicorp/consul/agent/grpc-external/limiter" - proxytracker "github.com/hashicorp/consul/agent/proxy-tracker" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg-sources/local" "github.com/hashicorp/consul/agent/xds" + proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" mathrand "math/rand" "net" "net/http" diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go index 42dcdd447d49..b3e8fdc9d0ed 100644 --- a/agent/consul/leader_connect_ca_test.go +++ b/agent/consul/leader_connect_ca_test.go @@ -566,7 +566,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) { deps := newDefaultDeps(t, conf1) deps.Logger = logger - s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger) + s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger, nil) require.NoError(t, err) defer s1.Shutdown() testrpc.WaitForLeader(t, s1.RPC, "dc1") diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 6cd30f38c987..7a4b63eb05ee 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -1640,7 +1640,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) { deps := newDefaultDeps(t, config) deps.Logger = logger - srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger) + srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger, nil) require.NoError(t, err) defer srv.Shutdown() diff --git a/agent/consul/server.go b/agent/consul/server.go index 64c0c1f76a15..9357cf36db83 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -8,6 +8,7 @@ import ( "crypto/x509" "errors" "fmt" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "io" "net" "os" @@ -480,9 +481,21 @@ type connHandler interface { Shutdown() error } +// ProxyUpdater is an interface for ProxyTracker. +type ProxyUpdater interface { + // PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy. + PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error + + // ProxyConnectedToServer returns whether this id is connected to this server. + ProxyConnectedToServer(id *pbresource.ID) bool + + EventChannel() chan controller.Event +} + // NewServer is used to construct a new Consul server from the configuration // and extra options, potentially returning an error. -func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger) (*Server, error) { +func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, + incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger, proxyUpdater ProxyUpdater) (*Server, error) { logger := flat.Logger if err := config.CheckProtocolVersion(); err != nil { return nil, err @@ -822,7 +835,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom s.insecureResourceServiceClient, logger.Named(logging.ControllerRuntime), ) - s.registerControllers(flat) + s.registerControllers(flat, proxyUpdater) go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) go s.trackLeaderChanges() @@ -873,7 +886,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom return s, nil } -func (s *Server) registerControllers(deps Deps) { +func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) { if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) { catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies()) mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{ @@ -889,6 +902,7 @@ func (s *Server) registerControllers(deps Deps) { } return &bundle, nil }, + ProxyUpdater: proxyUpdater, }) } diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 1ad333c61321..e8058a468a3f 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -336,7 +336,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) { } } grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler()) - srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger) + srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger, nil) if err != nil { return nil, err } @@ -1243,7 +1243,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) { } } - s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) + s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -1281,7 +1281,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) { return nil } - s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) + s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -1315,7 +1315,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) { deps := newDefaultDeps(t, conf) deps.NewRequestRecorderFunc = nil - s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) + s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil) require.Error(t, err, "need err when provider func is nil") require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider") @@ -1334,7 +1334,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) { return nil } - s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) + s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil) require.Error(t, err, "need err when RequestRecorder is nil") require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder") diff --git a/agent/proxycfg-sources/catalog/config_source.go b/agent/proxycfg-sources/catalog/config_source.go index a7205b66221c..1a41af4658d7 100644 --- a/agent/proxycfg-sources/catalog/config_source.go +++ b/agent/proxycfg-sources/catalog/config_source.go @@ -6,6 +6,8 @@ package catalog import ( "context" "errors" + "github.com/hashicorp/consul/agent/grpc-external/limiter" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "github.com/hashicorp/consul/proto-public/pbresource" "sync" @@ -14,7 +16,6 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" - "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" @@ -49,7 +50,7 @@ func NewConfigSource(cfg Config) *ConfigSource { // Watch wraps the underlying proxycfg.Manager and dynamically registers // services from the catalog with it when requested by the xDS server. -func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { // Create service ID serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id)) // If the service is registered to the local agent, use the LocalConfigSource @@ -279,7 +280,7 @@ type Config struct { //go:generate mockery --name ConfigManager --inpackage type ConfigManager interface { - Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) + Watch(req proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource) } @@ -292,10 +293,11 @@ type Store interface { //go:generate mockery --name Watcher --inpackage type Watcher interface { - Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) + Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) } //go:generate mockery --name SessionLimiter --inpackage type SessionLimiter interface { BeginSession() (limiter.Session, error) + Run(ctx context.Context) } diff --git a/agent/proxycfg-sources/catalog/config_source_test.go b/agent/proxycfg-sources/catalog/config_source_test.go index 653ac66977e1..eef62652ce88 100644 --- a/agent/proxycfg-sources/catalog/config_source_test.go +++ b/agent/proxycfg-sources/catalog/config_source_test.go @@ -4,7 +4,9 @@ package catalog import ( + "context" "errors" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" rtest "github.com/hashicorp/consul/internal/resource/resourcetest" "testing" "time" @@ -177,7 +179,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { localWatcher := NewMockWatcher(t) localWatcher.On("Watch", proxyID, nodeName, token). - Return(make(<-chan proxycfg.ProxySnapshot), nil, proxycfg.CancelFunc(func() {}), nil) + Return(make(<-chan proxysnapshot.ProxySnapshot), nil, proxysnapshot.CancelFunc(func() {}), nil) mgr := NewConfigSource(Config{ NodeName: nodeName, @@ -211,12 +213,12 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) { })) var canceledWatch bool - cancel := proxycfg.CancelFunc(func() { canceledWatch = true }) + cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true }) cfgMgr := NewMockConfigManager(t) cfgMgr.On("Watch", mock.Anything). - Return(make(<-chan proxycfg.ProxySnapshot), cancel) + Return(make(<-chan proxysnapshot.ProxySnapshot), cancel) cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(errors.New("KABOOM")) @@ -261,12 +263,12 @@ func TestConfigSource_NotProxyService(t *testing.T) { })) var canceledWatch bool - cancel := proxycfg.CancelFunc(func() { canceledWatch = true }) + cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true }) cfgMgr := NewMockConfigManager(t) cfgMgr.On("Watch", mock.Anything). - Return(make(<-chan proxycfg.ProxySnapshot), cancel) + Return(make(<-chan proxysnapshot.ProxySnapshot), cancel) mgr := NewConfigSource(Config{ Manager: cfgMgr, @@ -312,9 +314,9 @@ func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName strin Token: token, } - snapCh := make(chan proxycfg.ProxySnapshot, 1) + snapCh := make(chan proxysnapshot.ProxySnapshot, 1) cfgMgr.On("Watch", proxyID). - Return((<-chan proxycfg.ProxySnapshot)(snapCh), proxycfg.CancelFunc(func() {}), nil) + Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil) cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false). Run(func(args mock.Arguments) { @@ -358,6 +360,8 @@ func (nullSessionLimiter) BeginSession() (limiter.Session, error) { return nullSession{}, nil } +func (nullSessionLimiter) Run(ctx context.Context) {} + type nullSession struct{} func (nullSession) End() {} diff --git a/agent/proxycfg-sources/catalog/mock_ConfigManager.go b/agent/proxycfg-sources/catalog/mock_ConfigManager.go index b50d032d024a..1608a148f061 100644 --- a/agent/proxycfg-sources/catalog/mock_ConfigManager.go +++ b/agent/proxycfg-sources/catalog/mock_ConfigManager.go @@ -4,6 +4,7 @@ package catalog import ( proxycfg "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" @@ -34,27 +35,27 @@ func (_m *MockConfigManager) Register(proxyID proxycfg.ProxyID, service *structs } // Watch provides a mock function with given fields: req -func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) { +func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) { ret := _m.Called(req) - var r0 <-chan proxycfg.ProxySnapshot - var r1 proxycfg.CancelFunc - if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)); ok { + var r0 <-chan proxysnapshot.ProxySnapshot + var r1 proxysnapshot.CancelFunc + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)); ok { return rf(req) } - if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxycfg.ProxySnapshot); ok { + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxysnapshot.ProxySnapshot); ok { r0 = rf(req) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) + r0 = ret.Get(0).(<-chan proxysnapshot.ProxySnapshot) } } - if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxycfg.CancelFunc); ok { + if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxysnapshot.CancelFunc); ok { r1 = rf(req) } else { if ret.Get(1) != nil { - r1 = ret.Get(1).(proxycfg.CancelFunc) + r1 = ret.Get(1).(proxysnapshot.CancelFunc) } } diff --git a/agent/proxycfg-sources/catalog/mock_SessionLimiter.go b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go index 3b7147cb064c..a1670b190d9d 100644 --- a/agent/proxycfg-sources/catalog/mock_SessionLimiter.go +++ b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go @@ -1,9 +1,11 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package catalog import ( - limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" + context "context" + "github.com/hashicorp/consul/agent/grpc-external/limiter" + mock "github.com/stretchr/testify/mock" ) @@ -17,6 +19,10 @@ func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) { ret := _m.Called() var r0 limiter.Session + var r1 error + if rf, ok := ret.Get(0).(func() (limiter.Session, error)); ok { + return rf() + } if rf, ok := ret.Get(0).(func() limiter.Session); ok { r0 = rf() } else { @@ -25,7 +31,6 @@ func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) { } } - var r1 error if rf, ok := ret.Get(1).(func() error); ok { r1 = rf() } else { @@ -35,13 +40,17 @@ func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) { return r0, r1 } -type mockConstructorTestingTNewMockSessionLimiter interface { - mock.TestingT - Cleanup(func()) +// Run provides a mock function with given fields: ctx +func (_m *MockSessionLimiter) Run(ctx context.Context) { + _m.Called(ctx) } // NewMockSessionLimiter creates a new instance of MockSessionLimiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockSessionLimiter(t mockConstructorTestingTNewMockSessionLimiter) *MockSessionLimiter { +// The first argument is typically a *testing.T value. +func NewMockSessionLimiter(t interface { + mock.TestingT + Cleanup(func()) +}) *MockSessionLimiter { mock := &MockSessionLimiter{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/catalog/mock_Watcher.go b/agent/proxycfg-sources/catalog/mock_Watcher.go index 7701fd7d389a..71460a261440 100644 --- a/agent/proxycfg-sources/catalog/mock_Watcher.go +++ b/agent/proxycfg-sources/catalog/mock_Watcher.go @@ -3,12 +3,11 @@ package catalog import ( - limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" mock "github.com/stretchr/testify/mock" pbresource "github.com/hashicorp/consul/proto-public/pbresource" - - proxycfg "github.com/hashicorp/consul/agent/proxycfg" ) // MockWatcher is an autogenerated mock type for the Watcher type @@ -17,21 +16,21 @@ type MockWatcher struct { } // Watch provides a mock function with given fields: proxyID, nodeName, token -func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { ret := _m.Called(proxyID, nodeName, token) - var r0 <-chan proxycfg.ProxySnapshot + var r0 <-chan proxysnapshot.ProxySnapshot var r1 limiter.SessionTerminatedChan - var r2 proxycfg.CancelFunc + var r2 proxysnapshot.CancelFunc var r3 error - if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)); ok { + if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)); ok { return rf(proxyID, nodeName, token) } - if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) <-chan proxycfg.ProxySnapshot); ok { + if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) <-chan proxysnapshot.ProxySnapshot); ok { r0 = rf(proxyID, nodeName, token) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) + r0 = ret.Get(0).(<-chan proxysnapshot.ProxySnapshot) } } @@ -43,11 +42,11 @@ func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token stri } } - if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxycfg.CancelFunc); ok { + if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxysnapshot.CancelFunc); ok { r2 = rf(proxyID, nodeName, token) } else { if ret.Get(2) != nil { - r2 = ret.Get(2).(proxycfg.CancelFunc) + r2 = ret.Get(2).(proxysnapshot.CancelFunc) } } diff --git a/agent/proxycfg-sources/local/config_source.go b/agent/proxycfg-sources/local/config_source.go index 634a0f479ebc..7b3a835fb819 100644 --- a/agent/proxycfg-sources/local/config_source.go +++ b/agent/proxycfg-sources/local/config_source.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" structs "github.com/hashicorp/consul/agent/structs" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -22,8 +23,8 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource { return &ConfigSource{cfgMgr} } -func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (<-chan proxycfg.ProxySnapshot, - limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (<-chan proxysnapshot.ProxySnapshot, + limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { serviceID := structs.NewServiceID(proxyID.Name, catalog.GetEnterpriseMetaFromResourceID(proxyID)) watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{ ServiceID: serviceID, diff --git a/agent/proxycfg-sources/local/mock_ConfigManager.go b/agent/proxycfg-sources/local/mock_ConfigManager.go index d16f0e98ee3d..6c8d4e811d7f 100644 --- a/agent/proxycfg-sources/local/mock_ConfigManager.go +++ b/agent/proxycfg-sources/local/mock_ConfigManager.go @@ -4,6 +4,7 @@ package local import ( proxycfg "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" @@ -50,27 +51,27 @@ func (_m *MockConfigManager) RegisteredProxies(source proxycfg.ProxySource) []pr } // Watch provides a mock function with given fields: id -func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) { +func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) { ret := _m.Called(id) - var r0 <-chan proxycfg.ProxySnapshot - var r1 proxycfg.CancelFunc - if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)); ok { + var r0 <-chan proxysnapshot.ProxySnapshot + var r1 proxysnapshot.CancelFunc + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)); ok { return rf(id) } - if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxycfg.ProxySnapshot); ok { + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxysnapshot.ProxySnapshot); ok { r0 = rf(id) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) + r0 = ret.Get(0).(<-chan proxysnapshot.ProxySnapshot) } } - if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxycfg.CancelFunc); ok { + if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxysnapshot.CancelFunc); ok { r1 = rf(id) } else { if ret.Get(1) != nil { - r1 = ret.Get(1).(proxycfg.CancelFunc) + r1 = ret.Get(1).(proxysnapshot.CancelFunc) } } diff --git a/agent/proxycfg-sources/local/sync.go b/agent/proxycfg-sources/local/sync.go index bb06cc84381c..b5583db43a3d 100644 --- a/agent/proxycfg-sources/local/sync.go +++ b/agent/proxycfg-sources/local/sync.go @@ -5,6 +5,7 @@ package local import ( "context" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "time" "github.com/hashicorp/go-hclog" @@ -135,7 +136,7 @@ func sync(cfg SyncConfig) { //go:generate mockery --name ConfigManager --inpackage type ConfigManager interface { - Watch(id proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) + Watch(id proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource) RegisteredProxies(source proxycfg.ProxySource) []proxycfg.ProxyID diff --git a/agent/proxycfg/proxysnapshot.go b/agent/proxycfg/config_snapshot_glue.go similarity index 88% rename from agent/proxycfg/proxysnapshot.go rename to agent/proxycfg/config_snapshot_glue.go index fb1c79183cf1..7d1c1d9770e0 100644 --- a/agent/proxycfg/proxysnapshot.go +++ b/agent/proxycfg/config_snapshot_glue.go @@ -8,16 +8,6 @@ import ( "google.golang.org/grpc/status" ) -// ProxySnapshot is an abstraction that allows interchangeability between -// Catalog V1 ConfigSnapshot and Catalog V2 ProxyState. -type ProxySnapshot interface { - AllowEmptyListeners() bool - AllowEmptyRoutes() bool - AllowEmptyClusters() bool - Authorize(authz acl.Authorizer) error - LoggerName() string -} - // The below functions are added to ConfigSnapshot to allow it to conform to // the ProxySnapshot interface. func (s *ConfigSnapshot) AllowEmptyListeners() bool { diff --git a/agent/proxycfg/proxysnapshot_test.go b/agent/proxycfg/config_snapshot_glue_test.go similarity index 99% rename from agent/proxycfg/proxysnapshot_test.go rename to agent/proxycfg/config_snapshot_glue_test.go index 9a61a7f1c64c..6ff20714eb32 100644 --- a/agent/proxycfg/proxysnapshot_test.go +++ b/agent/proxycfg/config_snapshot_glue_test.go @@ -4,10 +4,11 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" ) func TestConfigSnapshot_AllowEmptyClusters(t *testing.T) { diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index 296bd831a8d9..32d725365b02 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -5,6 +5,7 @@ package proxycfg import ( "errors" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "runtime/debug" "sync" @@ -36,10 +37,6 @@ type ProxyID struct { // from overwriting each other's registrations. type ProxySource string -// CancelFunc is a type for a returned function that can be called to cancel a -// watch. -type CancelFunc func() - // Manager provides an API with which proxy services can be registered, and // coordinates the fetching (and refreshing) of intentions, upstreams, discovery // chain, certificates etc. @@ -55,7 +52,7 @@ type Manager struct { mu sync.Mutex proxies map[ProxyID]*state - watchers map[ProxyID]map[uint64]chan ProxySnapshot + watchers map[ProxyID]map[uint64]chan proxysnapshot.ProxySnapshot maxWatchID uint64 } @@ -106,7 +103,7 @@ func NewManager(cfg ManagerConfig) (*Manager, error) { m := &Manager{ ManagerConfig: cfg, proxies: make(map[ProxyID]*state), - watchers: make(map[ProxyID]map[uint64]chan ProxySnapshot), + watchers: make(map[ProxyID]map[uint64]chan proxysnapshot.ProxySnapshot), rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1), } return m, nil @@ -262,7 +259,7 @@ func (m *Manager) notify(snap *ConfigSnapshot) { // it will drain the chan and then re-attempt delivery so that a slow consumer // gets the latest config earlier. This MUST be called from a method where m.mu // is held to be safe since it assumes we are the only goroutine sending on ch. -func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan ProxySnapshot) { +func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan proxysnapshot.ProxySnapshot) { // Send if chan is empty select { case ch <- snap: @@ -299,16 +296,16 @@ OUTER: // will not fail, but no updates will be delivered until the proxy is // registered. If there is already a valid snapshot in memory, it will be // delivered immediately. -func (m *Manager) Watch(id ProxyID) (<-chan ProxySnapshot, CancelFunc) { +func (m *Manager) Watch(id ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) { m.mu.Lock() defer m.mu.Unlock() // This buffering is crucial otherwise we'd block immediately trying to // deliver the current snapshot below if we already have one. - ch := make(chan ProxySnapshot, 1) + ch := make(chan proxysnapshot.ProxySnapshot, 1) watchers, ok := m.watchers[id] if !ok { - watchers = make(map[uint64]chan ProxySnapshot) + watchers = make(map[uint64]chan proxysnapshot.ProxySnapshot) } watchID := m.maxWatchID m.maxWatchID++ diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index c66352c1ec31..50729b5bdf28 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -4,6 +4,7 @@ package proxycfg import ( + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "testing" "time" @@ -469,7 +470,7 @@ func testManager_BasicLifecycle( require.Len(t, m.watchers, 0) } -func assertWatchChanBlocks(t *testing.T, ch <-chan ProxySnapshot) { +func assertWatchChanBlocks(t *testing.T, ch <-chan proxysnapshot.ProxySnapshot) { t.Helper() select { @@ -479,7 +480,7 @@ func assertWatchChanBlocks(t *testing.T, ch <-chan ProxySnapshot) { } } -func assertWatchChanRecvs(t *testing.T, ch <-chan ProxySnapshot, expect ProxySnapshot) { +func assertWatchChanRecvs(t *testing.T, ch <-chan proxysnapshot.ProxySnapshot, expect proxysnapshot.ProxySnapshot) { t.Helper() select { @@ -517,7 +518,7 @@ func TestManager_deliverLatest(t *testing.T) { } // test 1 buffered chan - ch1 := make(chan ProxySnapshot, 1) + ch1 := make(chan proxysnapshot.ProxySnapshot, 1) // Sending to an unblocked chan should work m.deliverLatest(snap1, ch1) @@ -533,7 +534,7 @@ func TestManager_deliverLatest(t *testing.T) { require.Equal(t, snap2, <-ch1) // Same again for 5-buffered chan - ch5 := make(chan ProxySnapshot, 5) + ch5 := make(chan proxysnapshot.ProxySnapshot, 5) // Sending to an unblocked chan should work m.deliverLatest(snap1, ch5) diff --git a/agent/proxycfg_test.go b/agent/proxycfg_test.go index c51d9a64bcb3..04fa0ca6b51a 100644 --- a/agent/proxycfg_test.go +++ b/agent/proxycfg_test.go @@ -5,7 +5,9 @@ package agent import ( "encoding/json" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/internal/mesh" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" rtest "github.com/hashicorp/consul/internal/resource/resourcetest" "net/http" "net/http/httptest" @@ -14,8 +16,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/grpc-external/limiter" - "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testrpc" @@ -54,7 +54,7 @@ func TestAgent_local_proxycfg(t *testing.T) { // This is a little gross, but this gives us the layered pair of // local/catalog sources for now. - cfg := a.xdsServer.CfgSrc + cfg := a.xdsServer.ProxyWatcher var ( timer = time.After(100 * time.Millisecond) @@ -64,9 +64,9 @@ func TestAgent_local_proxycfg(t *testing.T) { var ( firstTime = true - ch <-chan proxycfg.ProxySnapshot + ch <-chan proxysnapshot.ProxySnapshot stc limiter.SessionTerminatedChan - cancel proxycfg.CancelFunc + cancel proxysnapshot.CancelFunc ) defer func() { if cancel != nil { diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 49bf97b19c7f..8fde278c8b7f 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -1820,7 +1820,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer { deps := newDefaultDeps(t, conf) externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler()) - server, err := consul.NewServer(conf, deps, externalGRPCServer, nil, deps.Logger) + server, err := consul.NewServer(conf, deps, externalGRPCServer, nil, deps.Logger, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, server.Shutdown()) diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 7d345ff77073..6d668006d70f 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -11,6 +11,8 @@ import ( "github.com/hashicorp/consul/agent/xds/configfetcher" "github.com/hashicorp/consul/agent/xdsv2" "github.com/hashicorp/consul/internal/mesh" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" + proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" "github.com/hashicorp/consul/proto-public/pbresource" "strconv" "sync" @@ -91,7 +93,7 @@ func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error { // Envoy resource generator based on whether it was passed a ConfigSource or // ProxyState implementation of the ProxySnapshot interface and returns the // generated Envoy configuration. -func getEnvoyConfiguration(proxySnapshot proxycfg.ProxySnapshot, logger hclog.Logger, cfgFetcher configfetcher.ConfigFetcher) (map[string][]proto.Message, error) { +func getEnvoyConfiguration(proxySnapshot proxysnapshot.ProxySnapshot, logger hclog.Logger, cfgFetcher configfetcher.ConfigFetcher) (map[string][]proto.Message, error) { switch proxySnapshot.(type) { case *proxycfg.ConfigSnapshot: logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ConfigSnapshot", @@ -106,14 +108,14 @@ func getEnvoyConfiguration(proxySnapshot proxycfg.ProxySnapshot, logger hclog.Lo c := proxySnapshot.(*proxycfg.ConfigSnapshot) logger.Trace("ConfigSnapshot", c) return generator.AllResourcesFromSnapshot(c) - case *mesh.ProxyState: + case *proxytracker.ProxyState: logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ProxyState", "proxySnapshot", proxySnapshot, ) generator := xdsv2.NewResourceGenerator( logger, ) - c := proxySnapshot.(*mesh.ProxyState) + c := proxySnapshot.(*proxytracker.ProxyState) logger.Trace("ProxyState", c) return generator.AllResourcesFromIR(c) default: @@ -135,9 +137,9 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // Loop state var ( - proxySnapshot proxycfg.ProxySnapshot + proxySnapshot proxysnapshot.ProxySnapshot node *envoy_config_core_v3.Node - stateCh <-chan proxycfg.ProxySnapshot + stateCh <-chan proxysnapshot.ProxySnapshot drainCh limiter.SessionTerminatedChan watchCancel func() nonce uint64 // xDS requires a unique nonce to correlate response/request pairs @@ -202,7 +204,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove authTimer = time.After(s.AuthCheckFrequency) } - checkStreamACLs := func(proxySnap proxycfg.ProxySnapshot) error { + checkStreamACLs := func(proxySnap proxysnapshot.ProxySnapshot) error { return s.authorize(stream.Context(), proxySnap) } @@ -326,7 +328,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } - stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token) + stateCh, drainCh, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token) switch { case errors.Is(err, limiter.ErrCapacityReached): return errOverwhelmed @@ -432,14 +434,14 @@ func newResourceIDFromEnvoyNode(node *envoy_config_core_v3.Node) *pbresource.ID } } -func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, proxySnapshot proxycfg.ProxySnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) { +func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, proxySnapshot proxysnapshot.ProxySnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) { // TODO(proxystate) // This is a workaround for now as envoy extensions are not yet supported with ProxyState. // For now, we cast to proxycfg.ConfigSnapshot and no-op if it's the pbmesh.ProxyState type. var snapshot *proxycfg.ConfigSnapshot switch proxySnapshot.(type) { //TODO(proxystate): implement envoy extensions for ProxyState - case *mesh.ProxyState: + case *proxytracker.ProxyState: return resources, nil case *proxycfg.ConfigSnapshot: snapshot = proxySnapshot.(*proxycfg.ConfigSnapshot) diff --git a/agent/xds/proxystateconverter/converter.go b/agent/xds/proxystateconverter/converter.go index 191b9084b842..be6451417504 100644 --- a/agent/xds/proxystateconverter/converter.go +++ b/agent/xds/proxystateconverter/converter.go @@ -5,11 +5,10 @@ package proxystateconverter import ( "fmt" - "github.com/hashicorp/consul/internal/mesh" - "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/configfetcher" + proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/go-hclog" @@ -19,7 +18,7 @@ import ( type Converter struct { Logger hclog.Logger CfgFetcher configfetcher.ConfigFetcher - proxyState *mesh.ProxyState + proxyState *proxytracker.ProxyState } func NewConverter( @@ -29,7 +28,7 @@ func NewConverter( return &Converter{ Logger: logger, CfgFetcher: cfgFetcher, - proxyState: &mesh.ProxyState{ + proxyState: &proxytracker.ProxyState{ ProxyState: &pbmesh.ProxyState{ Listeners: make([]*pbproxystate.Listener, 0), Clusters: make(map[string]*pbproxystate.Cluster), @@ -40,7 +39,7 @@ func NewConverter( } } -func (g *Converter) ProxyStateFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (*mesh.ProxyState, error) { +func (g *Converter) ProxyStateFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (*proxytracker.ProxyState, error) { err := g.resourcesFromSnapshot(cfgSnap) if err != nil { return nil, fmt.Errorf("failed to generate FullProxyState: %v", err) diff --git a/agent/xds/server.go b/agent/xds/server.go index cc4220da2804..7b9f8b6bf118 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -6,6 +6,8 @@ package xds import ( "context" "errors" + "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "github.com/hashicorp/consul/proto-public/pbresource" "sync/atomic" "time" @@ -24,8 +26,6 @@ import ( "github.com/hashicorp/consul/acl" external "github.com/hashicorp/consul/agent/grpc-external" - "github.com/hashicorp/consul/agent/grpc-external/limiter" - "github.com/hashicorp/consul/agent/proxycfg" ) var ( @@ -85,7 +85,7 @@ type ACLResolverFunc func(id string) (acl.Authorizer, error) // ProxyConfigSource is the interface xds.Server requires to consume proxy // config updates. type ProxyWatcher interface { - Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) + Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) } // Server represents a gRPC server that can handle xDS requests from Envoy. All @@ -96,7 +96,7 @@ type ProxyWatcher interface { type Server struct { NodeName string Logger hclog.Logger - CfgSrc ProxyWatcher + ProxyWatcher ProxyWatcher ResolveToken ACLResolverFunc CfgFetcher configfetcher.ConfigFetcher @@ -147,14 +147,14 @@ func (c *activeStreamCounters) Increment(ctx context.Context) func() { func NewServer( nodeName string, logger hclog.Logger, - cfgMgr ProxyWatcher, + proxyWatcher ProxyWatcher, resolveTokenSecret ACLResolverFunc, cfgFetcher configfetcher.ConfigFetcher, ) *Server { return &Server{ NodeName: nodeName, Logger: logger, - CfgSrc: cfgMgr, + ProxyWatcher: proxyWatcher, ResolveToken: resolveTokenSecret, CfgFetcher: cfgFetcher, AuthCheckFrequency: DefaultAuthCheckFrequency, @@ -203,7 +203,7 @@ func (s *Server) authenticate(ctx context.Context) (acl.Authorizer, error) { // proxy ID. We assume that any data in the snapshot was already filtered, // which allows this authorization to be a shallow authorization check // for all the data in a ProxySnapshot. -func (s *Server) authorize(ctx context.Context, proxySnapshot proxycfg.ProxySnapshot) error { +func (s *Server) authorize(ctx context.Context, proxySnapshot proxysnapshot.ProxySnapshot) error { if proxySnapshot == nil { return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") } diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 671974f45b5e..31aa03682032 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -6,6 +6,7 @@ package xds import ( "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" "github.com/hashicorp/consul/agent/xds/response" + "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "github.com/hashicorp/consul/proto-public/pbresource" "sort" "sync" @@ -74,14 +75,14 @@ func newTestSnapshot( // testing. It also implements ConnectAuthz to allow control over authorization. type testManager struct { sync.Mutex - stateChans map[structs.ServiceID]chan proxycfg.ProxySnapshot + stateChans map[structs.ServiceID]chan proxysnapshot.ProxySnapshot drainChans map[structs.ServiceID]chan struct{} cancels chan structs.ServiceID } func newTestManager(t *testing.T) *testManager { return &testManager{ - stateChans: map[structs.ServiceID]chan proxycfg.ProxySnapshot{}, + stateChans: map[structs.ServiceID]chan proxysnapshot.ProxySnapshot{}, drainChans: map[structs.ServiceID]chan struct{}{}, cancels: make(chan structs.ServiceID, 10), } @@ -91,12 +92,12 @@ func newTestManager(t *testing.T) *testManager { func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) { m.Lock() defer m.Unlock() - m.stateChans[proxyID] = make(chan proxycfg.ProxySnapshot, 1) + m.stateChans[proxyID] = make(chan proxysnapshot.ProxySnapshot, 1) m.drainChans[proxyID] = make(chan struct{}) } // Deliver simulates a proxy registration -func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg proxycfg.ProxySnapshot) { +func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg proxysnapshot.ProxySnapshot) { t.Helper() m.Lock() defer m.Unlock() @@ -123,8 +124,8 @@ func (m *testManager) DrainStreams(proxyID structs.ServiceID) { } // Watch implements ConfigManager -func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxycfg.ProxySnapshot, - limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxysnapshot.ProxySnapshot, + limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { // Create service ID proxyID := structs.NewServiceID(id.Name, catalog.GetEnterpriseMetaFromResourceID(id)) m.Lock() diff --git a/agent/xdsv2/resources.go b/agent/xdsv2/resources.go index 349671ceb903..2aeeb97db990 100644 --- a/agent/xdsv2/resources.go +++ b/agent/xdsv2/resources.go @@ -5,8 +5,7 @@ package xdsv2 import ( "fmt" - "github.com/hashicorp/consul/internal/mesh" - + proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" "github.com/hashicorp/go-hclog" "google.golang.org/protobuf/proto" @@ -29,11 +28,11 @@ func NewResourceGenerator( } type ProxyResources struct { - proxyState *mesh.ProxyState + proxyState *proxytracker.ProxyState envoyResources map[string][]proto.Message } -func (g *ResourceGenerator) AllResourcesFromIR(proxyState *mesh.ProxyState) (map[string][]proto.Message, error) { +func (g *ResourceGenerator) AllResourcesFromIR(proxyState *proxytracker.ProxyState) (map[string][]proto.Message, error) { pr := &ProxyResources{ proxyState: proxyState, envoyResources: make(map[string][]proto.Message), diff --git a/internal/mesh/internal/controllers/register.go b/internal/mesh/internal/controllers/register.go index e48540609cf1..adfdd5c8afc0 100644 --- a/internal/mesh/internal/controllers/register.go +++ b/internal/mesh/internal/controllers/register.go @@ -13,10 +13,10 @@ import ( type Dependencies struct { TrustBundleFetcher xds.TrustBundleFetcher + ProxyUpdater xds.ProxyUpdater } func Register(mgr *controller.Manager, deps Dependencies) { mapper := bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType) - // TODO: Pass in a "real" updater once proxy tracker work has completed. - mgr.Register(xds.Controller(mapper, nil, deps.TrustBundleFetcher)) + mgr.Register(xds.Controller(mapper, deps.ProxyUpdater, deps.TrustBundleFetcher)) } diff --git a/internal/mesh/internal/controllers/xds/controller.go b/internal/mesh/internal/controllers/xds/controller.go index 1135f27f7518..cff8430ae532 100644 --- a/internal/mesh/internal/controllers/xds/controller.go +++ b/internal/mesh/internal/controllers/xds/controller.go @@ -5,14 +5,14 @@ package xds import ( "context" - "github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status" "github.com/hashicorp/consul/internal/mesh/internal/types" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" + proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/mappers/bimapper" - pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -43,7 +43,7 @@ type TrustBundleFetcher func() (*pbproxystate.TrustBundle, error) // and also check its connectivity to the server. type ProxyUpdater interface { // PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy. - PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error + PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error // ProxyConnectedToServer returns whether this id is connected to this server. ProxyConnectedToServer(id *pbresource.ID) bool @@ -156,7 +156,7 @@ func (r *xdsReconciler) Reconcile(ctx context.Context, rt controller.Runtime, re computedProxyState := proxyStateTemplate.Template.ProxyState - err = r.updater.PushChange(req.ID, computedProxyState) + err = r.updater.PushChange(req.ID, &proxytracker.ProxyState{ProxyState: computedProxyState}) if err != nil { // Set the status. statusCondition = status.ConditionRejectedPushChangeFailed(status.KeyFromID(req.ID)) diff --git a/internal/mesh/internal/controllers/xds/controller_test.go b/internal/mesh/internal/controllers/xds/controller_test.go index 2363c0cb8460..fb28c59a23c1 100644 --- a/internal/mesh/internal/controllers/xds/controller_test.go +++ b/internal/mesh/internal/controllers/xds/controller_test.go @@ -60,7 +60,7 @@ func (suite *xdsControllerTestSuite) SetupTest() { suite.fetcher = mockFetcher suite.mapper = bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType) - suite.updater = NewMockUpdater() + suite.updater = newMockUpdater() suite.ctl = &xdsReconciler{ bimapper: suite.mapper, diff --git a/internal/mesh/internal/controllers/xds/mock_updater.go b/internal/mesh/internal/controllers/xds/mock_updater.go index 3b8554f49116..1fcde13fdd50 100644 --- a/internal/mesh/internal/controllers/xds/mock_updater.go +++ b/internal/mesh/internal/controllers/xds/mock_updater.go @@ -5,9 +5,10 @@ package xds import ( "fmt" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" + proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" "sync" - pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbresource" @@ -19,14 +20,14 @@ type mockUpdater struct { lock sync.Mutex // latestPs is a map from a ProxyStateTemplate's id.Name in string form to the last computed ProxyState for that // ProxyStateTemplate. - latestPs map[string]*pbmesh.ProxyState + latestPs map[string]proxysnapshot.ProxySnapshot notConnected bool pushChangeError bool } -func NewMockUpdater() *mockUpdater { +func newMockUpdater() *mockUpdater { return &mockUpdater{ - latestPs: make(map[string]*pbmesh.ProxyState), + latestPs: make(map[string]proxysnapshot.ProxySnapshot), } } @@ -42,13 +43,13 @@ func (m *mockUpdater) SetProxyAsNotConnected() { m.notConnected = true } -func (m *mockUpdater) PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error { +func (m *mockUpdater) PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error { m.lock.Lock() defer m.lock.Unlock() if m.pushChangeError { return fmt.Errorf("mock push change error") } else { - m.setUnsafe(id.Name, snapshot) + m.setUnsafe(id.Name, snapshot.(*proxytracker.ProxyState)) } return nil } @@ -62,12 +63,12 @@ func (m *mockUpdater) ProxyConnectedToServer(_ *pbresource.ID) bool { return true } -func (p *mockUpdater) Get(name string) *pbmesh.ProxyState { +func (p *mockUpdater) Get(name string) *proxytracker.ProxyState { p.lock.Lock() defer p.lock.Unlock() ps, ok := p.latestPs[name] if ok { - return ps + return ps.(*proxytracker.ProxyState) } return nil } @@ -77,7 +78,7 @@ func (p *mockUpdater) GetEndpoints(name string) map[string]*pbproxystate.Endpoin defer p.lock.Unlock() ps, ok := p.latestPs[name] if ok { - return ps.Endpoints + return ps.(*proxytracker.ProxyState).Endpoints } return nil } @@ -87,17 +88,17 @@ func (p *mockUpdater) GetTrustBundle(name string) map[string]*pbproxystate.Trust defer p.lock.Unlock() ps, ok := p.latestPs[name] if ok { - return ps.TrustBundles + return ps.(*proxytracker.ProxyState).TrustBundles } return nil } -func (p *mockUpdater) Set(name string, ps *pbmesh.ProxyState) { +func (p *mockUpdater) Set(name string, ps *proxytracker.ProxyState) { p.lock.Lock() defer p.lock.Unlock() p.setUnsafe(name, ps) } -func (p *mockUpdater) setUnsafe(name string, ps *pbmesh.ProxyState) { +func (p *mockUpdater) setUnsafe(name string, ps *proxytracker.ProxyState) { p.latestPs[name] = ps } diff --git a/internal/mesh/proxy-snapshot/proxy_snapshot.go b/internal/mesh/proxy-snapshot/proxy_snapshot.go new file mode 100644 index 000000000000..40763f568c17 --- /dev/null +++ b/internal/mesh/proxy-snapshot/proxy_snapshot.go @@ -0,0 +1,17 @@ +package proxysnapshot + +import "github.com/hashicorp/consul/acl" + +// ProxySnapshot is an abstraction that allows interchangeability between +// Catalog V1 ConfigSnapshot and Catalog V2 ProxyState. +type ProxySnapshot interface { + AllowEmptyListeners() bool + AllowEmptyRoutes() bool + AllowEmptyClusters() bool + Authorize(authz acl.Authorizer) error + LoggerName() string +} + +// CancelFunc is a type for a returned function that can be called to cancel a +// watch. +type CancelFunc func() diff --git a/agent/proxy-tracker/mock_SessionLimiter.go b/internal/mesh/proxy-tracker/mock_SessionLimiter.go similarity index 94% rename from agent/proxy-tracker/mock_SessionLimiter.go rename to internal/mesh/proxy-tracker/mock_SessionLimiter.go index 4a2c5f324a1b..b50c9e84fcb2 100644 --- a/agent/proxy-tracker/mock_SessionLimiter.go +++ b/internal/mesh/proxy-tracker/mock_SessionLimiter.go @@ -3,7 +3,7 @@ package proxytracker import ( - limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/agent/grpc-external/limiter" mock "github.com/stretchr/testify/mock" ) diff --git a/internal/mesh/proxy_state_exports.go b/internal/mesh/proxy-tracker/proxy_state_exports.go similarity index 98% rename from internal/mesh/proxy_state_exports.go rename to internal/mesh/proxy-tracker/proxy_state_exports.go index 2739e3a679d6..59c4e1070f10 100644 --- a/internal/mesh/proxy_state_exports.go +++ b/internal/mesh/proxy-tracker/proxy_state_exports.go @@ -1,4 +1,4 @@ -package mesh +package proxytracker import ( "github.com/hashicorp/consul/acl" diff --git a/agent/proxy-tracker/proxy_tracker.go b/internal/mesh/proxy-tracker/proxy_tracker.go similarity index 92% rename from agent/proxy-tracker/proxy_tracker.go rename to internal/mesh/proxy-tracker/proxy_tracker.go index ca4478f23782..ac25e54bacbb 100644 --- a/agent/proxy-tracker/proxy_tracker.go +++ b/internal/mesh/proxy-tracker/proxy_tracker.go @@ -6,15 +6,15 @@ package proxytracker import ( "errors" "fmt" + "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/internal/mesh/internal/types" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "github.com/hashicorp/go-hclog" "sync" "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh" "github.com/hashicorp/consul/internal/resource" - "github.com/hashicorp/consul/agent/grpc-external/limiter" - "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -35,9 +35,9 @@ func (e *ProxyConnection) Key() string { // when the ProxyState for that proxyID has changed. type proxyWatchData struct { // notifyCh is the channel that the watcher receives updates from ProxyTracker. - notifyCh chan proxycfg.ProxySnapshot + notifyCh chan proxysnapshot.ProxySnapshot // state is the current/last updated ProxyState for a given proxy. - state *mesh.ProxyState + state proxysnapshot.ProxySnapshot // token is the ACL token provided by the watcher. token string // nodeName is the node where the given proxy resides. @@ -87,8 +87,8 @@ func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker { // Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates, // a channel to notify of xDS terminated session, and a cancel function to cancel the watch. func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, - nodeName string, token string) (<-chan proxycfg.ProxySnapshot, - limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { + nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, + limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName) if err := pt.validateWatchArgs(proxyID, nodeName); err != nil { pt.config.Logger.Error("args failed validation", err) @@ -106,7 +106,7 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, // This buffering is crucial otherwise we'd block immediately trying to // deliver the current snapshot below if we already have one. - proxyStateChan := make(chan proxycfg.ProxySnapshot, 1) + proxyStateChan := make(chan proxysnapshot.ProxySnapshot, 1) watchData := &proxyWatchData{ notifyCh: proxyStateChan, state: nil, @@ -166,7 +166,7 @@ func (pt *ProxyTracker) notifyNewProxyChannel(proxyID *pbresource.ID) error { // - ends the session with xDS session limiter. // - closes the proxy state channel assigned to the proxy. // This function assumes the state lock is already held. -func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan proxycfg.ProxySnapshot, session limiter.Session) { +func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan proxysnapshot.ProxySnapshot, session limiter.Session) { delete(pt.proxies, proxyReferenceKey) session.End() close(proxyStateChan) @@ -179,8 +179,8 @@ func (pt *ProxyTracker) validateWatchArgs(proxyID *pbresource.ID, nodeName string) error { if proxyID == nil { return errors.New("proxyID is required") - } else if proxyID.GetType().GetKind() != mesh.ProxyStateTemplateConfigurationType.Kind { - return fmt.Errorf("proxyID must be a %s", mesh.ProxyStateTemplateConfigurationType.GetKind()) + } else if proxyID.GetType().GetKind() != types.ProxyStateTemplateType.Kind { + return fmt.Errorf("proxyID must be a %s", types.ProxyStateTemplateType.GetKind()) } else if nodeName == "" { return errors.New("nodeName is required") } @@ -189,7 +189,7 @@ func (pt *ProxyTracker) validateWatchArgs(proxyID *pbresource.ID, } // PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy. -func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *mesh.ProxyState) error { +func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState proxysnapshot.ProxySnapshot) error { pt.config.Logger.Trace("push change called for proxy", "proxyID", proxyID) proxyReferenceKey := resource.NewReferenceKey(proxyID) pt.mu.Lock() @@ -205,7 +205,7 @@ func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *mesh.Prox return nil } -func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *mesh.ProxyState, ch chan proxycfg.ProxySnapshot) { +func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState proxysnapshot.ProxySnapshot, ch chan proxysnapshot.ProxySnapshot) { pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID) // Send if chan is empty select { diff --git a/agent/proxy-tracker/proxy_tracker_test.go b/internal/mesh/proxy-tracker/proxy_tracker_test.go similarity index 89% rename from agent/proxy-tracker/proxy_tracker_test.go rename to internal/mesh/proxy-tracker/proxy_tracker_test.go index e799249d43af..9738dba17c67 100644 --- a/agent/proxy-tracker/proxy_tracker_test.go +++ b/internal/mesh/proxy-tracker/proxy_tracker_test.go @@ -8,7 +8,7 @@ import ( "fmt" "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/internal/controller" - "github.com/hashicorp/consul/internal/mesh" + "github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/resourcetest" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" @@ -20,7 +20,7 @@ import ( ) func TestProxyTracker_Watch(t *testing.T) { - resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() + resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() proxyReferenceKey := resource.NewReferenceKey(resourceID) lim := NewMockSessionLimiter(t) session1 := newMockSession(t) @@ -70,7 +70,7 @@ func TestProxyTracker_Watch(t *testing.T) { } func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) { - resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() + resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() proxyReferenceKey := resource.NewReferenceKey(resourceID) lim := NewMockSessionLimiter(t) session1 := newMockSession(t) @@ -85,7 +85,7 @@ func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) { //fill up buffered channel while the consumer is not ready to simulate the error for i := 0; i < 1000; i++ { - event := controller.Event{Obj: &ProxyConnection{ProxyID: resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, fmt.Sprintf("test%d", i)).ID()}} + event := controller.Event{Obj: &ProxyConnection{ProxyID: resourcetest.Resource(types.ProxyStateTemplateType, fmt.Sprintf("test%d", i)).ID()}} pt.newProxyConnectionCh <- event } @@ -121,14 +121,14 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) { }, { description: "Empty nodeName", - proxyID: resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID(), + proxyID: resourcetest.Resource(types.ProxyStateTemplateType, "test").ID(), nodeName: "", token: "something", expectedError: errors.New("nodeName is required"), }, { description: "resource is not ProxyStateTemplate", - proxyID: resourcetest.Resource(mesh.ProxyConfigurationType, "test").ID(), + proxyID: resourcetest.Resource(types.ProxyConfigurationType, "test").ID(), nodeName: "something", token: "something else", expectedError: errors.New("proxyID must be a ProxyStateTemplate"), @@ -155,7 +155,7 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) { } func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) { - resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() + resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() lim := NewMockSessionLimiter(t) lim.On("BeginSession").Return(nil, errors.New("kaboom")) logger := testutil.Logger(t) @@ -174,7 +174,7 @@ func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) { } func TestProxyTracker_PushChange(t *testing.T) { - resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() + resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() proxyReferenceKey := resource.NewReferenceKey(resourceID) lim := NewMockSessionLimiter(t) session1 := newMockSession(t) @@ -193,7 +193,7 @@ func TestProxyTracker_PushChange(t *testing.T) { require.NoError(t, err) // PushChange - proxyState := &mesh.ProxyState{ProxyState: &pbmesh.ProxyState{ + proxyState := &ProxyState{ProxyState: &pbmesh.ProxyState{ IntentionDefaultAllow: true, }} @@ -216,7 +216,7 @@ func TestProxyTracker_PushChange(t *testing.T) { } func TestProxyTracker_PushChanges_ErrorProxyNotConnected(t *testing.T) { - resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() + resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() lim := NewMockSessionLimiter(t) logger := testutil.Logger(t) @@ -226,7 +226,7 @@ func TestProxyTracker_PushChanges_ErrorProxyNotConnected(t *testing.T) { }) // PushChange - proxyState := &mesh.ProxyState{ProxyState: &pbmesh.ProxyState{ + proxyState := &ProxyState{ProxyState: &pbmesh.ProxyState{ IntentionDefaultAllow: true, }} @@ -273,14 +273,14 @@ func TestProxyTracker_ProxyConnectedToServer(t *testing.T) { Logger: logger, SessionLimiter: lim, }) - resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() + resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() tc.preProcessingFunc(pt, resourceID, lim, session1, session1TermCh) require.Equal(t, tc.shouldExist, pt.ProxyConnectedToServer(resourceID)) } } func TestProxyTracker_Shutdown(t *testing.T) { - resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() + resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID() proxyReferenceKey := resource.NewReferenceKey(resourceID) lim := NewMockSessionLimiter(t) session1 := newMockSession(t)