Skip to content

Commit

Permalink
xds: ensure that dependent xDS resources are reconfigured during prim…
Browse files Browse the repository at this point in the history
…ary type warming (#10381)

Updates to a cluster will clear the associated endpoints, and updates to
a listener will clear the associated routes. Update the incremental xDS
logic to account for this implicit cleanup so that we can finish warming
the clusters and listeners.

Fixes #10379
  • Loading branch information
rboyer authored Jun 14, 2021
1 parent ffb13f3 commit 848ad85
Show file tree
Hide file tree
Showing 5 changed files with 560 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .changelog/10381.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
xds: (beta-only) ensure that dependent xDS resources are reconfigured during primary type warming
```
149 changes: 118 additions & 31 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
EndpointType: newDeltaType(generator, stream, EndpointType, nil),
}

// Endpoints are stored within a Cluster (and Routes
// are stored within a Listener) so whenever the
// enclosing resource is updated the inner resource
// list is cleared implicitly.
//
// When this happens we should update our local
// representation of envoy state to force an update.
//
// see: https://github.com/envoyproxy/envoy/issues/13009
handlers[ListenerType].childType = handlers[RouteType]
handlers[ClusterType].childType = handlers[EndpointType]

var authTimer <-chan time.Time
extendAuthTimer := func() {
authTimer = time.After(s.AuthCheckFrequency)
Expand Down Expand Up @@ -177,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// index and hash the xDS structures
newResourceMap := indexResources(generator.Logger, newRes)

if err := populateChildIndexMap(newResourceMap); err != nil {
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
}

newVersions, err := computeResourceVersions(newResourceMap)
if err != nil {
return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
Expand Down Expand Up @@ -352,6 +368,11 @@ type xDSDeltaType struct {
typeURL string
allowEmptyFn func(kind structs.ServiceKind) bool

// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType

// registered indicates if this type has been requested at least once by
// the proxy
registered bool
Expand All @@ -373,8 +394,13 @@ type xDSDeltaType struct {
// map. Once we get an ACK from envoy we'll update the resourceVersions map
// and strike the entry from this map.
//
// nonce -> name -> version
pendingUpdates map[string]map[string]string
// nonce -> name -> {version}
pendingUpdates map[string]map[string]PendingUpdate
}

type PendingUpdate struct {
Version string
ChildResources []string // optional
}

func newDeltaType(
Expand All @@ -389,7 +415,7 @@ func newDeltaType(
typeURL: typeUrl,
allowEmptyFn: allowEmptyFn,
resourceVersions: make(map[string]string),
pendingUpdates: make(map[string]map[string]string),
pendingUpdates: make(map[string]map[string]PendingUpdate),
}
}

Expand Down Expand Up @@ -511,11 +537,28 @@ func (t *xDSDeltaType) ack(nonce string) {
return
}

for name, version := range pending {
if version == "" {
for name, obj := range pending {
if obj.Version == "" {
delete(t.resourceVersions, name)
} else {
t.resourceVersions[name] = version
t.resourceVersions[name] = obj.Version
}
if t.childType != nil && obj.Version != "" {
// This branch only matters on UPDATE, since we already have
// mechanisms to clean up orphaned resources.
for _, childName := range obj.ChildResources {
if _, exist := t.childType.resourceVersions[childName]; exist {
t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", name,
"childTypeUrl", t.childType.typeURL,
"childResource", childName,
)
// Basically manifest this as a re-subscribe
t.childType.resourceVersions[childName] = ""
}
}
}
}
t.sentToEnvoyOnce = true
Expand All @@ -529,7 +572,7 @@ func (t *xDSDeltaType) nack(nonce string) {
func (t *xDSDeltaType) SendIfNew(
kind structs.ServiceKind,
currentVersions map[string]string, // type => name => version (as consul knows right now)
resourceMap IndexedResources,
resourceMap *IndexedResources,
nonce *uint64,
upsert, remove bool,
) (error, bool) {
Expand Down Expand Up @@ -571,20 +614,31 @@ func (t *xDSDeltaType) SendIfNew(
}
logger.Trace("sent response", "nonce", resp.Nonce)

if t.childType != nil {
// Capture the relevant child resource names on this pending update so
// we can properly clean up the linked children when this change is
// ACKed.
for name, obj := range updates {
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
obj.ChildResources = children
updates[name] = obj
}
}
}
t.pendingUpdates[resp.Nonce] = updates

return nil, true
}

func (t *xDSDeltaType) createDeltaResponse(
currentVersions map[string]string, // name => version (as consul knows right now)
resourceMap IndexedResources,
resourceMap *IndexedResources,
upsert, remove bool,
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]string, error) {
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
// compute difference
var (
hasRelevantUpdates = false
updates = make(map[string]string)
updates = make(map[string]PendingUpdate)
)
// First find things that need updating or deleting
for name, envoyVers := range t.resourceVersions {
Expand All @@ -593,20 +647,20 @@ func (t *xDSDeltaType) createDeltaResponse(
if remove {
hasRelevantUpdates = true
}
updates[name] = ""
updates[name] = PendingUpdate{Version: ""}
} else if currVers != envoyVers {
if upsert {
hasRelevantUpdates = true
}
updates[name] = currVers
updates[name] = PendingUpdate{Version: currVers}
}
}

// Now find new things
if t.wildcard {
for name, currVers := range currentVersions {
if _, ok := t.resourceVersions[name]; !ok {
updates[name] = currVers
updates[name] = PendingUpdate{Version: currVers}
if upsert {
hasRelevantUpdates = true
}
Expand All @@ -623,15 +677,15 @@ func (t *xDSDeltaType) createDeltaResponse(
// TODO(rb): consider putting something in SystemVersionInfo?
TypeUrl: t.typeURL,
}
realUpdates := make(map[string]string)
for name, vers := range updates {
if vers == "" {
realUpdates := make(map[string]PendingUpdate)
for name, obj := range updates {
if obj.Version == "" {
if remove {
resp.RemovedResources = append(resp.RemovedResources, name)
realUpdates[name] = ""
realUpdates[name] = PendingUpdate{Version: ""}
}
} else if upsert {
resources, ok := resourceMap[t.typeURL]
resources, ok := resourceMap.Index[t.typeURL]
if !ok {
return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
}
Expand All @@ -647,18 +701,18 @@ func (t *xDSDeltaType) createDeltaResponse(
resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
Name: name,
Resource: any,
Version: vers,
Version: obj.Version,
})
realUpdates[name] = vers
realUpdates[name] = obj
}
}

return resp, realUpdates, nil
}

func computeResourceVersions(resourceMap IndexedResources) (map[string]map[string]string, error) {
func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[string]string, error) {
out := make(map[string]map[string]string)
for typeUrl, resources := range resourceMap {
for typeUrl, resources := range resourceMap.Index {
m, err := hashResourceMap(resources)
if err != nil {
return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err)
Expand All @@ -668,18 +722,51 @@ func computeResourceVersions(resourceMap IndexedResources) (map[string]map[strin
return out, nil
}

type IndexedResources map[string]map[string]proto.Message
type IndexedResources struct {
// Index is a map of typeURL => resourceName => resource
Index map[string]map[string]proto.Message

// ChildIndex is a map of typeURL => parentResourceName => list of
// childResourceNames. This only applies if the child and parent do not
// share a name.
ChildIndex map[string]map[string][]string
}

func emptyIndexedResources() *IndexedResources {
return &IndexedResources{
Index: map[string]map[string]proto.Message{
ListenerType: make(map[string]proto.Message),
RouteType: make(map[string]proto.Message),
ClusterType: make(map[string]proto.Message),
EndpointType: make(map[string]proto.Message),
},
ChildIndex: map[string]map[string][]string{
ListenerType: make(map[string][]string),
ClusterType: make(map[string][]string),
},
}
}

func populateChildIndexMap(resourceMap *IndexedResources) error {
// LDS and RDS have a more complicated relationship.
for name, res := range resourceMap.Index[ListenerType] {
listener := res.(*envoy_listener_v3.Listener)
rdsRouteNames, err := extractRdsResourceNames(listener)
if err != nil {
return err
}
resourceMap.ChildIndex[ListenerType][name] = rdsRouteNames
}

func emptyIndexedResources() IndexedResources {
return map[string]map[string]proto.Message{
ListenerType: make(map[string]proto.Message),
RouteType: make(map[string]proto.Message),
ClusterType: make(map[string]proto.Message),
EndpointType: make(map[string]proto.Message),
// CDS and EDS share exact names.
for name := range resourceMap.Index[ClusterType] {
resourceMap.ChildIndex[ClusterType][name] = []string{name}
}

return nil
}

func indexResources(logger hclog.Logger, resources map[string][]proto.Message) IndexedResources {
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources {
data := emptyIndexedResources()

for typeURL, typeRes := range resources {
Expand All @@ -688,7 +775,7 @@ func indexResources(logger hclog.Logger, resources map[string][]proto.Message) I
if name == "" {
logger.Warn("skipping unexpected xDS type found in delta snapshot", "typeURL", typeURL)
} else {
data[typeURL][name] = res
data.Index[typeURL][name] = res
}
}
}
Expand Down
Loading

0 comments on commit 848ad85

Please sign in to comment.