Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-5338 - NET-5338 - Run a v2 mode xds server #18579

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"encoding/json"
"errors"
"fmt"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
"github.com/hashicorp/consul/lib/stringslice"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -54,7 +57,6 @@ import (
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
Expand Down Expand Up @@ -908,13 +910,37 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}

func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
// useV2Resources returns true if "resource-apis" is present in the Experiments
// array of the agent config.
func (a *Agent) useV2Resources() bool {
if stringslice.Contains(a.baseDeps.Experiments, consul.CatalogResourceExperimentName) {
return true
}
return false
}

// getProxyWatcher returns the proper implementation of the ProxyWatcher interface.
// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise,
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.useV2Resources() {
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.proxyConfig.Logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}

// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer() {
cfg := a.getProxyWatcher()

// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
var cfg xds.ProxyConfigSource = localproxycfg.NewConfigSource(a.proxyConfig)
if server, ok := a.delegate.(*consul.Server); ok {
catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{
NodeName: a.config.NodeName,
Expand All @@ -941,6 +967,14 @@ func (a *Agent) listenAndServeGRPC() error {
a,
)
a.xdsServer.Register(a.externalGRPCServer)
}

func (a *Agent) listenAndServeGRPC() error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}

a.configureXDSServer()

// Attempt to spawn listeners
var listeners []net.Listener
Expand Down
73 changes: 73 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
mathrand "math/rand"
"net"
"net/http"
Expand All @@ -22,6 +27,7 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -6358,6 +6364,73 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
})
}

func TestAgent_getProxyWatcher(t *testing.T) {
type testcase struct {
description string
getExperiments func() []string
expectedType xds.ProxyWatcher
}
testscases := []testcase{
{
description: "config source is returned when api-resources experiment is not configured",
expectedType: &local.ConfigSource{},
getExperiments: func() []string {
return []string{}
},
},
{
description: "proxy tracker is returned when api-resources experiment is configured",
expectedType: &proxytracker.ProxyTracker{},
getExperiments: func() []string {
return []string{consul.CatalogResourceExperimentName}
},
},
}
for _, tc := range testscases {
caConfig := tlsutil.Config{}
tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil))
require.NoError(t, err)

bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)},
},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}

bd.XDSStreamLimiter = limiter.NewSessionLimiter()
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})

cfg := config.RuntimeConfig{
BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC),
}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)

bd.Experiments = tc.getExperiments()

