Skip to content

Commit

Permalink
Add support for transparent proxy in xDS generation
Browse files Browse the repository at this point in the history
  • Loading branch information
freddygv authored Mar 18, 2021
2 parents eca45f1 + 1a8586a commit fc02bb7
Show file tree
Hide file tree
Showing 27 changed files with 1,774 additions and 247 deletions.
3 changes: 3 additions & 0 deletions .changelog/9894.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
connect: Add support for transparently proxying traffic through Envoy. [experimental]
```
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3708,6 +3708,8 @@ func (a *Agent) registerCache() {

a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a})

a.cache.RegisterType(cachetype.IntentionUpstreamsName, &cachetype.IntentionUpstreams{RPC: a})

a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a})

a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a})
Expand Down
52 changes: 52 additions & 0 deletions agent/cache-types/intention_upstreams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const IntentionUpstreamsName = "intention-upstreams"

// GatewayUpstreams supports fetching upstreams for a given gateway name.
type IntentionUpstreams struct {
RegisterOptionsBlockingRefresh
RPC RPC
}

func (i *IntentionUpstreams) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult

// The request should be a ServiceSpecificRequest.
reqReal, ok := req.(*structs.ServiceSpecificRequest)
if !ok {
return result, fmt.Errorf(
"Internal cache failure: request wrong type: %T", req)
}

// Lightweight copy this object so that manipulating QueryOptions doesn't race.
dup := *reqReal
reqReal = &dup

// Set the minimum query index to our current index so we block
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
reqReal.QueryOptions.MaxQueryTime = opts.Timeout

// Always allow stale - there's no point in hitting leader if the request is
// going to be served from cache and end up arbitrarily stale anyway. This
// allows cached service-discover to automatically read scale across all
// servers too.
reqReal.AllowStale = true

// Fetch
var reply structs.IndexedServiceList
if err := i.RPC.RPC("Internal.IntentionUpstreams", reqReal, &reply); err != nil {
return result, err
}

result.Value = &reply
result.Index = reply.QueryMeta.Index
return result, nil
}
52 changes: 52 additions & 0 deletions agent/cache-types/intention_upstreams_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestIntentionUpstreams(t *testing.T) {
rpc := TestRPC(t)
typ := &IntentionUpstreams{RPC: rpc}

// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceList
rpc.On("RPC", "Internal.IntentionUpstreams", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
require.Equal(t, "foo", req.ServiceName)

services := structs.ServiceList{
{Name: "foo"},
}
reply := args.Get(2).(*structs.IndexedServiceList)
reply.Services = services
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
resultA, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "foo",
})
require.NoError(t, err)
require.Equal(t, cache.FetchResult{
Value: resp,
Index: 48,
}, resultA)

rpc.AssertExpectations(t)
}
3 changes: 3 additions & 0 deletions agent/consul/state/intention.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,9 @@ func (s *Store) IntentionTopology(ws memdb.WatchSet,
}
result := make(structs.ServiceList, 0, len(allServices))
for _, candidate := range allServices {
if candidate.Name == structs.ConsulServiceName {
continue
}
decision, err := s.IntentionDecision(candidate.Name, candidate.NamespaceOrDefault(), intentions, decisionMatchType, defaultDecision, true)
if err != nil {
src, dst := target, candidate
Expand Down
19 changes: 12 additions & 7 deletions agent/consul/state/intention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,11 @@ func TestStore_IntentionTopology(t *testing.T) {
Address: "127.0.0.1",
}
services := []structs.NodeService{
{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
ID: "api-1",
Service: "api",
Expand Down Expand Up @@ -1960,7 +1965,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "mysql",
Expand All @@ -1987,7 +1992,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
Expand All @@ -2014,7 +2019,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("api", nil),
downstreams: true,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "ingress-gateway",
Expand Down Expand Up @@ -2045,7 +2050,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("api", nil),
downstreams: true,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "web",
Expand All @@ -2072,7 +2077,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
Expand Down Expand Up @@ -2103,7 +2108,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{},
},
},
Expand All @@ -2125,7 +2130,7 @@ func TestStore_IntentionTopology(t *testing.T) {
target: structs.NewServiceName("web", nil),
downstreams: false,
expect: expect{
idx: 9,
idx: 10,
services: structs.ServiceList{
{
Name: "api",
Expand Down
22 changes: 19 additions & 3 deletions agent/proxycfg/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxycfg

import (
"context"
"path"
"testing"
"time"
Expand Down Expand Up @@ -106,6 +107,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
)
}

upstreams := structs.TestUpstreams(t)
for i := range upstreams {
upstreams[i].DestinationNamespace = structs.IntentionDefaultNamespace
}
webProxy := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
Expand All @@ -120,7 +126,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: structs.TestUpstreams(t),
Upstreams: upstreams,
},
}

Expand Down Expand Up @@ -213,7 +219,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbDefaultChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
"db.default.dc1": TestUpstreamNodes(t),
Expand All @@ -223,6 +230,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
Expand Down Expand Up @@ -262,7 +273,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbSplitChain(),
},
WatchedUpstreams: nil, // Clone() clears this out
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
"v1.db.default.dc1": TestUpstreamNodes(t),
Expand All @@ -273,6 +285,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
Expand Down
21 changes: 13 additions & 8 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ type ConfigSnapshotUpstreams struct {
// targeted by this upstream. We then instantiate watches for those targets.
DiscoveryChain map[string]*structs.CompiledDiscoveryChain

// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the proxy's configuration is
// changed. Ingress gateways and transparent proxies need this because
// discovery chain watches are added and removed through the lifecycle
// of a single proxycfg state instance.
WatchedDiscoveryChains map[string]context.CancelFunc

// WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID ->
// CancelFunc's) in order to cancel any watches when the configuration is
// changed.
Expand All @@ -36,6 +43,9 @@ type ConfigSnapshotUpstreams struct {
// TargetID -> CheckServiceNodes) and is used to determine the backing
// endpoints of a mesh gateway.
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes

// UpstreamConfig is a map to an upstream's configuration.
UpstreamConfig map[string]*structs.Upstream
}

type configSnapshotConnectProxy struct {
Expand All @@ -58,12 +68,14 @@ func (c *configSnapshotConnectProxy) IsEmpty() bool {
return c.Leaf == nil &&
!c.IntentionsSet &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0 &&
len(c.WatchedGateways) == 0 &&
len(c.WatchedGatewayEndpoints) == 0 &&
len(c.WatchedServiceChecks) == 0 &&
len(c.PreparedQueryEndpoints) == 0
len(c.PreparedQueryEndpoints) == 0 &&
len(c.UpstreamConfig) == 0
}

type configSnapshotTerminatingGateway struct {
Expand Down Expand Up @@ -287,12 +299,6 @@ type configSnapshotIngressGateway struct {
// to. This is constructed from the ingress-gateway config entry, and uses
// the GatewayServices RPC to retrieve them.
Upstreams map[IngressListenerKey]structs.Upstreams

// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the ingress gateway configuration is
// changed. Ingress gateways need this because discovery chain watches are
// added and removed through the lifecycle of single proxycfg.state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
}

func (c *configSnapshotIngressGateway) IsEmpty() bool {
Expand All @@ -301,7 +307,6 @@ func (c *configSnapshotIngressGateway) IsEmpty() bool {
}
return len(c.Upstreams) == 0 &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0
}
Expand Down
Loading

0 comments on commit fc02bb7

Please sign in to comment.