Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update proxycfg state management and xDS generation for transparent proxy #9894

Merged
merged 14 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3713,6 +3713,8 @@ func (a *Agent) registerCache() {

a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a})

a.cache.RegisterType(cachetype.IntentionUpstreamsName, &cachetype.IntentionUpstreams{RPC: a})

a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a})

a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a})
Expand Down
52 changes: 52 additions & 0 deletions agent/cache-types/intention_upstreams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const IntentionUpstreamsName = "intention-upstreams"

// GatewayUpstreams supports fetching upstreams for a given gateway name.
type IntentionUpstreams struct {
RegisterOptionsBlockingRefresh
RPC RPC
}

func (i *IntentionUpstreams) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a ServiceSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}

// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup

// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout

// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true

// Fetch
var reply structs.IndexedServiceList
if err := i.RPC.RPC("Internal.IntentionUpstreams", reqReal, &reply); err != nil {
return result, err
}

result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
52 changes: 52 additions & 0 deletions agent/cache-types/intention_upstreams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestIntentionUpstreams(t *testing.T) {
rpc := TestRPC(t)
typ := &IntentionUpstreams{RPC: rpc}

// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceList
rpc.On("RPC", "Internal.IntentionUpstreams", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
require.Equal(t, "foo", req.ServiceName)

services := structs.ServiceList{
{Name: "foo"},
}
reply := args.Get(2).(*structs.IndexedServiceList)
reply.Services = services
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "foo",
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)

rpc.AssertExpectations(t)
}
3 changes: 3 additions & 0 deletions agent/consul/state/intention.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,9 @@ func (s *Store) IntentionTopology(ws memdb.WatchSet,
}
result := make(structs.ServiceList, 0, len(allServices))
for _, candidate := range allServices {
if candidate.Name == structs.ConsulServiceName {
continue
}
decision, err := s.IntentionDecision(candidate.Name, candidate.NamespaceOrDefault(), intentions, decisionMatchType, defaultDecision, true)
if err != nil {
src, dst := target, candidate
Expand Down
19 changes: 12 additions & 7 deletions agent/consul/state/intention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,11 @@ func TestStore_IntentionTopology(t *testing.T) {
Address: "127.0.0.1",
}
services := []structs.NodeService{
{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
ID: "api-1",
Service: "api",
Expand Down Expand Up @@ -1960,7 +1965,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "mysql",
Expand All @@ -1987,7 +1992,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
Expand All @@ -2014,7 +2019,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("api", nil),
downstreams: true,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "ingress-gateway",
Expand Down Expand Up @@ -2045,7 +2050,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("api", nil),
downstreams: true,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "web",
Expand All @@ -2072,7 +2077,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
Expand Down Expand Up @@ -2103,7 +2108,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{},
},
},
Expand All @@ -2125,7 +2130,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
Expand Down
22 changes: 19 additions & 3 deletions agent/proxycfg/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxycfg

import (
"context"
"path"
"testing"
"time"
Expand Down Expand Up @@ -105,6 +106,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
)
}

upstreams := structs.TestUpstreams(t)
for i := range upstreams {
upstreams[i].DestinationNamespace = structs.IntentionDefaultNamespace
}
webProxy := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Expand All @@ -119,7 +125,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: structs.TestUpstreams(t),
Upstreams: upstreams,
},
}

Expand Down Expand Up @@ -212,7 +218,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbDefaultChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
"db.default.dc1": TestUpstreamNodes(t),
Expand All @@ -222,6 +229,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
Expand Down Expand Up @@ -261,7 +272,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbSplitChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
"v1.db.default.dc1": TestUpstreamNodes(t),
Expand All @@ -272,6 +284,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
Expand Down
21 changes: 13 additions & 8 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type ConfigSnapshotUpstreams struct {
// targeted by this upstream. We then instantiate watches for those targets.
DiscoveryChain map[string]*structs.CompiledDiscoveryChain

// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the proxy's configuration is
// changed. Ingress gateways and transparent proxies need this because
// discovery chain watches are added and removed through the lifecycle
// of a single proxycfg state instance.
WatchedDiscoveryChains map[string]context.CancelFunc

// WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID ->
// CancelFunc's) in order to cancel any watches when the configuration is
// changed.
Expand All @@ -36,6 +43,9 @@ type ConfigSnapshotUpstreams struct {
// TargetID -> CheckServiceNodes) and is used to determine the backing
// endpoints of a mesh gateway.
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes

// UpstreamConfig is a map to an upstream's configuration.
UpstreamConfig map[string]*structs.Upstream
}

type configSnapshotConnectProxy struct {
Expand All @@ -58,12 +68,14 @@ func (c *configSnapshotConnectProxy) IsEmpty() bool {
return c.Leaf == nil &&
!c.IntentionsSet &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0 &&
len(c.WatchedGateways) == 0 &&
len(c.WatchedGatewayEndpoints) == 0 &&
len(c.WatchedServiceChecks) == 0 &&
len(c.PreparedQueryEndpoints) == 0
len(c.PreparedQueryEndpoints) == 0 &&
len(c.UpstreamConfig) == 0
}

type configSnapshotTerminatingGateway struct {
Expand Down Expand Up @@ -287,12 +299,6 @@ type configSnapshotIngressGateway struct {
// to. This is constructed from the ingress-gateway config entry, and uses
// the GatewayServices RPC to retrieve them.
Upstreams map[IngressListenerKey]structs.Upstreams

// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the ingress gateway configuration is
// changed. Ingress gateways need this because discovery chain watches are
// added and removed through the lifecycle of single proxycfg.state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
}

func (c *configSnapshotIngressGateway) IsEmpty() bool {
Expand All @@ -301,7 +307,6 @@ func (c *configSnapshotIngressGateway) IsEmpty() bool {
}
return len(c.Upstreams) == 0 &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0
}
Expand Down
Loading