From a28876fb6e5b39363fe1afa98324217d88e8f2e1 Mon Sep 17 00:00:00 2001 From: Fu-Sheng Date: Mon, 28 Feb 2022 12:09:10 +0800 Subject: [PATCH] linear: keep streamState to respond resources Signed-off-by: Fu-Sheng --- pkg/cache/v3/linear.go | 46 +++++++++++++++++++++++++++++-------- pkg/cache/v3/linear_test.go | 7 +++++- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 7d711fa7a9..88534d3a01 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -27,7 +27,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) -type watches = map[chan Response]struct{} +type watches = map[chan Response]stream.StreamState // LinearCache supports collections of opaque resources. This cache has a // single collection indexed by resource names and manages resource versions @@ -114,12 +114,17 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { } func (cache *LinearCache) respond(value chan Response, staleResources []string) { - var resources []types.ResourceWithTTL + var ( + resources []types.ResourceWithTTL + respondResourceNames []string + ) + // TODO: optimize the resources slice creations across different clients if len(staleResources) == 0 { resources = make([]types.ResourceWithTTL, 0, len(cache.resources)) - for _, resource := range cache.resources { + for name, resource := range cache.resources { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + respondResourceNames = append(respondResourceNames, name) } } else { resources = make([]types.ResourceWithTTL, 0, len(staleResources)) @@ -127,11 +132,12 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string) resource := cache.resources[name] if resource != nil { resources = append(resources, types.ResourceWithTTL{Resource: resource}) + respondResourceNames = append(respondResourceNames, name) } } } value <- &RawResponse{ - Request: &Request{TypeUrl: cache.typeURL}, + Request: &Request{TypeUrl: cache.typeURL, ResourceNames: respondResourceNames}, Resources: resources, Version: cache.getVersion(), } @@ -141,11 +147,25 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { // de-duplicate watches that need to be responded notifyList := make(map[chan Response][]string) for name := range modified { - for watch := range cache.watches[name] { - notifyList[watch] = append(notifyList[watch], name) + for watch, streamState := range cache.watches[name] { + resourceNames := streamState.GetKnownResourceNames(cache.typeURL) + modifiedNameInResourceName := false + for resourceName := range resourceNames { + if !modifiedNameInResourceName && resourceName == name { + modifiedNameInResourceName = true + } + // To avoid the stale in notifyList becomes empty slice. + // Don't skip resource name that has been deleted here. + // It would be filtered out in respond because the corresponding resource has been deleted. + notifyList[watch] = append(notifyList[watch], resourceName) + } + if !modifiedNameInResourceName { + notifyList[watch] = append(notifyList[watch], name) + } } delete(cache.watches, name) } + for value, stale := range notifyList { cache.respond(value, stale) } @@ -293,10 +313,16 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea stale = lastVersion != cache.version } else { for _, name := range request.ResourceNames { + _, has := streamState.GetKnownResourceNames(request.TypeUrl)[name] + version, exists := cache.versionVector[name] + // When a resource is removed, its version defaults 0 and it is not considered stale. - if lastVersion < cache.versionVector[name] { + if lastVersion < version || (!has && exists) { stale = true - staleResources = append(staleResources, name) + + // Here we collect all requested names. + // It would be filtered out in respond if the resource name doesn't appear in cache. + staleResources = request.ResourceNames } } } @@ -306,7 +332,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea } // Create open watches since versions are up to date. if len(request.ResourceNames) == 0 { - cache.watchAll[value] = struct{}{} + cache.watchAll[value] = streamState return func() { cache.mu.Lock() defer cache.mu.Unlock() @@ -319,7 +345,7 @@ func (cache *LinearCache) CreateWatch(request *Request, streamState stream.Strea set = make(watches) cache.watches[name] = set } - set[value] = struct{}{} + set[value] = streamState } return func() { cache.mu.Lock() diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 5cd86ef44e..072d38fad8 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -230,6 +230,7 @@ func TestLinearSetResources(t *testing.T) { // Create new resources w1 := make(chan Response, 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) mustBlock(t, w1) w2 := make(chan Response, 1) @@ -297,6 +298,7 @@ func TestLinearVersionPrefix(t *testing.T) { c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "instance1-1", 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -306,6 +308,7 @@ func TestLinearDeletion(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) @@ -325,6 +328,7 @@ func TestLinearWatchTwo(t *testing.T) { streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) + streamState.SetKnownResourceNamesAsList(testType, []string{"a", "b"}) c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) mustBlock(t, w) w1 := make(chan Response, 1) @@ -332,7 +336,7 @@ func TestLinearWatchTwo(t *testing.T) { mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource - verifyResponse(t, w, "1", 1) + verifyResponse(t, w, "1", 2) verifyResponse(t, w1, "1", 2) } @@ -350,6 +354,7 @@ func TestLinearCancel(t *testing.T) { checkWatchCount(t, c, "a", 0) // cancel watch for "a" + streamState.SetKnownResourceNamesAsList(testType, []string{"a"}) cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1)