Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: handle errors in xds_client #3658

Merged
merged 3 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions xds/internal/client/client_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,20 @@ func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
c.ldsCache[name] = update
}
}
// TODO: handle removing resources, which means if a resource exists in the
// previous update, but not in the new update. This needs the balancers and
// resolvers to handle errors correctly.

// TODO: remove item from cache and remove corresponding RDS cached data.
for name := range c.ldsCache {
if _, ok := d[name]; !ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a better variable name for d whose scope has now gotten bigger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to updates

// If resource exists in cache, but not in the new update, delete it
// from cache, and also send an resource not found error to indicate
// resource removed.
delete(c.ldsCache, name)
for wi := range c.ldsWatchers[name] {
wi.resourceNotFound()
}
}
}
// When LDS resource is removed, we don't delete corresponding RDS cached
// data. The RDS watch will be canceled, and cache is removed when the last
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/cache/cache entry/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// watch is canceled.
}

// newRDSUpdate is called by the underlying xdsv2Client when it receives an xDS
Expand Down Expand Up @@ -135,11 +144,20 @@ func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
c.cdsCache[name] = update
}
}
// TODO: handle removing resources, which means if a resource exists in the
// previous update, but not in the new update. This needs the balancers and
// resolvers to handle errors correctly.

// TODO: remove item from cache and remove corresponding EDS cached data.
for name := range c.cdsCache {
if _, ok := d[name]; !ok {
// If resource exists in cache, but not in the new update, delete it
// from cache, and also send an resource not found error to indicate
// resource removed.
delete(c.cdsCache, name)
for wi := range c.cdsWatchers[name] {
wi.resourceNotFound()
}
}
}
// When CDS resource is removed, we don't delete corresponding EDS cached
// data. The EDS watch will be canceled, and cache is removed when the last
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here.
s/cache/cache entry/
And better name for d.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// watch is canceled.
}

// newEDSUpdate is called by the underlying xdsv2Client when it receives an xDS
Expand Down
28 changes: 14 additions & 14 deletions xds/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func (s) TestNew(t *testing.T) {
type testXDSV2Client struct {
r updateHandler

addWatches map[string]chan string
removeWatches map[string]chan string
addWatches map[string]*testutils.Channel
removeWatches map[string]*testutils.Channel
}

func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
Expand All @@ -142,16 +142,16 @@ func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
}

func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
addWatches := make(map[string]chan string)
addWatches[ldsURL] = make(chan string, 10)
addWatches[rdsURL] = make(chan string, 10)
addWatches[cdsURL] = make(chan string, 10)
addWatches[edsURL] = make(chan string, 10)
removeWatches := make(map[string]chan string)
removeWatches[ldsURL] = make(chan string, 10)
removeWatches[rdsURL] = make(chan string, 10)
removeWatches[cdsURL] = make(chan string, 10)
removeWatches[edsURL] = make(chan string, 10)
addWatches := make(map[string]*testutils.Channel)
addWatches[ldsURL] = testutils.NewChannel()
addWatches[rdsURL] = testutils.NewChannel()
addWatches[cdsURL] = testutils.NewChannel()
addWatches[edsURL] = testutils.NewChannel()
removeWatches := make(map[string]*testutils.Channel)
removeWatches[ldsURL] = testutils.NewChannel()
removeWatches[rdsURL] = testutils.NewChannel()
removeWatches[cdsURL] = testutils.NewChannel()
removeWatches[edsURL] = testutils.NewChannel()
return &testXDSV2Client{
r: r,
addWatches: addWatches,
Expand All @@ -160,11 +160,11 @@ func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
}

func (c *testXDSV2Client) addWatch(resourceType, resourceName string) {
c.addWatches[resourceType] <- resourceName
c.addWatches[resourceType].Send(resourceName)
}

func (c *testXDSV2Client) removeWatch(resourceType, resourceName string) {
c.removeWatches[resourceType] <- resourceName
c.removeWatches[resourceType].Send(resourceName)
}

func (c *testXDSV2Client) close() {}
Expand Down
37 changes: 30 additions & 7 deletions xds/internal/client/client_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,32 +75,43 @@ func (wi *watchInfo) newUpdate(update interface{}) {
wi.c.scheduleCallback(wi, update, nil)
}

func (wi *watchInfo) resourceNotFound() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled {
return
}
wi.state = watchInfoStateRespReceived
wi.expiryTimer.Stop()
wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %s target %s not found in received response", wi.typeURL, wi.target))
}

