diff --git a/xds/internal/client/client_callback.go b/xds/internal/client/client_callback.go
index 11dd6ccabc11..4bbdaabd2f7f 100644
--- a/xds/internal/client/client_callback.go
+++ b/xds/internal/client/client_callback.go
@@ -74,11 +74,11 @@ func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
 //
 // A response can contain multiple resources. They will be parsed and put in a
 // map from resource name to the resource content.
-func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
+func (c *Client) newLDSUpdate(updates map[string]ldsUpdate) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	for name, update := range d {
+	for name, update := range updates {
 		if s, ok := c.ldsWatchers[name]; ok {
 			for wi := range s {
 				wi.newUpdate(update)
@@ -88,11 +88,20 @@ func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
 			c.ldsCache[name] = update
 		}
 	}
-	// TODO: handle removing resources, which means if a resource exists in the
-	// previous update, but not in the new update. This needs the balancers and
-	// resolvers to handle errors correctly.
-
-	// TODO: remove item from cache and remove corresponding RDS cached data.
+	for name := range c.ldsCache {
+		if _, ok := updates[name]; !ok {
+			// If a resource exists in the cache but not in the new update,
+			// delete it from the cache, and also send a resource-not-found
+			// error to indicate that the resource was removed.
+			delete(c.ldsCache, name)
+			for wi := range c.ldsWatchers[name] {
+				wi.resourceNotFound()
+			}
+		}
+	}
+	// When an LDS resource is removed, we don't delete the corresponding RDS
+	// cached data. The RDS watch will be canceled, and the cache entry is
+	// removed when the last watch is canceled.
 }
 
 // newRDSUpdate is called by the underlying xdsv2Client when it receives an xDS
@@ -100,11 +109,11 @@ func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
 //
 // A response can contain multiple resources. They will be parsed and put in a
 // map from resource name to the resource content.
-func (c *Client) newRDSUpdate(d map[string]rdsUpdate) {
+func (c *Client) newRDSUpdate(updates map[string]rdsUpdate) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	for name, update := range d {
+	for name, update := range updates {
 		if s, ok := c.rdsWatchers[name]; ok {
 			for wi := range s {
 				wi.newUpdate(update)
@@ -121,11 +130,11 @@ func (c *Client) newRDSUpdate(d map[string]rdsUpdate) {
 //
 // A response can contain multiple resources. They will be parsed and put in a
 // map from resource name to the resource content.
-func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
+func (c *Client) newCDSUpdate(updates map[string]ClusterUpdate) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	for name, update := range d {
+	for name, update := range updates {
 		if s, ok := c.cdsWatchers[name]; ok {
 			for wi := range s {
 				wi.newUpdate(update)
@@ -135,11 +144,20 @@ func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
 			c.cdsCache[name] = update
 		}
 	}
-	// TODO: handle removing resources, which means if a resource exists in the
-	// previous update, but not in the new update. This needs the balancers and
-	// resolvers to handle errors correctly.
-
-	// TODO: remove item from cache and remove corresponding EDS cached data.
+	for name := range c.cdsCache {
+		if _, ok := updates[name]; !ok {
+			// If a resource exists in the cache but not in the new update,
+			// delete it from the cache, and also send a resource-not-found
+			// error to indicate that the resource was removed.
+			delete(c.cdsCache, name)
+			for wi := range c.cdsWatchers[name] {
+				wi.resourceNotFound()
+			}
+		}
+	}
+	// When a CDS resource is removed, we don't delete the corresponding EDS
+	// cached data. The EDS watch will be canceled, and the cache entry is
+	// removed when the last watch is canceled.
 }
 
 // newEDSUpdate is called by the underlying xdsv2Client when it receives an xDS
@@ -147,11 +165,11 @@ func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
 //
 // A response can contain multiple resources. They will be parsed and put in a
 // map from resource name to the resource content.
-func (c *Client) newEDSUpdate(d map[string]EndpointsUpdate) {
+func (c *Client) newEDSUpdate(updates map[string]EndpointsUpdate) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	for name, update := range d {
+	for name, update := range updates {
 		if s, ok := c.edsWatchers[name]; ok {
 			for wi := range s {
 				wi.newUpdate(update)
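Aside on the pattern: the removal handling added to client_callback.go is state-of-the-world reconciliation — every xDS response is treated as the complete resource set, so a name that is cached but absent from a response was deleted on the management server. A self-contained Go sketch of the idea, with hypothetical types rather than the client's real ones:

package main

import "fmt"

type watcher func(update string, removed bool)

// applySotW reconciles a cache against a full-state update: resources in the
// update are stored and delivered; cached resources missing from the update
// are deleted and their watchers are told the resource is gone.
func applySotW(cache, update map[string]string, watchers map[string][]watcher) {
	for name, u := range update {
		cache[name] = u
		for _, w := range watchers[name] {
			w(u, false)
		}
	}
	for name := range cache {
		if _, ok := update[name]; !ok {
			delete(cache, name) // deleting during range is safe in Go
			for _, w := range watchers[name] {
				w("", true) // mirrors wi.resourceNotFound() above
			}
		}
	}
}

func main() {
	cache := map[string]string{"a": "va", "b": "vb"}
	watchers := map[string][]watcher{
		"b": {func(u string, removed bool) { fmt.Println("b removed:", removed) }},
	}
	applySotW(cache, map[string]string{"a": "va"}, watchers) // prints: b removed: true
}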
diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go
index 0775032711c6..367e9e02642a 100644
--- a/xds/internal/client/client_test.go
+++ b/xds/internal/client/client_test.go
@@ -126,8 +126,8 @@ func (s) TestNew(t *testing.T) {
 
 type testXDSV2Client struct {
 	r updateHandler
 
-	addWatches    map[string]chan string
-	removeWatches map[string]chan string
+	addWatches    map[string]*testutils.Channel
+	removeWatches map[string]*testutils.Channel
 }
 
 func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
@@ -142,16 +142,16 @@ func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
 }
 
 func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
-	addWatches := make(map[string]chan string)
-	addWatches[ldsURL] = make(chan string, 10)
-	addWatches[rdsURL] = make(chan string, 10)
-	addWatches[cdsURL] = make(chan string, 10)
-	addWatches[edsURL] = make(chan string, 10)
-	removeWatches := make(map[string]chan string)
-	removeWatches[ldsURL] = make(chan string, 10)
-	removeWatches[rdsURL] = make(chan string, 10)
-	removeWatches[cdsURL] = make(chan string, 10)
-	removeWatches[edsURL] = make(chan string, 10)
+	addWatches := make(map[string]*testutils.Channel)
+	addWatches[ldsURL] = testutils.NewChannel()
+	addWatches[rdsURL] = testutils.NewChannel()
+	addWatches[cdsURL] = testutils.NewChannel()
+	addWatches[edsURL] = testutils.NewChannel()
+	removeWatches := make(map[string]*testutils.Channel)
+	removeWatches[ldsURL] = testutils.NewChannel()
+	removeWatches[rdsURL] = testutils.NewChannel()
+	removeWatches[cdsURL] = testutils.NewChannel()
+	removeWatches[edsURL] = testutils.NewChannel()
 	return &testXDSV2Client{
 		r:             r,
 		addWatches:    addWatches,
@@ -160,11 +160,11 @@ func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
 }
 
 func (c *testXDSV2Client) addWatch(resourceType, resourceName string) {
-	c.addWatches[resourceType] <- resourceName
+	c.addWatches[resourceType].Send(resourceName)
 }
 
 func (c *testXDSV2Client) removeWatch(resourceType, resourceName string) {
-	c.removeWatches[resourceType] <- resourceName
+	c.removeWatches[resourceType].Send(resourceName)
 }
 
 func (c *testXDSV2Client) close() {}
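The switch from raw buffered channels to testutils.Channel matters because Receive can fail with a timeout, which lets the tests below assert both "a watch was registered" and "no watch was registered". A minimal sketch of such a wrapper, assuming a one-second default timeout — illustrative only, not grpc-go's actual testutils implementation:

package testutils

import (
	"errors"
	"time"
)

// ErrRecvTimeout is returned when no value arrives in time.
var ErrRecvTimeout = errors.New("timed out waiting to receive")

// Channel wraps a buffered channel with timeout-aware receives.
type Channel struct{ ch chan interface{} }

func NewChannel() *Channel { return &Channel{ch: make(chan interface{}, 10)} }

func (c *Channel) Send(v interface{}) { c.ch <- v }

// TimedReceive waits up to d for a value, failing instead of blocking forever.
func (c *Channel) TimedReceive(d time.Duration) (interface{}, error) {
	select {
	case v := <-c.ch:
		return v, nil
	case <-time.After(d):
		return nil, ErrRecvTimeout
	}
}

// Receive uses a default timeout, matching how the tests below call it.
func (c *Channel) Receive() (interface{}, error) { return c.TimedReceive(time.Second) }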
c.WatchCluster("another-random-name", func(ClusterUpdate, error) {}) + if _, err := v2Client.addWatches[cdsURL].Receive(); firstTime && err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } + firstTime = false }) + if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil { + t.Fatalf("want new watch to start, got error %v", err) + } wantUpdate := ClusterUpdate{ServiceName: testEDSName} v2Client.r.newCDSUpdate(map[string]ClusterUpdate{ diff --git a/xds/internal/client/client_watchers.go b/xds/internal/client/client_watchers.go index 7a5bd2614a03..cfe31e861380 100644 --- a/xds/internal/client/client_watchers.go +++ b/xds/internal/client/client_watchers.go @@ -75,6 +75,17 @@ func (wi *watchInfo) newUpdate(update interface{}) { wi.c.scheduleCallback(wi, update, nil) } +func (wi *watchInfo) resourceNotFound() { + wi.mu.Lock() + defer wi.mu.Unlock() + if wi.state == watchInfoStateCanceled { + return + } + wi.state = watchInfoStateRespReceived + wi.expiryTimer.Stop() + wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %s target %s not found in received response", wi.typeURL, wi.target)) +} + func (wi *watchInfo) timeout() { wi.mu.Lock() defer wi.mu.Unlock() @@ -82,25 +93,25 @@ func (wi *watchInfo) timeout() { return } wi.state = watchInfoStateTimeout + wi.sendErrorLocked(fmt.Errorf("xds: %s target %s not found, watcher timeout", wi.typeURL, wi.target)) +} + +// Caller must hold wi.mu. +func (wi *watchInfo) sendErrorLocked(err error) { var ( u interface{} - t string ) switch wi.typeURL { case ldsURL: u = ldsUpdate{} - t = "LDS" case rdsURL: u = rdsUpdate{} - t = "RDS" case cdsURL: u = ClusterUpdate{} - t = "CDS" case edsURL: u = EndpointsUpdate{} - t = "EDS" } - wi.c.scheduleCallback(wi, u, fmt.Errorf("xds: %s target %s not found, watcher timeout", t, wi.target)) + wi.c.scheduleCallback(wi, u, err) } func (wi *watchInfo) cancel() { @@ -185,7 +196,19 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) { // watching this resource. delete(watchers, resourceName) c.v2c.removeWatch(wi.typeURL, resourceName) - // TODO: remove item from cache. + // Remove the resource from cache. When a watch for this + // resource is added later, it will trigger a xDS request with + // resource names, and client will receive new xDS responses. 
diff --git a/xds/internal/client/client_watchers_cluster_test.go b/xds/internal/client/client_watchers_cluster_test.go
index ab7a62091b9f..c14ea18e3c0a 100644
--- a/xds/internal/client/client_watchers_cluster_test.go
+++ b/xds/internal/client/client_watchers_cluster_test.go
@@ -32,7 +32,7 @@ type clusterUpdateErr struct {
 // TestClusterWatch covers the cases:
 // - an update is received after a watch()
-// - an update for another resource name (which doesn't trigger callback)
+// - an update for another resource name
 // - an update is received after cancel()
 func (s) TestClusterWatch(t *testing.T) {
 	v2ClientCh, cleanup := overrideNewXDSV2Client()
 	defer cleanup()
@@ -53,6 +53,9 @@ func (s) TestClusterWatch(t *testing.T) {
 	cancelWatch := c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
 		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
 	// This is calling v2Client.r to send the update, but r is set to Client, so
@@ -69,13 +72,14 @@ func (s) TestClusterWatch(t *testing.T) {
 		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
 	}
 
-	// Another update for a different resource name.
+	// Another update, with an extra resource for a different resource name.
 	v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
+		testCDSName:  wantUpdate,
 		"randomName": {},
 	})
 
-	if u, err := clusterUpdateCh.TimedReceive(chanRecvTimeout); err != testutils.ErrRecvTimeout {
-		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
+	if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
+		t.Errorf("unexpected clusterUpdate: %+v, error receiving from channel: %v", u, err)
 	}
 
 	// Cancel watch, and send update again.
@@ -114,6 +118,9 @@ func (s) TestClusterTwoWatchSameResourceName(t *testing.T) {
 		cancelLastWatch = c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
 			clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[cdsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
@@ -168,6 +175,9 @@ func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) {
 		c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
 			clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[cdsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
 
 	// Third watch for a different name.
@@ -175,6 +185,9 @@ func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) {
 	c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
 		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
 	wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
@@ -212,6 +225,9 @@ func (s) TestClusterWatchAfterCache(t *testing.T) {
 	c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
 		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
 	v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
@@ -227,6 +243,9 @@ func (s) TestClusterWatchAfterCache(t *testing.T) {
 	c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
 		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
 	})
+	if n, err := v2Client.addWatches[cdsURL].Receive(); err == nil {
+		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
+	}
 
 	// New watch should receive the update.
 	if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
@@ -258,12 +277,15 @@ func (s) TestClusterWatchExpiryTimer(t *testing.T) {
 	}
 	defer c.Close()
 
-	<-v2ClientCh
+	v2Client := <-v2ClientCh
 
 	clusterUpdateCh := testutils.NewChannel()
 	c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
 		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
 	})
+	if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	u, err := clusterUpdateCh.TimedReceive(defaultWatchExpiryTimeout * 2)
 	if err != nil {
@@ -303,6 +325,9 @@ func (s) TestClusterWatchExpiryTimerStop(t *testing.T) {
 	c.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
 		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
 	})
+	if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
 	v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
@@ -319,3 +344,83 @@
 		t.Fatalf("got unexpected: %v, %v, want recv timeout", u.(clusterUpdateErr).u, u.(clusterUpdateErr).err)
 	}
 }
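The expiry-timer tests above interact with the new resourceNotFound method: like newUpdate, it stops wi.expiryTimer, so a watch that received any server response never fires the watcher-timeout error. A rough sketch of that start/stop interplay, under assumed names:

package main

import (
	"fmt"
	"sync"
	"time"
)

// watch reports an error if no response arrives before the expiry timeout;
// a response (or a removal notification) stops the timer first.
type watch struct {
	mu    sync.Mutex
	done  bool
	timer *time.Timer
}

func startWatch(timeout time.Duration, onTimeout func()) *watch {
	w := &watch{}
	w.timer = time.AfterFunc(timeout, func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		if w.done { // a response won the race; stay quiet
			return
		}
		w.done = true
		onTimeout()
	})
	return w
}

// onResponse mirrors newUpdate/resourceNotFound: mark handled, stop the timer.
func (w *watch) onResponse() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.done = true
	w.timer.Stop()
}

func main() {
	w := startWatch(50*time.Millisecond, func() { fmt.Println("watch expired") })
	w.onResponse() // timer stopped; nothing is printed
	time.Sleep(100 * time.Millisecond)
	fmt.Println("done")
}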
+
+// TestClusterResourceRemoved covers the cases:
+// - an update is received after a watch()
+// - another update is received, with one resource removed
+// - this should trigger the callback with a resource-removed error
+// - one more update without the removed resource
+// - the callback (above) shouldn't receive any update
+func (s) TestClusterResourceRemoved(t *testing.T) {
+	v2ClientCh, cleanup := overrideNewXDSV2Client()
+	defer cleanup()
+
+	c, err := New(clientOpts(testXDSServer))
+	if err != nil {
+		t.Fatalf("failed to create client: %v", err)
+	}
+	defer c.Close()
+
+	v2Client := <-v2ClientCh
+
+	clusterUpdateCh1 := testutils.NewChannel()
+	c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
+		clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
+	})
+	if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	// Another watch for a different name.
+	clusterUpdateCh2 := testutils.NewChannel()
+	c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
+		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
+	})
+	if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+
+	wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
+	wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
+	v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
+		testCDSName + "1": wantUpdate1,
+		testCDSName + "2": wantUpdate2,
+	})
+
+	if u, err := clusterUpdateCh1.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) {
+		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	}
+
+	if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
+		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	}
+
+	// Send another update to remove resource 1.
+	v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
+		testCDSName + "2": wantUpdate2,
+	})
+
+	// watcher 1 should get an error.
+	if u, err := clusterUpdateCh1.Receive(); err != nil || ErrType(u.(clusterUpdateErr).err) != ErrorTypeResourceNotFound {
+		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
+	}
+
+	// watcher 2 should get the same update again.
+	if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
+		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	}
+
+	// Send one more update without resource 1.
+	v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
+		testCDSName + "2": wantUpdate2,
+	})
+
+	// watcher 1 should not get any update.
+	if u, err := clusterUpdateCh1.Receive(); err != testutils.ErrRecvTimeout {
+		t.Errorf("unexpected clusterUpdate: %v, want receiving from channel timeout", u)
+	}
+
+	// watcher 2 should get the same update again.
+	if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
+		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
+	}
+}
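TestClusterResourceRemoved distinguishes removal from other failures via ErrType and ErrorTypeResourceNotFound. A minimal sketch of how such typed errors can be shaped — illustrative only; the client's real NewErrorf/ErrType helpers live elsewhere in the package:

package main

import (
	"errors"
	"fmt"
)

type ErrorType int

const (
	ErrorTypeUnknown ErrorType = iota
	ErrorTypeResourceNotFound
)

// xdsError carries a machine-checkable type alongside the message.
type xdsError struct {
	t   ErrorType
	msg string
}

func (e *xdsError) Error() string { return e.msg }

func NewErrorf(t ErrorType, format string, args ...interface{}) error {
	return &xdsError{t: t, msg: fmt.Sprintf(format, args...)}
}

// ErrType extracts the type, returning ErrorTypeUnknown for foreign errors.
func ErrType(err error) ErrorType {
	var xe *xdsError
	if errors.As(err, &xe) {
		return xe.t
	}
	return ErrorTypeUnknown
}

func main() {
	err := NewErrorf(ErrorTypeResourceNotFound, "xds: %s target %s not found in received response", "LDS", "foo")
	fmt.Println(ErrType(err) == ErrorTypeResourceNotFound) // true
}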
diff --git a/xds/internal/client/client_watchers_endpoints_test.go b/xds/internal/client/client_watchers_endpoints_test.go
index 46e62bf57ec7..53b1bc316abc 100644
--- a/xds/internal/client/client_watchers_endpoints_test.go
+++ b/xds/internal/client/client_watchers_endpoints_test.go
@@ -69,6 +69,9 @@ func (s) TestEndpointsWatch(t *testing.T) {
 	cancelWatch := c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
 		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[edsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
 	v2Client.r.newEDSUpdate(map[string]EndpointsUpdate{
@@ -124,6 +127,9 @@ func (s) TestEndpointsTwoWatchSameResourceName(t *testing.T) {
 		cancelLastWatch = c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
 			endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[edsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
 
 	wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
@@ -178,6 +184,9 @@ func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) {
 		c.WatchEndpoints(testCDSName+"1", func(update EndpointsUpdate, err error) {
 			endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[edsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
 
 	// Third watch for a different name.
@@ -185,6 +194,9 @@ func (s) TestEndpointsThreeWatchDifferentResourceName(t *testing.T) {
 	c.WatchEndpoints(testCDSName+"2", func(update EndpointsUpdate, err error) {
 		endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[edsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate1 := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
 	wantUpdate2 := EndpointsUpdate{Localities: []Locality{testLocalities[1]}}
@@ -222,6 +234,9 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) {
 	c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
 		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[edsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := EndpointsUpdate{Localities: []Locality{testLocalities[0]}}
 	v2Client.r.newEDSUpdate(map[string]EndpointsUpdate{
@@ -237,6 +252,9 @@ func (s) TestEndpointsWatchAfterCache(t *testing.T) {
 	c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
 		endpointsUpdateCh2.Send(endpointsUpdateErr{u: update, err: err})
 	})
+	if n, err := v2Client.addWatches[edsURL].Receive(); err == nil {
+		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
+	}
 
 	// New watch should receive the update.
 	if u, err := endpointsUpdateCh2.Receive(); err != nil || !cmp.Equal(u, endpointsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(endpointsUpdateErr{})) {
@@ -268,12 +286,15 @@ func (s) TestEndpointsWatchExpiryTimer(t *testing.T) {
 	}
 	defer c.Close()
 
-	<-v2ClientCh
+	v2Client := <-v2ClientCh
 
 	endpointsUpdateCh := testutils.NewChannel()
 	c.WatchEndpoints(testCDSName, func(update EndpointsUpdate, err error) {
 		endpointsUpdateCh.Send(endpointsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[edsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	u, err := endpointsUpdateCh.TimedReceive(defaultWatchExpiryTimeout * 2)
 	if err != nil {
diff --git a/xds/internal/client/client_watchers_lds_test.go b/xds/internal/client/client_watchers_lds_test.go
index 114726c029a5..5db842d261ca 100644
--- a/xds/internal/client/client_watchers_lds_test.go
+++ b/xds/internal/client/client_watchers_lds_test.go
@@ -31,7 +31,7 @@ type ldsUpdateErr struct {
 // TestLDSWatch covers the cases:
 // - an update is received after a watch()
-// - an update for another resource name (which doesn't trigger callback)
+// - an update for another resource name
 // - an update is received after cancel()
 func (s) TestLDSWatch(t *testing.T) {
 	v2ClientCh, cleanup := overrideNewXDSV2Client()
 	defer cleanup()
@@ -49,6 +49,9 @@ func (s) TestLDSWatch(t *testing.T) {
 	cancelWatch := c.watchLDS(testLDSName, func(update ldsUpdate, err error) {
 		ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := ldsUpdate{routeName: testRDSName}
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
@@ -59,12 +62,13 @@ func (s) TestLDSWatch(t *testing.T) {
 		t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
 	}
 
-	// Another update for a different resource name.
+	// Another update, with an extra resource for a different resource name.
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
+		testLDSName:  wantUpdate,
 		"randomName": {},
 	})
 
-	if u, err := ldsUpdateCh.TimedReceive(chanRecvTimeout); err != testutils.ErrRecvTimeout {
+	if u, err := ldsUpdateCh.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
 		t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
 	}
 
@@ -104,6 +108,9 @@ func (s) TestLDSTwoWatchSameResourceName(t *testing.T) {
 		cancelLastWatch = c.watchLDS(testLDSName, func(update ldsUpdate, err error) {
 			ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[ldsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
 
 	wantUpdate := ldsUpdate{routeName: testRDSName}
@@ -158,6 +165,9 @@ func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) {
 		c.watchLDS(testLDSName+"1", func(update ldsUpdate, err error) {
 			ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[ldsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
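The `i == 0 && err != nil` guards in these loops encode the subscription rule under test: only the first watch for a name triggers an addWatch call to the server, while later watches for the same name are served from the cache with no new request. A compact sketch of that registration decision, with hypothetical names:

package main

import "fmt"

// register adds a watcher and reports whether a new server subscription is
// needed — true only for the first watcher of a resource name. Cached
// resources are delivered immediately instead of triggering a request.
func register(name string, cb func(string), watchers map[string][]func(string), cache map[string]string) bool {
	first := len(watchers[name]) == 0
	watchers[name] = append(watchers[name], cb)
	if v, ok := cache[name]; ok {
		cb(v) // cached resource: deliver immediately, no new request
	}
	return first
}

func main() {
	watchers := map[string][]func(string){}
	cache := map[string]string{}
	fmt.Println(register("lds-1", func(string) {}, watchers, cache)) // true: subscribe
	fmt.Println(register("lds-1", func(string) {}, watchers, cache)) // false: reuse
}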
 
 	// Third watch for a different name.
@@ -165,6 +175,9 @@ func (s) TestLDSThreeWatchDifferentResourceName(t *testing.T) {
 	c.watchLDS(testLDSName+"2", func(update ldsUpdate, err error) {
 		ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate1 := ldsUpdate{routeName: testRDSName + "1"}
 	wantUpdate2 := ldsUpdate{routeName: testRDSName + "2"}
@@ -202,6 +215,9 @@ func (s) TestLDSWatchAfterCache(t *testing.T) {
 	c.watchLDS(testLDSName, func(update ldsUpdate, err error) {
 		ldsUpdateCh.Send(ldsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := ldsUpdate{routeName: testRDSName}
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
@@ -217,6 +233,9 @@ func (s) TestLDSWatchAfterCache(t *testing.T) {
 	c.watchLDS(testLDSName, func(update ldsUpdate, err error) {
 		ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
 	})
+	if n, err := v2Client.addWatches[ldsURL].Receive(); err == nil {
+		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
+	}
 
 	// New watch should receive the update.
 	if u, err := ldsUpdateCh2.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
@@ -228,3 +247,83 @@ func (s) TestLDSWatchAfterCache(t *testing.T) {
 		t.Errorf("unexpected ldsUpdate: %v, %v, want channel recv timeout", u, err)
 	}
 }
+
+// TestLDSResourceRemoved covers the cases:
+// - an update is received after a watch()
+// - another update is received, with one resource removed
+// - this should trigger the callback with a resource-removed error
+// - one more update without the removed resource
+// - the callback (above) shouldn't receive any update
+func (s) TestLDSResourceRemoved(t *testing.T) {
+	v2ClientCh, cleanup := overrideNewXDSV2Client()
+	defer cleanup()
+
+	c, err := New(clientOpts(testXDSServer))
+	if err != nil {
+		t.Fatalf("failed to create client: %v", err)
+	}
+	defer c.Close()
+
+	v2Client := <-v2ClientCh
+
+	ldsUpdateCh1 := testutils.NewChannel()
+	c.watchLDS(testLDSName+"1", func(update ldsUpdate, err error) {
+		ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err})
+	})
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	// Another watch for a different name.
+	ldsUpdateCh2 := testutils.NewChannel()
+	c.watchLDS(testLDSName+"2", func(update ldsUpdate, err error) {
+		ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
+	})
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+
+	wantUpdate1 := ldsUpdate{routeName: testRDSName + "1"}
+	wantUpdate2 := ldsUpdate{routeName: testRDSName + "2"}
+	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
+		testLDSName + "1": wantUpdate1,
+		testLDSName + "2": wantUpdate2,
+	})
+
+	if u, err := ldsUpdateCh1.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate1, nil}) {
+		t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
+	}
+
+	if u, err := ldsUpdateCh2.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
+		t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
+	}
+
+	// Send another update to remove resource 1.
+	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
+		testLDSName + "2": wantUpdate2,
+	})
+
+	// watcher 1 should get an error.
+	if u, err := ldsUpdateCh1.Receive(); err != nil || ErrType(u.(ldsUpdateErr).err) != ErrorTypeResourceNotFound {
+		t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
+	}
+
+	// watcher 2 should get the same update again.
+	if u, err := ldsUpdateCh2.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
+		t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
+	}
+
+	// Send one more update without resource 1.
+	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
+		testLDSName + "2": wantUpdate2,
+	})
+
+	// watcher 1 should not get any update.
+	if u, err := ldsUpdateCh1.Receive(); err != testutils.ErrRecvTimeout {
+		t.Errorf("unexpected ldsUpdate: %v, want receiving from channel timeout", u)
+	}
+
+	// watcher 2 should get the same update again.
+	if u, err := ldsUpdateCh2.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
+		t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
+	}
+}
diff --git a/xds/internal/client/client_watchers_rds_test.go b/xds/internal/client/client_watchers_rds_test.go
index 74bd3deac13b..06ed7a377e2b 100644
--- a/xds/internal/client/client_watchers_rds_test.go
+++ b/xds/internal/client/client_watchers_rds_test.go
@@ -50,6 +50,9 @@ func (s) TestRDSWatch(t *testing.T) {
 	cancelWatch := c.watchRDS(testRDSName, func(update rdsUpdate, err error) {
 		rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
 	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
@@ -105,6 +108,9 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
 		cancelLastWatch = c.watchRDS(testRDSName, func(update rdsUpdate, err error) {
 			rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[rdsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
 
 	wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
@@ -159,6 +165,9 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
 		c.watchRDS(testRDSName+"1", func(update rdsUpdate, err error) {
 			rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
 		})
+		if _, err := v2Client.addWatches[rdsURL].Receive(); i == 0 && err != nil {
+			t.Fatalf("want new watch to start, got error %v", err)
+		}
 	}
 
 	// Third watch for a different name.
@@ -166,6 +175,9 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
 	c.watchRDS(testRDSName+"2", func(update rdsUpdate, err error) {
 		rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate1 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "1": 1}}
 	wantUpdate2 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "2": 1}}
@@ -203,6 +215,9 @@ func (s) TestRDSWatchAfterCache(t *testing.T) {
 	c.watchRDS(testRDSName, func(update rdsUpdate, err error) {
 		rdsUpdateCh.Send(rdsUpdateErr{u: update, err: err})
 	})
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
 	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
@@ -218,6 +233,9 @@ func (s) TestRDSWatchAfterCache(t *testing.T) {
 	c.watchRDS(testRDSName, func(update rdsUpdate, err error) {
 		rdsUpdateCh2.Send(rdsUpdateErr{u: update, err: err})
 	})
+	if n, err := v2Client.addWatches[rdsURL].Receive(); err == nil {
+		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
+	}
 
 	// New watch should receive the update.
 	if u, err := rdsUpdateCh2.Receive(); err != nil || !cmp.Equal(u, rdsUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(rdsUpdate{}, rdsUpdateErr{})) {
diff --git a/xds/internal/client/client_watchers_service.go b/xds/internal/client/client_watchers_service.go
index b7fe57d6062b..1eea1f316a3c 100644
--- a/xds/internal/client/client_watchers_service.go
+++ b/xds/internal/client/client_watchers_service.go
@@ -75,12 +75,18 @@ func (w *serviceUpdateWatcher) handleLDSResp(update ldsUpdate, err error) {
 	if w.closed {
 		return
 	}
-	// TODO: this error case returns early, without canceling the existing RDS
-	// watch. If we decided to stop the RDS watch when LDS errors, move this
-	// after rdsCancel(). We may also need to check the error type and do
-	// different things based on that (e.g. cancel RDS watch only on
-	// resourceRemovedError, but not on connectionError).
 	if err != nil {
+		// Check the error type and handle it accordingly. For now, the only
+		// type handled specially is ResourceNotFound, which indicates that
+		// the LDS resource was removed; besides sending the error to the
+		// callback, we also cancel the RDS watch.
+		if ErrType(err) == ErrorTypeResourceNotFound && w.rdsCancel != nil {
+			w.rdsCancel()
+			w.rdsName = ""
+			w.rdsCancel = nil
+		}
+		// The other error cases still return early without canceling the
+		// existing RDS watch.
 		w.serviceCb(ServiceUpdate{}, err)
 		return
 	}
@@ -104,6 +110,11 @@ func (w *serviceUpdateWatcher) handleRDSResp(update rdsUpdate, err error) {
 	if w.closed {
 		return
 	}
+	if w.rdsCancel == nil {
+		// This means the RDS watch was canceled, which can happen when the
+		// LDS resource is removed; ignore the update.
+		return
+	}
 	if err != nil {
 		w.serviceCb(ServiceUpdate{}, err)
 		return
 	}
diff --git a/xds/internal/client/client_watchers_service_test.go b/xds/internal/client/client_watchers_service_test.go
index 71de6c750146..8b341683d094 100644
--- a/xds/internal/client/client_watchers_service_test.go
+++ b/xds/internal/client/client_watchers_service_test.go
@@ -57,11 +57,15 @@ func (s) TestServiceWatch(t *testing.T) {
 
 	wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
 
-	<-v2Client.addWatches[ldsURL]
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
 		testLDSName: {routeName: testRDSName},
 	})
-	<-v2Client.addWatches[rdsURL]
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
 		testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
 	})
@@ -93,11 +97,15 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
 
 	wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
 
-	<-v2Client.addWatches[ldsURL]
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
 		testLDSName: {routeName: testRDSName},
 	})
-	<-v2Client.addWatches[rdsURL]
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
 		testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
 	})
@@ -110,7 +118,9 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
 		testLDSName: {routeName: testRDSName + "2"},
 	})
-	<-v2Client.addWatches[rdsURL]
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 
 	// Another update for the old name.
 	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
@@ -154,11 +164,15 @@ func (s) TestServiceWatchSecond(t *testing.T) {
 
 	wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
 
-	<-v2Client.addWatches[ldsURL]
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
 		testLDSName: {routeName: testRDSName},
 	})
-	<-v2Client.addWatches[rdsURL]
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
 		testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
 	})
@@ -361,11 +375,15 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
 
 	wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
 
-	<-v2Client.addWatches[ldsURL]
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
 		testLDSName: {routeName: testRDSName},
 	})
-	<-v2Client.addWatches[rdsURL]
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
 	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
 		testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
 	})
@@ -378,9 +396,89 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
 	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
 		testLDSName: {routeName: testRDSName},
 	})
-	select {
-	case <-v2Client.removeWatches[rdsURL]:
-		t.Fatalf("unexpected rds watch cancel")
-	case <-time.After(time.Second):
+	if v, err := v2Client.removeWatches[rdsURL].Receive(); err == nil {
+		t.Fatalf("unexpected rds watch cancel: %v", v)
+	}
+}
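TestServiceResourceRemoved below drives the new handleLDSResp path end to end: an LDS resource-not-found error must cancel the dependent RDS watch, and a stale RDS update must then be dropped by the w.rdsCancel == nil check. A stripped-down sketch of that parent/child watch relationship — a hypothetical shape, not the real serviceUpdateWatcher:

package main

import "fmt"

// serviceWatcher chains an LDS watch to an RDS watch; when LDS reports the
// resource gone, the RDS leg is canceled and later RDS updates are ignored.
type serviceWatcher struct {
	rdsCancel func()
	cb        func(string, error)
}

func (w *serviceWatcher) handleLDS(routeName string, err error) {
	if err != nil {
		if w.rdsCancel != nil { // resource removed: drop the child watch
			w.rdsCancel()
			w.rdsCancel = nil
		}
		w.cb("", err)
		return
	}
	w.rdsCancel = func() { fmt.Println("rds watch canceled") }
}

func (w *serviceWatcher) handleRDS(update string, err error) {
	if w.rdsCancel == nil { // RDS watch was canceled; ignore stale updates
		return
	}
	w.cb(update, err)
}

func main() {
	w := &serviceWatcher{cb: func(u string, err error) { fmt.Println("cb:", u, err) }}
	w.handleLDS("route-1", nil)
	w.handleRDS("cluster-weights", nil)             // cb: cluster-weights <nil>
	w.handleLDS("", fmt.Errorf("resource removed")) // cancels RDS, cb gets error
	w.handleRDS("stale", nil)                       // dropped silently
}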
+
+// TestServiceResourceRemoved covers the cases:
+// - an update is received after a watch()
+// - another update is received, with one resource removed
+// - this should trigger the callback with a resource-removed error
+// - one more update without the removed resource
+// - the callback (above) shouldn't receive any update
+func (s) TestServiceResourceRemoved(t *testing.T) {
+	v2ClientCh, cleanup := overrideNewXDSV2Client()
+	defer cleanup()
+
+	c, err := New(clientOpts(testXDSServer))
+	if err != nil {
+		t.Fatalf("failed to create client: %v", err)
+	}
+	defer c.Close()
+
+	v2Client := <-v2ClientCh
+
+	serviceUpdateCh := testutils.NewChannel()
+	c.WatchService(testLDSName, func(update ServiceUpdate, err error) {
+		serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
+	})
+
+	wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
+
+	if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
+		testLDSName: {routeName: testRDSName},
+	})
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
+		testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
+	})
+
+	if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, cmp.AllowUnexported(serviceUpdateErr{})) {
+		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	}
+
+	// Remove the LDS resource; this should cancel the RDS watch and trigger a
+	// resource-removed error.
+	v2Client.r.newLDSUpdate(map[string]ldsUpdate{})
+	if _, err := v2Client.removeWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want watch to be canceled, got error %v", err)
+	}
+	if u, err := serviceUpdateCh.Receive(); err != nil || ErrType(u.(serviceUpdateErr).err) != ErrorTypeResourceNotFound {
+		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
+	}
+
+	// Send an RDS update for the removed LDS resource, and expect no update to
+	// the callback, because the RDS watch should be canceled.
+	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
+		testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new": 1}},
+	})
+	if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout {
+		t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
+	}
+
+	// Add the LDS resource back, but not the RDS resource. This should
+	// - start a new RDS watch
+	// - time out on the service channel, because the RDS cache entry was cleared
+	v2Client.r.newLDSUpdate(map[string]ldsUpdate{
+		testLDSName: {routeName: testRDSName},
+	})
+	if _, err := v2Client.addWatches[rdsURL].Receive(); err != nil {
+		t.Fatalf("want new watch to start, got error %v", err)
+	}
+	if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout {
+		t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
+	}
+
+	v2Client.r.newRDSUpdate(map[string]rdsUpdate{
+		testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new2": 1}},
+	})
+	if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName + "new2": 1}}, nil}, cmp.AllowUnexported(serviceUpdateErr{})) {
+		t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
+	}
+}
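For callers, the practical consequence of this PR is that a WatchService callback can now receive a resource-not-found error and should treat it as the service being deleted, not as a transient failure. A hypothetical consumer is sketched below; the types are local stand-ins for the identifiers used above (WatchService, ServiceUpdate, the error helpers), so this snippet is illustrative only and not part of the PR:

package consumer

import "log"

// ServiceUpdate is a stand-in for the xds client's update type above.
type ServiceUpdate struct{ WeightedCluster map[string]uint32 }

// xdsClient captures just the surface this example needs from the client.
type xdsClient interface {
	WatchService(target string, cb func(ServiceUpdate, error)) (cancel func())
}

// watchService registers a watch and routes errors by kind; the
// isResourceNotFound predicate stands in for ErrType(err) ==
// ErrorTypeResourceNotFound from the diff above.
func watchService(c xdsClient, isResourceNotFound func(error) bool) func() {
	return c.WatchService("foo.example.com", func(update ServiceUpdate, err error) {
		if err != nil {
			if isResourceNotFound(err) {
				// The listener disappeared from the server's state of the
				// world: tear down routing state rather than retrying.
				log.Printf("service removed: %v", err)
				return
			}
			log.Printf("transient xds error: %v", err)
			return
		}
		log.Printf("new weighted clusters: %v", update.WeightedCluster)
	})
}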