Skip to content

Commit

Permalink
NET-4944 - wire up controllers with proxy tracker (#18603)
Browse files Browse the repository at this point in the history
Co-authored-by: github-team-consul-core <[email protected]>
  • Loading branch information
jmurret and hc-github-team-consul-core authored Aug 29, 2023
1 parent 48c8a83 commit 0e60650
Show file tree
Hide file tree
Showing 34 changed files with 266 additions and 215 deletions.
103 changes: 57 additions & 46 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"encoding/json"
"errors"
"fmt"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/lib/stringslice"
"io"
"net"
Expand Down Expand Up @@ -653,6 +653,47 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}

// Create proxy config manager now because it is a dependency of creating the proxyWatcher
// which will be passed to consul.NewServer so that it is then passed to the
// controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode.
var intentionDefaultAllow bool
switch a.config.ACLResolverSettings.ACLDefaultPolicy {
case "allow":
intentionDefaultAllow = true
case "deny":
intentionDefaultAllow = false
default:
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

// proxyWatcher will be used in the creation of the XDS server and also
// in the registration of the xds controller.
proxyWatcher := a.getProxyWatcher()

// Setup either the client or the server.
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)
Expand Down Expand Up @@ -686,7 +727,11 @@ func (a *Agent) Start(ctx context.Context) error {
incomingRPCLimiter,
)

server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger)
var pt *proxytracker.ProxyTracker
if a.useV2Resources() {
pt = proxyWatcher.(*proxytracker.ProxyTracker)
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
Expand Down Expand Up @@ -753,40 +798,6 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

var intentionDefaultAllow bool
switch a.config.ACLResolverSettings.ACLDefaultPolicy {
case "allow":
intentionDefaultAllow = true
case "deny":
intentionDefaultAllow = false
default:
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{
Expand Down Expand Up @@ -839,7 +850,7 @@ func (a *Agent) Start(ctx context.Context) error {
}

// Start grpc and grpc_tls servers.
if err := a.listenAndServeGRPC(); err != nil {
if err := a.listenAndServeGRPC(proxyWatcher); err != nil {
return err
}

Expand Down Expand Up @@ -924,28 +935,28 @@ func (a *Agent) useV2Resources() bool {
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.useV2Resources() {
a.logger.Trace("returning proxyTracker for getProxyWatcher")
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.proxyConfig.Logger.Named("proxy-tracker"),
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
a.logger.Trace("returning configSource for getProxyWatcher")
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}

// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer() {
cfg := a.getProxyWatcher()

func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
if server, ok := a.delegate.(*consul.Server); ok {
catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{
NodeName: a.config.NodeName,
LocalState: a.State,
LocalConfigSource: cfg,
LocalConfigSource: proxyWatcher,
Manager: a.proxyConfig,
GetStore: func() catalogproxycfg.Store { return server.FSM().State() },
Logger: a.proxyConfig.Logger.Named("server-catalog"),
Expand All @@ -955,12 +966,12 @@ func (a *Agent) configureXDSServer() {
<-a.shutdownCh
catalogCfg.Shutdown()
}()
cfg = catalogCfg
proxyWatcher = catalogCfg
}
a.xdsServer = xds.NewServer(
a.config.NodeName,
a.logger.Named(logging.Envoy),
cfg,
proxyWatcher,
func(id string) (acl.Authorizer, error) {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
},
Expand All @@ -969,12 +980,12 @@ func (a *Agent) configureXDSServer() {
a.xdsServer.Register(a.externalGRPCServer)
}

func (a *Agent) listenAndServeGRPC() error {
func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}

a.configureXDSServer()
a.configureXDSServer(proxyWatcher)

// Attempt to spawn listeners
var listeners []net.Listener
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"errors"
"fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
mathrand "math/rand"
"net"
"net/http"
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_connect_ca_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
deps := newDefaultDeps(t, conf1)
deps.Logger = logger

s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger)
s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger, nil)
require.NoError(t, err)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
deps := newDefaultDeps(t, config)
deps.Logger = logger

srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger)
srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger, nil)
require.NoError(t, err)
defer srv.Shutdown()

Expand Down
20 changes: 17 additions & 3 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/x509"
"errors"
"fmt"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"io"
"net"
"os"
Expand Down Expand Up @@ -480,9 +481,21 @@ type connHandler interface {
Shutdown() error
}

// ProxyUpdater is an interface for ProxyTracker.
type ProxyUpdater interface {
// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error

// ProxyConnectedToServer returns whether this id is connected to this server.
ProxyConnectedToServer(id *pbresource.ID) bool

EventChannel() chan controller.Event
}

// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger) (*Server, error) {
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger, proxyUpdater ProxyUpdater) (*Server, error) {
logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
Expand Down Expand Up @@ -822,7 +835,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime),
)
s.registerControllers(flat)
s.registerControllers(flat, proxyUpdater)
go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})

go s.trackLeaderChanges()
Expand Down Expand Up @@ -873,7 +886,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return s, nil
}

func (s *Server) registerControllers(deps Deps) {
func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) {
if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) {
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{
Expand All @@ -889,6 +902,7 @@ func (s *Server) registerControllers(deps Deps) {
}
return &bundle, nil
},
ProxyUpdater: proxyUpdater,
})
}

Expand Down
10 changes: 5 additions & 5 deletions agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
}
}
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler())
srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger)
srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
}
}

s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1281,7 +1281,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
return nil
}

s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1315,7 +1315,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = nil

s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)

require.Error(t, err, "need err when provider func is nil")
require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider")
Expand All @@ -1334,7 +1334,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
return nil
}

s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)

require.Error(t, err, "need err when RequestRecorder is nil")
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")
Expand Down
10 changes: 6 additions & 4 deletions agent/proxycfg-sources/catalog/config_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package catalog
import (
"context"
"errors"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
"sync"

Expand All @@ -14,7 +16,6 @@ import (

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -49,7 +50,7 @@ func NewConfigSource(cfg Config) *ConfigSource {

// Watch wraps the underlying proxycfg.Manager and dynamically registers
// services from the catalog with it when requested by the xDS server.
func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
// Create service ID
serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id))
// If the service is registered to the local agent, use the LocalConfigSource
Expand Down Expand Up @@ -279,7 +280,7 @@ type Config struct {

//go:generate mockery --name ConfigManager --inpackage
type ConfigManager interface {
Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)
Watch(req proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)
Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error
Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource)
}
Expand All @@ -292,10 +293,11 @@ type Store interface {

//go:generate mockery --name Watcher --inpackage
type Watcher interface {
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)
}

//go:generate mockery --name SessionLimiter --inpackage
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
Run(ctx context.Context)
}
Loading

0 comments on commit 0e60650

Please sign in to comment.