func (wi *watchInfo) timeout() {
wi.mu.Lock()
defer wi.mu.Unlock()
if wi.state == watchInfoStateCanceled || wi.state == watchInfoStateRespReceived {
return
}
wi.state = watchInfoStateTimeout
wi.sendErrorLocked(fmt.Errorf("xds: %s target %s not found, watcher timeout", wi.typeURL, wi.target))
}

// Caller must hold wi.mu.
func (wi *watchInfo) sendErrorLocked(err error) {
var (
u interface{}
t string
)
switch wi.typeURL {
case ldsURL:
u = ldsUpdate{}
t = "LDS"
case rdsURL:
u = rdsUpdate{}
t = "RDS"
case cdsURL:
u = ClusterUpdate{}
t = "CDS"
case edsURL:
u = EndpointsUpdate{}
t = "EDS"
}
wi.c.scheduleCallback(wi, u, fmt.Errorf("xds: %s target %s not found, watcher timeout", t, wi.target))
wi.c.scheduleCallback(wi, u, err)
}

func (wi *watchInfo) cancel() {
Expand Down Expand Up @@ -185,7 +196,19 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
// watching this resource.
delete(watchers, resourceName)
c.v2c.removeWatch(wi.typeURL, resourceName)
// TODO: remove item from cache.
// Remove the resource from cache. When a watch for this
// resource is added later, it will trigger a xDS request with
// resource names, and client will receive new xDS responses.
switch wi.typeURL {
case ldsURL:
delete(c.ldsCache, resourceName)
case rdsURL:
delete(c.rdsCache, resourceName)
case cdsURL:
delete(c.cdsCache, resourceName)
case edsURL:
delete(c.edsCache, resourceName)
}
}
}
}
Expand Down
83 changes: 79 additions & 4 deletions xds/internal/client/client_watchers_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type clusterUpdateErr struct {

// TestClusterWatch covers the cases:
// - an update is received after a watch()
// - an update for another resource name (which doesn't trigger callback)
// - an update for another resource name
// - an upate is received after cancel()
func (s) TestClusterWatch(t *testing.T) {
v2ClientCh, cleanup := overrideNewXDSV2Client()
Expand Down Expand Up @@ -69,13 +69,14 @@ func (s) TestClusterWatch(t *testing.T) {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
}

// Another update for a different resource name.
// Another update, with an extra resource for a different resource name.
v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
testCDSName: wantUpdate,
"randomName": {},
})

if u, err := clusterUpdateCh.TimedReceive(chanRecvTimeout); err != testutils.ErrRecvTimeout {
t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
if u, err := clusterUpdateCh.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate, nil}) {
t.Errorf("unexpected clusterUpdate: %+v, %v, want channel recv timeout", u, err)
}

// Cancel watch, and send update again.
Expand Down Expand Up @@ -319,3 +320,77 @@ func (s) TestClusterWatchExpiryTimerStop(t *testing.T) {
t.Fatalf("got unexpected: %v, %v, want recv timeout", u.(clusterUpdateErr).u, u.(clusterUpdateErr).err)
}
}

// TestClusterResourceRemoved covers the cases:
// - an update is received after a watch()
// - another update is received, with one resource removed
// - this should trigger callback with resource removed error
// - one more update without the removed resource
// - the callback (above) shouldn't receive any update
func (s) TestClusterResourceRemoved(t *testing.T) {
v2ClientCh, cleanup := overrideNewXDSV2Client()
defer cleanup()

c, err := New(clientOpts(testXDSServer))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
defer c.Close()

v2Client := <-v2ClientCh

clusterUpdateCh1 := testutils.NewChannel()
c.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
})
// Another watch for a different name.
clusterUpdateCh2 := testutils.NewChannel()
c.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
})

wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
testCDSName + "1": wantUpdate1,
testCDSName + "2": wantUpdate2,
})

if u, err := clusterUpdateCh1.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate1, nil}) {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
}

if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
}

// Send another update to remove resource 1.
v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
testCDSName + "2": wantUpdate2,
})

// watcher 1 should get an error.
if u, err := clusterUpdateCh1.Receive(); err != nil || ErrType(u.(clusterUpdateErr).err) != ErrorTypeResourceNotFound {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
}

// watcher 2 should get the same update again.
if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
}

