Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: handle errors in xds_client #3658

Merged
merged 3 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 36 additions & 18 deletions xds/internal/client/client_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
func (c *Client) newLDSUpdate(updates map[string]ldsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.ldsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand All @@ -88,23 +88,32 @@ func (c *Client) newLDSUpdate(d map[string]ldsUpdate) {
c.ldsCache[name] = update
}
}
// TODO: handle removing resources, which means if a resource exists in the
// previous update, but not in the new update. This needs the balancers and
// resolvers to handle errors correctly.

// TODO: remove item from cache and remove corresponding RDS cached data.
for name := range c.ldsCache {
if _, ok := updates[name]; !ok {
// If a resource exists in the cache but not in the new update, delete
// it from the cache and send a resource-not-found error to its watchers
// to indicate that the resource was removed.
delete(c.ldsCache, name)
for wi := range c.ldsWatchers[name] {
wi.resourceNotFound()
}
}
}
// When an LDS resource is removed, we don't delete the corresponding RDS
// cached data. The RDS watch will be canceled, and the cache entry is removed
// when the last watch is canceled.
}

// newRDSUpdate is called by the underlying xdsv2Client when it receives an xDS
// response.
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newRDSUpdate(d map[string]rdsUpdate) {
func (c *Client) newRDSUpdate(updates map[string]rdsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.rdsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand All @@ -121,11 +130,11 @@ func (c *Client) newRDSUpdate(d map[string]rdsUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
func (c *Client) newCDSUpdate(updates map[string]ClusterUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.cdsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand All @@ -135,23 +144,32 @@ func (c *Client) newCDSUpdate(d map[string]ClusterUpdate) {
c.cdsCache[name] = update
}
}
// TODO: handle removing resources, which means if a resource exists in the
// previous update, but not in the new update. This needs the balancers and
// resolvers to handle errors correctly.

// TODO: remove item from cache and remove corresponding EDS cached data.
for name := range c.cdsCache {
if _, ok := updates[name]; !ok {
// If a resource exists in the cache but not in the new update, delete
// it from the cache and send a resource-not-found error to its watchers
// to indicate that the resource was removed.
delete(c.cdsCache, name)
for wi := range c.cdsWatchers[name] {
wi.resourceNotFound()
}
}
}
// When a CDS resource is removed, we don't delete the corresponding EDS
// cached data. The EDS watch will be canceled, and the cache entry is removed
// when the last watch is canceled.
}

// newEDSUpdate is called by the underlying xdsv2Client when it receives an xDS
// response.
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) newEDSUpdate(d map[string]EndpointsUpdate) {
func (c *Client) newEDSUpdate(updates map[string]EndpointsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

for name, update := range d {
for name, update := range updates {
if s, ok := c.edsWatchers[name]; ok {
for wi := range s {
wi.newUpdate(update)
Expand Down
36 changes: 22 additions & 14 deletions xds/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func (s) TestNew(t *testing.T) {
type testXDSV2Client struct {
r updateHandler

addWatches map[string]chan string
removeWatches map[string]chan string
addWatches map[string]*testutils.Channel
removeWatches map[string]*testutils.Channel
}

func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
Expand All @@ -142,16 +142,16 @@ func overrideNewXDSV2Client() (<-chan *testXDSV2Client, func()) {
}

func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
addWatches := make(map[string]chan string)
addWatches[ldsURL] = make(chan string, 10)
addWatches[rdsURL] = make(chan string, 10)
addWatches[cdsURL] = make(chan string, 10)
addWatches[edsURL] = make(chan string, 10)
removeWatches := make(map[string]chan string)
removeWatches[ldsURL] = make(chan string, 10)
removeWatches[rdsURL] = make(chan string, 10)
removeWatches[cdsURL] = make(chan string, 10)
removeWatches[edsURL] = make(chan string, 10)
addWatches := make(map[string]*testutils.Channel)
addWatches[ldsURL] = testutils.NewChannel()
addWatches[rdsURL] = testutils.NewChannel()
addWatches[cdsURL] = testutils.NewChannel()
addWatches[edsURL] = testutils.NewChannel()
removeWatches := make(map[string]*testutils.Channel)
removeWatches[ldsURL] = testutils.NewChannel()
removeWatches[rdsURL] = testutils.NewChannel()
removeWatches[cdsURL] = testutils.NewChannel()
removeWatches[edsURL] = testutils.NewChannel()
return &testXDSV2Client{
r: r,
addWatches: addWatches,
Expand All @@ -160,11 +160,11 @@ func newTestXDSV2Client(r updateHandler) *testXDSV2Client {
}

func (c *testXDSV2Client) addWatch(resourceType, resourceName string) {
c.addWatches[resourceType] <- resourceName
c.addWatches[resourceType].Send(resourceName)
}

func (c *testXDSV2Client) removeWatch(resourceType, resourceName string) {
c.removeWatches[resourceType] <- resourceName
c.removeWatches[resourceType].Send(resourceName)
}

// close is a no-op in this test implementation.
func (c *testXDSV2Client) close() {}
Expand All @@ -184,11 +184,19 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
v2Client := <-v2ClientCh

clusterUpdateCh := testutils.NewChannel()
firstTime := true
c.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
// Calls another watch inline, to ensure there's no deadlock.
c.WatchCluster("another-random-name", func(ClusterUpdate, error) {})
if _, err := v2Client.addWatches[cdsURL].Receive(); firstTime && err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}
firstTime = false
})
if _, err := v2Client.addWatches[cdsURL].Receive(); err != nil {
t.Fatalf("want new watch to start, got error %v", err)
}

wantUpdate := ClusterUpdate{ServiceName: testEDSName}
v2Client.r.newCDSUpdate(map[string]ClusterUpdate{
Expand Down
37 changes: 30 additions & 7 deletions xds/internal/client/client_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,32 +75,43 @@ func (wi *watchInfo) newUpdate(update interface{}) {
wi.c.scheduleCallback(wi, update, nil)
}

// resourceNotFound records that a response has been received for this watch,
// stops the expiry timer, and reports an ErrorTypeResourceNotFound error to
// the watcher. It does nothing if the watch has already been canceled.
func (wi *watchInfo) resourceNotFound() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	if wi.state != watchInfoStateCanceled {
		// Mark the response as received before stopping the timer and
		// reporting, mirroring the order of the state transition.
		wi.state = watchInfoStateRespReceived
		wi.expiryTimer.Stop()
		notFoundErr := NewErrorf(ErrorTypeResourceNotFound, "xds: %s target %s not found in received response", wi.typeURL, wi.target)
		wi.sendErrorLocked(notFoundErr)
	}
}

// timeout moves the watch into the timeout state and reports a "not found,
// watcher timeout" error to the watcher. It does nothing if the watch has
// already been canceled or has already received a response.
func (wi *watchInfo) timeout() {
	wi.mu.Lock()
	defer wi.mu.Unlock()

	switch wi.state {
	case watchInfoStateCanceled, watchInfoStateRespReceived:
		// The watch is either gone or already satisfied; nothing to report.
		return
	}

	wi.state = watchInfoStateTimeout
	timeoutErr := fmt.Errorf("xds: %s target %s not found, watcher timeout", wi.typeURL, wi.target)
	wi.sendErrorLocked(timeoutErr)
}

// Caller must hold wi.mu.
func (wi *watchInfo) sendErrorLocked(err error) {
var (
u interface{}
t string
)
switch wi.typeURL {
case ldsURL:
u = ldsUpdate{}
t = "LDS"
case rdsURL:
u = rdsUpdate{}
t = "RDS"
case cdsURL:
u = ClusterUpdate{}
t = "CDS"
case edsURL:
u = EndpointsUpdate{}
t = "EDS"
}
wi.c.scheduleCallback(wi, u, fmt.Errorf("xds: %s target %s not found, watcher timeout", t, wi.target))
wi.c.scheduleCallback(wi, u, err)
}

func (wi *watchInfo) cancel() {
Expand Down Expand Up @@ -185,7 +196,19 @@ func (c *Client) watch(wi *watchInfo) (cancel func()) {
// watching this resource.
delete(watchers, resourceName)
c.v2c.removeWatch(wi.typeURL, resourceName)
// TODO: remove item from cache.
// Remove the resource from the cache. When a watch for this
// resource is added later, it will trigger an xDS request with
// resource names, and the client will receive new xDS responses.
switch wi.typeURL {
case ldsURL:
delete(c.ldsCache, resourceName)
case rdsURL:
delete(c.rdsCache, resourceName)
case cdsURL:
delete(c.cdsCache, resourceName)
case edsURL:
delete(c.edsCache, resourceName)
}
}
}
}
Expand Down
Loading