From 415b2cab178205c5a0c79c18fb43ff29a3e7ad8b Mon Sep 17 00:00:00 2001 From: Purnesh Dixit Date: Tue, 28 Jan 2025 00:07:56 +0530 Subject: [PATCH] xds: add lrs client and xDS client interfaces --- xds/internal/clients/lrsclient/client.go | 45 +++++++++ xds/internal/clients/lrsclient/config.go | 34 +++++++ xds/internal/clients/lrsclient/load_store.go | 30 ++++++ xds/internal/clients/xdsclient/client.go | 72 ++++++++++++++ xds/internal/clients/xdsclient/config.go | 93 +++++++++++++++++++ .../clients/xdsclient/resource_type.go | 90 ++++++++++++++++++ .../clients/xdsclient/resource_watcher.go | 79 ++++++++++++++++ 7 files changed, 443 insertions(+) create mode 100644 xds/internal/clients/lrsclient/client.go create mode 100644 xds/internal/clients/lrsclient/config.go create mode 100644 xds/internal/clients/lrsclient/load_store.go create mode 100644 xds/internal/clients/xdsclient/client.go create mode 100644 xds/internal/clients/xdsclient/config.go create mode 100644 xds/internal/clients/xdsclient/resource_type.go create mode 100644 xds/internal/clients/xdsclient/resource_watcher.go diff --git a/xds/internal/clients/lrsclient/client.go b/xds/internal/clients/lrsclient/client.go new file mode 100644 index 000000000000..d16c8c3b5225 --- /dev/null +++ b/xds/internal/clients/lrsclient/client.go @@ -0,0 +1,45 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package lrsclient provides implementation of the LRS client for enabling +// applications to report load to the xDS management servers. +// +// It allows applications to report load data to an LRS server via the LRS +// stream. This data can be used for monitoring, traffic management, and other +// purposes. +package lrsclient + +import ( + "google.golang.org/grpc/xds/internal/clients" +) + +// LRSClient is a full fledged LRS client. +type LRSClient struct { +} + +// ReportLoad starts a load reporting stream to the given server. All load +// reports to the same server share the LRS stream. +// +// It returns a [LoadStore] for the user to report loads, a function to +// cancel the load reporting stream. +// +// The stats from [LoadStore] are reported periodically until cleanup +// function is called. +func (c *LRSClient) ReportLoad(_ *clients.ServerConfig) (*LoadStore, func()) { + return NewLoadStore(), func() {} +} diff --git a/xds/internal/clients/lrsclient/config.go b/xds/internal/clients/lrsclient/config.go new file mode 100644 index 000000000000..80a8326b6490 --- /dev/null +++ b/xds/internal/clients/lrsclient/config.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrsclient + +import ( + "google.golang.org/grpc/xds/internal/clients" +) + +// Config provides parameters for configuring the LRS client. +type Config struct { + // Node is the identity of the client application, reporting load to the + // xDS management server. + Node clients.Node + + // TransportBuilder is the implementation to create a communication channel + // to an xDS management server. + TransportBuilder clients.TransportBuilder +} diff --git a/xds/internal/clients/lrsclient/load_store.go b/xds/internal/clients/lrsclient/load_store.go new file mode 100644 index 000000000000..2ff7f981d965 --- /dev/null +++ b/xds/internal/clients/lrsclient/load_store.go @@ -0,0 +1,30 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package lrsclient + +// LoadStore keeps the loads for multiple clusters and services to be reported via +// LRS. It contains loads to report to one LRS server. Create multiple stores +// for multiple servers. +type LoadStore struct { +} + +// NewLoadStore creates a new load store. +func NewLoadStore() *LoadStore { + return &LoadStore{} +} diff --git a/xds/internal/clients/xdsclient/client.go b/xds/internal/clients/xdsclient/client.go new file mode 100644 index 000000000000..0502f13987b2 --- /dev/null +++ b/xds/internal/clients/xdsclient/client.go @@ -0,0 +1,72 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package xdsclient provides implementation of the xDS client for enabling +// applications to communicate with xDS management servers. +// +// It allows applications to: +// - Create xDS client instance with in-memory configurations. +// - Register watches for named resources. +// - Receive resources via the ADS (Aggregated Discovery Service) stream. +// +// This enables applications to dynamically discover and configure resources +// such as listeners, routes, clusters, and endpoints from an xDS management +// server. +package xdsclient + +import ( + "errors" + + v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" +) + +// XDSClient is a full fledged client which queries a set of discovery APIs +// (collectively termed as xDS) on a remote management server, to discover +// various dynamic resources. +type XDSClient struct { +} + +// New returns a new xDS Client configured with provided config. +func New(_ Config) (*XDSClient, error) { + return nil, errors.New("xds: xDS client is not yet implemented") +} + +// WatchResource uses xDS to discover the resource associated with the provided +// resource name. The resource type url look up the resource type +// implementation which determines how xDS responses are received, are +// deserialized and validated. Upon receipt of a response from the management +// server, an appropriate callback on the watcher is invoked. +// +// During a race (e.g. an xDS response is received while the user is calling +// cancel()), there's a small window where the callback can be called after +// the watcher is canceled. Callers need to handle this case. +func (c *XDSClient) WatchResource(_ string, _ string, _ ResourceWatcher) (cancel func()) { + return nil +} + +// Close closes the xDS client and releases all resources. The caller is +// expected to invoke it once they are done using the client. +func (c *XDSClient) Close() error { + return nil +} + +// DumpResources returns the status and contents of all xDS resources from the +// xDS client. +func (c *XDSClient) DumpResources() *v3statuspb.ClientStatusResponse { + return nil +} diff --git a/xds/internal/clients/xdsclient/config.go b/xds/internal/clients/xdsclient/config.go new file mode 100644 index 000000000000..0159485c5a64 --- /dev/null +++ b/xds/internal/clients/xdsclient/config.go @@ -0,0 +1,93 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient + +import ( + "time" + + "google.golang.org/grpc/internal/backoff" + "google.golang.org/grpc/xds/internal/clients" +) + +const ( + defaultWatchExpiryTimeout = 15 * time.Second +) + +var ( + defaultStreamBackoffFunc = backoff.DefaultExponential.Backoff +) + +// Config provides parameters for configuring the xDS client. +type Config struct { + // Servers specifies a list of xDS servers to connect to. This field should + // be used only for old-style names without an authority. + Servers []clients.ServerConfig + + // Authorities is a map of authority names to authority configurations. + // Each authority configuration contains list of xDS servers to connect to, + // including fallbacks. + Authorities map[string]clients.Authority + + // Node is the identity of the xDS client, connecting to the xDS + // management server. + Node clients.Node + + // TransportBuilder is the implementation to create a communication channel + // to an xDS management server. + TransportBuilder clients.TransportBuilder + + // ResourceTypes is a map from resource type URLs to resource type + // implementations. Each resource type URL uniquely identifies a specific + // kind of xDS resource, and the corresponding resource type implementation + // provides logic for parsing, validating, and processing resources of that + // type. + ResourceTypes map[string]ResourceType + + // Below values will have default values but can be overridden for testing + + // watchExpiryTimeOut is the duration after which a watch for a resource + // will expire if no updates are received. + watchExpiryTimeOut time.Duration + + // streamBackOffTimeout is a function that returns the backoff duration for + // retrying failed xDS streams. + streamBackOffTimeout func(int) time.Duration +} + +// NewConfig returns a new xDS client config with provided parameters. +func NewConfig(servers []clients.ServerConfig, authorities map[string]clients.Authority, node clients.Node, transport clients.TransportBuilder, resourceTypes map[string]ResourceType) Config { + c := Config{Servers: servers, Authorities: authorities, Node: node, TransportBuilder: transport, ResourceTypes: resourceTypes} + c.watchExpiryTimeOut = defaultWatchExpiryTimeout + c.streamBackOffTimeout = defaultStreamBackoffFunc + return c +} + +// SetWatchExpiryTimeoutForTesting sets the watch expiry timeout. +// +// For testing purpose only. +func (c *Config) SetWatchExpiryTimeoutForTesting(d time.Duration) { + c.watchExpiryTimeOut = d +} + +// SetStreamBackOffTimeoutForTesting sets the stream backoff timeout function +// +// For testing purpose only. +func (c *Config) SetStreamBackOffTimeoutForTesting(d func(int) time.Duration) { + c.streamBackOffTimeout = d +} diff --git a/xds/internal/clients/xdsclient/resource_type.go b/xds/internal/clients/xdsclient/resource_type.go new file mode 100644 index 000000000000..c349f3c39bee --- /dev/null +++ b/xds/internal/clients/xdsclient/resource_type.go @@ -0,0 +1,90 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient + +import ( + "google.golang.org/grpc/xds/internal/clients" + "google.golang.org/protobuf/types/known/anypb" +) + +// ResourceType wraps all resource-type specific functionality. Each supported +// resource type will provide an implementation of this interface. +type ResourceType interface { + // TypeURL is the xDS type URL of this resource type for the v3 xDS + // protocol. This URL is used as the key to look up the corresponding + // ResourceType implementation in the ResourceTypes map provided in the + // [Config]. + TypeURL() string + + // TypeName identifies resources in a transport protocol agnostic way. This + // can be used for logging/debugging purposes, as well in cases where the + // resource type name is to be uniquely identified but the actual + // functionality provided by the resource type is not required. + TypeName() string + + // AllResourcesRequiredInSotW indicates whether this resource type requires + // that all resources be present in every SotW response from the server. If + // true, a response that does not include a previously seen resource will + // be interpreted as a deletion of that resource. + AllResourcesRequiredInSotW() bool + + // Decode deserializes and validates an xDS resource serialized inside the + // provided `Any` proto, as received from the xDS management server. + // + // If protobuf deserialization fails or resource validation fails, + // returns a non-nil error. Otherwise, returns a fully populated + // DecodeResult. + Decode(DecodeOptions, any) (*DecodeResult, error) +} + +// DecodeOptions wraps the options required by ResourceType implementation for +// decoding configuration received from the xDS management server. +type DecodeOptions struct { + // Config contains the complete configuration passed to the xDS client. + // This contains useful data for resource validation. + Config *Config + + // ServerConfig contains the server config (from the above bootstrap + // configuration) of the xDS server from which the current resource, for + // which Decode() is being invoked, was received. + ServerConfig *clients.ServerConfig +} + +// DecodeResult is the result of a decode operation. +type DecodeResult struct { + // Name is the name of the resource being watched. + Name string + + // Resource contains the configuration associated with the resource being + // watched. + Resource ResourceData +} + +// ResourceData contains the configuration data sent by the xDS management +// server, associated with the resource being watched. Every resource type must +// provide an implementation of this interface to represent the configuration +// received from the xDS management server. +type ResourceData interface { + // RawEqual returns true if the passed in resource data is equal to that of + // the receiver, based on the underlying raw protobuf message. + RawEqual(ResourceData) bool + + // Raw returns the underlying raw protobuf form of the resource. + Raw() *anypb.Any +} diff --git a/xds/internal/clients/xdsclient/resource_watcher.go b/xds/internal/clients/xdsclient/resource_watcher.go new file mode 100644 index 000000000000..54f226b7edd4 --- /dev/null +++ b/xds/internal/clients/xdsclient/resource_watcher.go @@ -0,0 +1,79 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient + +// OnUpdateProcessed is a function to be invoked by watcher implementations +// upon completing the processing of a callback from the xDS client. Failure to +// invoke this callback prevents the xDS client from reading further messages +// from the xDS server. +type OnUpdateProcessed func() + +// ResourceDataOrError is a struct that contains either ResourceData or error. +// It is used to represent the result of an xDS resource update. Exactly one of +// Data or Err will be non-nil. +type ResourceDataOrError struct { + Data ResourceData + Err error +} + +// ResourceWatcher wraps the callbacks to be invoked for different events +// corresponding to the resource being watched. +type ResourceWatcher interface { + // OnResourceChanged is invoked to notify the watcher of a new version of + // the resource received from the xDS server or an error indicating the + // reason why the resource could not be obtained. + // + // In the former case, this callback will be invoked with a non-nil + // ResourceData in ResourceDataOrError. The ResourceData of the + // ResourceDataOrError needs to be type asserted to the appropriate type + // for the resource being watched. + // + // In the latter case, this callback will be invoked with a non-nil error + // value in ResourceDataOrError. + // + // Watcher is expected to use the most recent value passed to + // OnResourceChanged(), regardless of whether that's a ResourceData or an + // error i.e., if the watcher is given an error via OnResourceChanged(), + // that means it should stop using any previously delivered resource. + // + // It is invoked under different error conditions including but not + // limited to the following: + // - authority mentioned in the resource is not found + // - resource name parsing error + // - resource validation error (if resource is not cached) + // - ADS stream failure (if resource is not cached) + // - connection failure (if resource is not cached) + OnResourceChanged(ResourceDataOrError, OnUpdateProcessed) + + // OnAmbientError is invoked to notify the watcher of an error that occurs + // after a resource has been received (i.e. we already have a cached + // resource) that should not modify the watcher’s use of that resource but + // that may be useful information about the ambient state of the XdsClient. + // In particular, the watcher should NOT stop using the previously seen + // resource, and the XdsClient will NOT remove the resource from its cache. + // However, the error message may be useful as additional context to + // include in errors that are being generated for other reasons. + // + // If resource is already cached, it is invoked under different error + // conditions including but not limited to the following: + // - resource validation error + // - ADS stream failure + // - connection failure + OnAmbientError(error, OnUpdateProcessed) +}