Skip to content

Commit

Permalink
proxycfg: explicit upstreams in other dcs shouldn't be deleted
Browse files Browse the repository at this point in the history
when they're not in the seen services list of explicit and implicit
upstreams within a datacenter.

Co-authored-by: Freddy Vallenilla <[email protected]>
  • Loading branch information
ndhanushkodi and freddygv committed Jun 15, 2021
1 parent dbca996 commit 9a525eb
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .changelog/10391.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
proxycfg: Explicit upstreams in other datacenters shouldn't be deleted when transparent proxy is enabled, and when they're not in the seen services list of explicit and implicit upstreams within a datacenter.
```
33 changes: 28 additions & 5 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+u.Identifier(), s.ch)
if err != nil {
return err
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}

default:
Expand Down Expand Up @@ -815,6 +815,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
id: svc.String(),
name: svc.Name,
namespace: svc.NamespaceOrDefault(),
datacenter: s.source.Datacenter,
cfg: cfg,
meshGateway: meshGateway,
}
Expand All @@ -826,26 +827,46 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh

// Clean up data from services that were not in the update
for sn := range snap.ConnectProxy.WatchedUpstreams {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreams, sn)
}
}
for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGateways {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGateways, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
}
}
for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
upstream := snap.ConnectProxy.UpstreamConfig[sn]
if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
cancelFn()
delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
Expand Down Expand Up @@ -1721,9 +1742,10 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
u := makeUpstream(service)

watchOpts := discoveryChainWatchOpts{
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
id: u.Identifier(),
name: u.DestinationName,
namespace: u.DestinationNamespace,
datacenter: s.source.Datacenter,
}
err := s.watchDiscoveryChain(snap, watchOpts)
if err != nil {
Expand Down Expand Up @@ -1781,6 +1803,7 @@ type discoveryChainWatchOpts struct {
id string
name string
namespace string
datacenter string
cfg reducedUpstreamConfig
meshGateway structs.MeshGatewayConfig
}
Expand All @@ -1795,7 +1818,7 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWat
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: opts.name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInDatacenter: opts.datacenter,
EvaluateInNamespace: opts.namespace,
OverrideProtocol: opts.cfg.Protocol,
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
Expand Down
180 changes: 179 additions & 1 deletion agent/proxycfg/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package proxycfg
import (
"context"
"fmt"
"github.com/hashicorp/consul/agent/connect"
"sync"
"testing"
"time"

"github.com/hashicorp/consul/agent/connect"

"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/cache"
Expand Down Expand Up @@ -362,6 +363,10 @@ func ingressConfigWatchEvent(tlsEnabled bool) cache.UpdateEvent {
}
}

func upstreamIDForDC2(name string) string {
return fmt.Sprintf("%s?dc=dc2", name)
}

// This test is meant to exercise the various parts of the cache watching done by the state as
// well as its management of the ConfigSnapshot
//
Expand Down Expand Up @@ -1942,6 +1947,179 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
// Receiving an empty upstreams from Intentions list shouldn't delete explicit upstream watches
"transparent-proxy-handle-update-explicit-cross-dc": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Mode: structs.ProxyModeTransparent,
Upstreams: structs.Upstreams{
{
CentrallyConfigured: true,
DestinationName: structs.WildcardSpecifier,
DestinationNamespace: structs.WildcardSpecifier,
Config: map[string]interface{}{
"connect_timeout_ms": 6000,
},
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
},
{
DestinationName: db.Name,
DestinationNamespace: db.NamespaceOrDefault(),
Datacenter: "dc2",
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
},
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
// Empty on initialization
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())

// Centrally configured upstream defaults should be stored so that upstreams from intentions can inherit them
require.Len(t, snap.ConnectProxy.UpstreamConfig, 2)

wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(db.String()))
},
},
// Valid snapshot after roots, leaf, and intentions
{
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
TransparentProxy: structs.TransparentProxyMeshConfig{},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions)
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
require.True(t, snap.ConnectProxy.MeshConfigSet)
require.NotNil(t, snap.ConnectProxy.MeshConfig)
},
},
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc2", "trustdomain.consul", "dc1",
func(req *discoverychain.CompileRequest) {
req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal
}),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
},
},
// Empty list of upstreams should only clean up implicit upstreams. The explicit upstream db should not
// be deleted from the snapshot.
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
Datacenter: "dc1",
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")

// Explicit upstream discovery chain watches don't get stored in these maps because they don't
// get canceled unless the proxy registration is modified.
require.Empty(t, snap.ConnectProxy.WatchedDiscoveryChains)

// Explicit upstreams should not be deleted when the empty update event happens since that is
// for intention upstreams.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(db.String()))
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
},
},
},
},
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
}
Expand Down

0 comments on commit 9a525eb

Please sign in to comment.