Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-4944 - wire up controllers with proxy tracker #18603

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 57 additions & 46 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"encoding/json"
"errors"
"fmt"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/lib/stringslice"
"io"
"net"
Expand Down Expand Up @@ -653,6 +653,47 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}

// Create proxy config manager now because it is a dependency of creating the proxyWatcher
// which will be passed to consul.NewServer so that it is then passed to the
// controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode.
var intentionDefaultAllow bool
switch a.config.ACLResolverSettings.ACLDefaultPolicy {
case "allow":
intentionDefaultAllow = true
case "deny":
intentionDefaultAllow = false
default:
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

// proxyWatcher will be used in the creation of the XDS server and also
// in the registration of the xds controller.
proxyWatcher := a.getProxyWatcher()

// Setup either the client or the server.
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)
Expand Down Expand Up @@ -686,7 +727,11 @@ func (a *Agent) Start(ctx context.Context) error {
incomingRPCLimiter,
)

server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger)
var pt *proxytracker.ProxyTracker
if a.useV2Resources() {
pt = proxyWatcher.(*proxytracker.ProxyTracker)
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
Expand Down Expand Up @@ -753,40 +798,6 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

var intentionDefaultAllow bool
switch a.config.ACLResolverSettings.ACLDefaultPolicy {
case "allow":
intentionDefaultAllow = true
case "deny":
intentionDefaultAllow = false
default:
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}

go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})

// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}

go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{
Expand Down Expand Up @@ -839,7 +850,7 @@ func (a *Agent) Start(ctx context.Context) error {
}

// Start grpc and grpc_tls servers.
if err := a.listenAndServeGRPC(); err != nil {
if err := a.listenAndServeGRPC(proxyWatcher); err != nil {
return err
}

Expand Down Expand Up @@ -924,28 +935,28 @@ func (a *Agent) useV2Resources() bool {
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.useV2Resources() {
a.logger.Trace("returning proxyTracker for getProxyWatcher")
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.proxyConfig.Logger.Named("proxy-tracker"),
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
a.logger.Trace("returning configSource for getProxyWatcher")
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}

// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer() {
cfg := a.getProxyWatcher()

func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
if server, ok := a.delegate.(*consul.Server); ok {
catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{
NodeName: a.config.NodeName,
LocalState: a.State,
LocalConfigSource: cfg,
LocalConfigSource: proxyWatcher,
Manager: a.proxyConfig,
GetStore: func() catalogproxycfg.Store { return server.FSM().State() },
Logger: a.proxyConfig.Logger.Named("server-catalog"),
Expand All @@ -955,12 +966,12 @@ func (a *Agent) configureXDSServer() {
<-a.shutdownCh
catalogCfg.Shutdown()
}()
cfg = catalogCfg
proxyWatcher = catalogCfg
}
a.xdsServer = xds.NewServer(
a.config.NodeName,
a.logger.Named(logging.Envoy),
cfg,
proxyWatcher,
func(id string) (acl.Authorizer, error) {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
},
Expand All @@ -969,12 +980,12 @@ func (a *Agent) configureXDSServer() {
a.xdsServer.Register(a.externalGRPCServer)
}

func (a *Agent) listenAndServeGRPC() error {
func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}

a.configureXDSServer()
a.configureXDSServer(proxyWatcher)

// Attempt to spawn listeners
var listeners []net.Listener
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"errors"
"fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
mathrand "math/rand"
"net"
"net/http"
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_connect_ca_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
deps := newDefaultDeps(t, conf1)
deps.Logger = logger

s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger)
s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger, nil)
require.NoError(t, err)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,7 +1640,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
deps := newDefaultDeps(t, config)
deps.Logger = logger

srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger)
srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger, nil)
require.NoError(t, err)
defer srv.Shutdown()

Expand Down
20 changes: 17 additions & 3 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/x509"
"errors"
"fmt"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"io"
"net"
"os"
Expand Down Expand Up @@ -480,9 +481,21 @@ type connHandler interface {
Shutdown() error
}

// ProxyUpdater is an interface for ProxyTracker.
type ProxyUpdater interface {
// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error

// ProxyConnectedToServer returns whether this id is connected to this server.
ProxyConnectedToServer(id *pbresource.ID) bool

EventChannel() chan controller.Event
}

// NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger) (*Server, error) {
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger, proxyUpdater ProxyUpdater) (*Server, error) {
logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
Expand Down Expand Up @@ -822,7 +835,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime),
)
s.registerControllers(flat)
s.registerControllers(flat, proxyUpdater)
go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})

go s.trackLeaderChanges()
Expand Down Expand Up @@ -873,7 +886,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return s, nil
}

func (s *Server) registerControllers(deps Deps) {
func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) {
if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) {
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{
Expand All @@ -889,6 +902,7 @@ func (s *Server) registerControllers(deps Deps) {
}
return &bundle, nil
},
ProxyUpdater: proxyUpdater,
})
}

Expand Down
10 changes: 5 additions & 5 deletions agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
}
}
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler())
srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger)
srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
}
}

s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1281,7 +1281,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
return nil
}

s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1315,7 +1315,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = nil

s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)

require.Error(t, err, "need err when provider func is nil")
require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider")
Expand All @@ -1334,7 +1334,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
return nil
}

s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger)
s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)

require.Error(t, err, "need err when RequestRecorder is nil")
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")
Expand Down
10 changes: 6 additions & 4 deletions agent/proxycfg-sources/catalog/config_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package catalog
import (
"context"
"errors"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
"sync"

Expand All @@ -14,7 +16,6 @@ import (

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -49,7 +50,7 @@ func NewConfigSource(cfg Config) *ConfigSource {

// Watch wraps the underlying proxycfg.Manager and dynamically registers
// services from the catalog with it when requested by the xDS server.
func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
// Create service ID
serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id))
// If the service is registered to the local agent, use the LocalConfigSource
Expand Down Expand Up @@ -279,7 +280,7 @@ type Config struct {

//go:generate mockery --name ConfigManager --inpackage
type ConfigManager interface {
Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)
Watch(req proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)
Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error
Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource)
}
Expand All @@ -292,10 +293,11 @@ type Store interface {

//go:generate mockery --name Watcher --inpackage
type Watcher interface {
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)
}

//go:generate mockery --name SessionLimiter --inpackage
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
Run(ctx context.Context)
}
Loading