agent, err := New(bd)
require.NoError(t, err)
agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}})
require.NoError(t, err)
require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType)))
}

}
func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")
Expand Down
4 changes: 1 addition & 3 deletions agent/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,7 @@ func DevSource() Source {
ports = {
grpc = 8502
}
experiments = [
"resource-apis"
]
experiments = []
`,
}
}
Expand Down
2 changes: 1 addition & 1 deletion agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
rt.DevMode = true
rt.DisableAnonymousSignature = true
rt.DisableKeyringFile = true
rt.Experiments = []string{"resource-apis"}
rt.Experiments = nil
rt.EnableDebug = true
rt.UIConfig.Enabled = true
rt.LeaveOnTerm = false
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ const (

LeaderTransferMinVersion = "1.6.0"

catalogResourceExperimentName = "resource-apis"
CatalogResourceExperimentName = "resource-apis"
)

const (
Expand Down Expand Up @@ -874,7 +874,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
}

func (s *Server) registerControllers(deps Deps) {
if stringslice.Contains(deps.Experiments, catalogResourceExperimentName) {
if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) {
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{
TrustBundleFetcher: func() (*pbproxystate.TrustBundle, error) {
Expand Down
31 changes: 0 additions & 31 deletions agent/proxy-tracker/mock_Logger.go

This file was deleted.

45 changes: 24 additions & 21 deletions agent/proxy-tracker/proxy_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package proxytracker
import (
"errors"
"fmt"
"github.com/hashicorp/go-hclog"
"sync"

"github.com/hashicorp/consul/internal/controller"
Expand All @@ -14,7 +15,6 @@ import (

"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"
)

Expand All @@ -35,9 +35,9 @@ func (e *ProxyConnection) Key() string {
// when the ProxyState for that proxyID has changed.
type proxyWatchData struct {
// notifyCh is the channel that the watcher receives updates from ProxyTracker.
notifyCh chan *pbmesh.ProxyState
notifyCh chan proxycfg.ProxySnapshot
// state is the current/last updated ProxyState for a given proxy.
state *pbmesh.ProxyState
state *mesh.ProxyState
// token is the ACL token provided by the watcher.
token string
// nodeName is the node where the given proxy resides.
Expand All @@ -46,7 +46,7 @@ type proxyWatchData struct {

type ProxyTrackerConfig struct {
// logger will be used to write log messages.
Logger Logger
Logger hclog.Logger

// sessionLimiter is used to enforce xDS concurrency limits.
SessionLimiter SessionLimiter
Expand Down Expand Up @@ -87,10 +87,10 @@ func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker {
// Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates,
// a channel to notify of xDS terminated session, and a cancel function to cancel the watch.
func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
nodeName string, token string) (<-chan *pbmesh.ProxyState,
nodeName string, token string) (<-chan proxycfg.ProxySnapshot,
limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {

if err := validateArgs(proxyID, nodeName, token); err != nil {
pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName)
if err := pt.validateWatchArgs(proxyID, nodeName); err != nil {
pt.config.Logger.Error("args failed validation", err)
return nil, nil, nil, err
}
Expand All @@ -105,7 +105,8 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,

// This buffering is crucial otherwise we'd block immediately trying to
// deliver the current snapshot below if we already have one.
proxyStateChan := make(chan *pbmesh.ProxyState, 1)

proxyStateChan := make(chan proxycfg.ProxySnapshot, 1)
watchData := &proxyWatchData{
notifyCh: proxyStateChan,
state: nil,
Expand All @@ -128,9 +129,11 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
//Send an event to the controller
err = pt.notifyNewProxyChannel(proxyID)
if err != nil {
pt.config.Logger.Error("failed to notify controller of new proxy connection", err)
pt.cancelWatchLocked(proxyReferenceKey, watchData.notifyCh, session)
return nil, nil, nil, err
}
pt.config.Logger.Trace("controller notified of watch created", "proxyID", proxyID, "nodeName", nodeName)

return proxyStateChan, session.Terminated(), cancel, nil
}
Expand Down Expand Up @@ -163,34 +166,37 @@ func (pt *ProxyTracker) notifyNewProxyChannel(proxyID *pbresource.ID) error {
// - ends the session with xDS session limiter.
// - closes the proxy state channel assigned to the proxy.
// This function assumes the state lock is already held.
func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan *pbmesh.ProxyState, session limiter.Session) {
func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan proxycfg.ProxySnapshot, session limiter.Session) {
delete(pt.proxies, proxyReferenceKey)
session.End()
close(proxyStateChan)
pt.config.Logger.Trace("watch cancelled", "proxyReferenceKey", proxyReferenceKey)
}

func validateArgs(proxyID *pbresource.ID,
nodeName string, token string) error {
// validateWatchArgs checks the proxyIDand nodeName passed to Watch
// and returns an error if the args are not properly constructed.
func (pt *ProxyTracker) validateWatchArgs(proxyID *pbresource.ID,
nodeName string) error {
if proxyID == nil {
return errors.New("proxyID is required")
} else if proxyID.Type.Kind != mesh.ProxyStateTemplateConfigurationType.Kind {
} else if proxyID.GetType().GetKind() != mesh.ProxyStateTemplateConfigurationType.Kind {
return fmt.Errorf("proxyID must be a %s", mesh.ProxyStateTemplateConfigurationType.GetKind())
} else if nodeName == "" {
return errors.New("nodeName is required")
} else if token == "" {
return errors.New("token is required")
}

return nil
}

// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState) error {
func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *mesh.ProxyState) error {
pt.config.Logger.Trace("push change called for proxy", "proxyID", proxyID)
proxyReferenceKey := resource.NewReferenceKey(proxyID)
pt.mu.Lock()
defer pt.mu.Unlock()
if data, ok := pt.proxies[proxyReferenceKey]; ok {
data.state = proxyState

pt.deliverLatest(proxyID, proxyState, data.notifyCh)
} else {
return errors.New("proxyState change could not be sent because proxy is not connected")
Expand All @@ -199,7 +205,8 @@ func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *pbmesh.Pr
return nil
}

func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState, ch chan *pbmesh.ProxyState) {
func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *mesh.ProxyState, ch chan proxycfg.ProxySnapshot) {
pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID)
// Send if chan is empty
select {
case ch <- proxyState:
Expand Down Expand Up @@ -254,6 +261,7 @@ func (pt *ProxyTracker) ProxyConnectedToServer(proxyID *pbresource.ID) bool {

// Shutdown removes all state and close all channels.
func (pt *ProxyTracker) Shutdown() {
pt.config.Logger.Info("proxy tracker shutdown initiated")
pt.mu.Lock()
defer pt.mu.Unlock()

Expand All @@ -271,8 +279,3 @@ func (pt *ProxyTracker) Shutdown() {
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
}

//go:generate mockery --name Logger --inpackage
type Logger interface {
Error(args ...any)
}
Loading