// Send one more update without resource 1.
v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
testCDSName + "2": wantUpdate2,
})

// watcher 1 should get an error.
if u, err := clusterUpdateCh1.Receive(); err != testutils.ErrRecvTimeout {
t.Errorf("unexpected clusterUpdate: %v, want receiving from channel timeout", u)
}

// watcher 2 should get the same update again.
if u, err := clusterUpdateCh2.Receive(); err != nil || u != (clusterUpdateErr{wantUpdate2, nil}) {
t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v", u, err)
}
}
81 changes: 78 additions & 3 deletions xds/internal/client/client_watchers_lds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ldsUpdateErr struct {

// TestLDSWatch covers the cases:
// - an update is received after a watch()
// - an update for another resource name (which doesn't trigger callback)
// - an update for another resource name
// - an upate is received after cancel()
func (s) TestLDSWatch(t *testing.T) {
v2ClientCh, cleanup := overrideNewXDSV2Client()
Expand Down Expand Up @@ -59,12 +59,13 @@ func (s) TestLDSWatch(t *testing.T) {
t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
}

// Another update for a different resource name.
// Another update, with an extra resource for a different resource name.
v2Client.r.newLDSUpdate(map[string]ldsUpdate{
testLDSName: wantUpdate,
"randomName": {},
})

if u, err := ldsUpdateCh.TimedReceive(chanRecvTimeout); err != testutils.ErrRecvTimeout {
if u, err := ldsUpdateCh.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate, nil}) {
t.Errorf("unexpected ldsUpdate: %v, %v, want channel recv timeout", u, err)
}

Expand Down Expand Up @@ -228,3 +229,77 @@ func (s) TestLDSWatchAfterCache(t *testing.T) {
t.Errorf("unexpected ldsUpdate: %v, %v, want channel recv timeout", u, err)
}
}

// TestLDSResourceRemoved covers the cases:
// - an update is received after a watch()
// - another update is received, with one resource removed
// - this should trigger callback with resource removed error
// - one more update without the removed resource
// - the callback (above) shouldn't receive any update
func (s) TestLDSResourceRemoved(t *testing.T) {
v2ClientCh, cleanup := overrideNewXDSV2Client()
defer cleanup()

c, err := New(clientOpts(testXDSServer))
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
defer c.Close()

v2Client := <-v2ClientCh

ldsUpdateCh1 := testutils.NewChannel()
c.watchLDS(testLDSName+"1", func(update ldsUpdate, err error) {
ldsUpdateCh1.Send(ldsUpdateErr{u: update, err: err})
})
// Another watch for a different name.
ldsUpdateCh2 := testutils.NewChannel()
c.watchLDS(testLDSName+"2", func(update ldsUpdate, err error) {
ldsUpdateCh2.Send(ldsUpdateErr{u: update, err: err})
})

wantUpdate1 := ldsUpdate{routeName: testEDSName + "1"}
wantUpdate2 := ldsUpdate{routeName: testEDSName + "2"}
v2Client.r.newLDSUpdate(map[string]ldsUpdate{
testLDSName + "1": wantUpdate1,
testLDSName + "2": wantUpdate2,
})

if u, err := ldsUpdateCh1.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate1, nil}) {
t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
}

if u, err := ldsUpdateCh2.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
}

// Send another update to remove resource 1.
v2Client.r.newLDSUpdate(map[string]ldsUpdate{
testLDSName + "2": wantUpdate2,
})

// watcher 1 should get an error.
if u, err := ldsUpdateCh1.Receive(); err != nil || ErrType(u.(ldsUpdateErr).err) != ErrorTypeResourceNotFound {
t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
}

// watcher 2 should get the same update again.
if u, err := ldsUpdateCh2.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
}

// Send one more update without resource 1.
v2Client.r.newLDSUpdate(map[string]ldsUpdate{
testLDSName + "2": wantUpdate2,
})

// watcher 1 should get an error.
if u, err := ldsUpdateCh1.Receive(); err != testutils.ErrRecvTimeout {
t.Errorf("unexpected ldsUpdate: %v, want receiving from channel timeout", u)
}

// watcher 2 should get the same update again.
if u, err := ldsUpdateCh2.Receive(); err != nil || u != (ldsUpdateErr{wantUpdate2, nil}) {
t.Errorf("unexpected ldsUpdate: %v, error receiving from channel: %v", u, err)
}
}
Loading