Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxycfg: Ensure that endpoints for explicit upstreams in other datacenters are watched in transparent mode #10391

Merged
merged 2 commits into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/10391.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
proxycfg: Ensure that endpoints for explicit upstreams in other datacenters are watched in transparent mode.
```
33 changes: 28 additions & 5 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+u.Identifier(), s.ch)
if err != nil {
return err
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}

default:
Expand Down Expand Up @@ -815,6 +815,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
id: svc.String(),
name: svc.Name,
namespace: svc.NamespaceOrDefault(),
datacenter: s.source.Datacenter,
cfg: cfg,
meshGateway: meshGateway,
}
Expand All @@ -826,26 +827,46 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh

// Clean up data from services that were not in the update
for sn := range snap.ConnectProxy.WatchedUpstreams {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreams, sn)
}
}
for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGateways {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGateways, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
}
}
for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
cancelFn()
delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
Expand Down Expand Up @@ -1721,9 +1742,10 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
u := makeUpstream(service)

watchOpts := discoveryChainWatchOpts{
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
datacenter: s.source.Datacenter,
}
err := s.watchDiscoveryChain(snap, watchOpts)
if err != nil {
Expand Down Expand Up @@ -1781,6 +1803,7 @@ type discoveryChainWatchOpts struct {
id string
name string
namespace string
datacenter string
cfg reducedUpstreamConfig
meshGateway structs.MeshGatewayConfig
}
Expand All @@ -1795,7 +1818,7 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWat
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: opts.name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInDatacenter: opts.datacenter,
EvaluateInNamespace: opts.namespace,
OverrideProtocol: opts.cfg.Protocol,
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
Expand Down
180 changes: 179 additions & 1 deletion agent/proxycfg/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package proxycfg
import (
"context"
"fmt"
"github.com/hashicorp/consul/agent/connect"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/agent/connect"

"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
Expand Down Expand Up @@ -362,6 +363,10 @@ func ingressConfigWatchEvent(tlsEnabled bool) cache.UpdateEvent {
}
}

func upstreamIDForDC2(name string) string {
return fmt.Sprintf("%s?dc=dc2", name)
}

// This test is meant to exercise the various parts of the cache watching done by the state as
// well as its management of the ConfigSnapshot
//
Expand Down Expand Up @@ -1942,6 +1947,179 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
// Receiving an empty upstreams from Intentions list shouldn't delete explicit upstream watches
"transparent-proxy-handle-update-explicit-cross-dc": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Mode: structs.ProxyModeTransparent,
Upstreams: structs.Upstreams{
{
CentrallyConfigured: true,
DestinationName: structs.WildcardSpecifier,
DestinationNamespace: structs.WildcardSpecifier,
Config: map[string]interface{}{
"connect_timeout_ms": 6000,
},
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
{
DestinationName: db.Name,
DestinationNamespace: db.NamespaceOrDefault(),
Datacenter: "dc2",
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
},
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
// Empty on initialization
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())

// Centrally configured upstream defaults should be stored so that upstreams from intentions can inherit them
require.Len(t, snap.ConnectProxy.UpstreamConfig, 2)

wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(db.String()))
},
},
// Valid snapshot after roots, leaf, and intentions
{
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
TransparentProxy: structs.TransparentProxyMeshConfig{},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions)
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
require.True(t, snap.ConnectProxy.MeshConfigSet)
require.NotNil(t, snap.ConnectProxy.MeshConfig)
},
},
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc2", "trustdomain.consul", "dc1",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
},
},
// Empty list of upstreams should only clean up implicit upstreams. The explicit upstream db should not
// be deleted from the snapshot.
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")

// Explicit upstream discovery chain watches don't get stored in these maps because they don't
// get canceled unless the proxy registration is modified.
require.Empty(t, snap.ConnectProxy.WatchedDiscoveryChains)

// Explicit upstreams should not be deleted when the empty update event happens since that is
// for intention upstreams.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(db.String()))
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
},
},
},
},
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
}
Expand Down