Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Passthrough Response type #300

Merged
merged 14 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.2.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/stretchr/testify v1.5.1
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
google.golang.org/grpc v1.25.1
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533 h1:8wZizuKuZVu5COB7EsBYxBQz8nRcXXn5d4Gt91eJLvU=
github.com/cncf/udpa/go v0.0.0-20200313221541-5f7e5dd04533/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
Expand All @@ -16,8 +18,13 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -51,5 +58,8 @@ google.golang.org/grpc v1.23.0 h1:AzbTB6ux+okLTzP8Ru1Xs41C303zdcfEht7MQnYJt5A=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
85 changes: 79 additions & 6 deletions pkg/cache/v2/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/golang/protobuf/ptypes/any"
)

// Request is an alias for the discovery request type.
Expand All @@ -40,32 +41,104 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(Request) (value chan Response, cancel func())
CreateWatch(Request) (value chan ResponseIface, cancel func())
jyotimahapatra marked this conversation as resolved.
Show resolved Hide resolved
}

// Cache is a generic config cache with a watcher.
type Cache interface {
ConfigWatcher

// Fetch implements the polling method of the config cache using a non-empty request.
Fetch(context.Context, Request) (*Response, error)
Fetch(context.Context, Request) (ResponseIface, error)
}

type ResponseIface interface {
// Get the Constructed DiscoveryResponse
GetDiscoveryResponse() (*discovery.DiscoveryResponse, error)

// Get te original Request for the Response.
GetRequest() *discovery.DiscoveryRequest

// Get the version in the Response.
GetVersion() string
}

// Response is a pre-serialized xDS response.
type Response struct {
ResponseIface
// Request is the original request.
Request discovery.DiscoveryRequest

// Version of the resources as tracked by the cache for the given type.
// Proxy responds with this version as an acknowledgement.
Version string

// The value indicating whether the resource is marshaled, and only one of `Resources` and `MarshaledResources` is available.
ResourceMarshaled bool

// Resources to be included in the response.
Resources []types.Resource

// The value indicating whether the resource is marshaled, and only one of `Resources` and `MarshaledResources` is available.
isResourceMarshaled bool

// Marshaled Resources to be included in the response.
MarshaledResources []types.MarshaledResource
marshaledResponse *discovery.DiscoveryResponse
}

// PassthroughResponse is a pre constructed xDS response that need not go through marshalling transformations.
type PassthroughResponse struct {
ResponseIface
// Request is the original request.
Request discovery.DiscoveryRequest

// The discovery response that needs to be sent as is, without any marshalling transformations.
Response *discovery.DiscoveryResponse
}

// GetDiscoveryResponse performs the marshalling the first time its called and uses the cached response subsequently.
// This is necessary because the marshalled response does not change across the calls.
// This caching behavior is important in high throughput scenarios because grpc marshalling has a cost and it drives the cpu utilization under load.
func (r Response) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
if r.isResourceMarshaled {
return r.marshaledResponse, nil
}

marshaledResources := make([]*any.Any, len(r.Resources))
jyotimahapatra marked this conversation as resolved.
Show resolved Hide resolved

for i, resource := range r.Resources {
marshaledResource, err := MarshalResource(resource)
if err != nil {
return nil, err
}
marshaledResources[i] = &any.Any{
TypeUrl: r.Request.TypeUrl,
Value: marshaledResource,
}
}

r.isResourceMarshaled = true

return &discovery.DiscoveryResponse{
VersionInfo: r.Version,
Resources: marshaledResources,
TypeUrl: r.Request.TypeUrl,
}, nil
}

func (r Response) GetRequest() *discovery.DiscoveryRequest {
return &r.Request
}

func (r Response) GetVersion() string {
return r.Version
}

func (r PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
return r.Response, nil
}

func (r PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
return &r.Request
}

func (r PassthroughResponse) GetVersion() string {
return r.Response.VersionInfo
}
63 changes: 63 additions & 0 deletions pkg/cache/v2/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package cache_test

import (
"testing"

discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
route "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
"github.com/envoyproxy/go-control-plane/pkg/resource/v2"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/stretchr/testify/assert"
)

const (
resourceName = "route1"
)

func TestResponseGetDiscoveryResponse(t *testing.T) {
routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}}
resp := cache.Response{
Request: discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Resources: routes,
}

discoveryResponse, err := resp.GetDiscoveryResponse()
assert.Nil(t, err)
assert.Equal(t, discoveryResponse.VersionInfo, resp.Version)
assert.Equal(t, len(discoveryResponse.Resources), 1)

r := &route.RouteConfiguration{}
err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r)
assert.Nil(t, err)
assert.Equal(t, r.Name, resourceName)
}

func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}}
rsrc, err := ptypes.MarshalAny(routes[0])
assert.Nil(t, err)
dr := &discovery.DiscoveryResponse{
TypeUrl: resource.RouteType,
Resources: []*any.Any{rsrc},
VersionInfo: "v",
}
resp := cache.PassthroughResponse{
Request: discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Response: dr,
}

discoveryResponse, err := resp.GetDiscoveryResponse()
assert.Nil(t, err)
assert.Equal(t, discoveryResponse.VersionInfo, resp.Response.VersionInfo)
assert.Equal(t, len(discoveryResponse.Resources), 1)

r := &route.RouteConfiguration{}
err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r)
assert.Nil(t, err)
assert.Equal(t, r.Name, resourceName)
assert.Equal(t, discoveryResponse, dr)
}
12 changes: 6 additions & 6 deletions pkg/cache/v2/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func superset(names map[string]bool, resources map[string]types.Resource) error
}

// CreateWatch returns a watch for an xDS request.
func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func()) {
func (cache *snapshotCache) CreateWatch(request Request) (chan ResponseIface, func()) {
nodeID := cache.hash.ID(request.Node)

cache.mu.Lock()
Expand All @@ -190,7 +190,7 @@ func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func())
info.mu.Unlock()

// allocate capacity 1 to allow one-time non-blocking use
value := make(chan Response, 1)
value := make(chan ResponseIface, 1)

snapshot, exists := cache.snapshots[nodeID]
version := snapshot.GetVersion(request.TypeUrl)
Expand Down Expand Up @@ -234,7 +234,7 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() {

// Respond to a watch with the snapshot value. The value channel should have capacity not to block.
// TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46
func (cache *snapshotCache) respond(request Request, value chan Response, resources map[string]types.Resource, version string) {
func (cache *snapshotCache) respond(request Request, value chan ResponseIface, resources map[string]types.Resource, version string) {
// for ADS, the request names must match the snapshot names
// if they do not, then the watch is never responded, and it is expected that envoy makes another request
if len(request.ResourceNames) != 0 && cache.ads {
Expand All @@ -253,7 +253,7 @@ func (cache *snapshotCache) respond(request Request, value chan Response, resour
value <- createResponse(request, resources, version)
}

func createResponse(request Request, resources map[string]types.Resource, version string) Response {
func createResponse(request Request, resources map[string]types.Resource, version string) ResponseIface {
filtered := make([]types.Resource, 0, len(resources))

// Reply only with the requested resources. Envoy may ask each resource
Expand Down Expand Up @@ -281,7 +281,7 @@ func createResponse(request Request, resources map[string]types.Resource, versio

// Fetch implements the cache fetch function.
// Fetch is called on multiple streams, so responding to individual names with the same version works.
func (cache *snapshotCache) Fetch(ctx context.Context, request Request) (*Response, error) {
func (cache *snapshotCache) Fetch(ctx context.Context, request Request) (ResponseIface, error) {
nodeID := cache.hash.ID(request.Node)

cache.mu.RLock()
Expand All @@ -300,7 +300,7 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request Request) (*Respon

resources := snapshot.GetResources(request.TypeUrl)
out := createResponse(request, resources, version)
return &out, nil
return out, nil
}

return nil, fmt.Errorf("missing snapshot for %q", nodeID)
Expand Down
34 changes: 17 additions & 17 deletions pkg/cache/v2/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ var (
[]types.Resource{testRuntime})

names = map[string][]string{
rsrc.EndpointType: []string{clusterName},
rsrc.EndpointType: {clusterName},
rsrc.ClusterType: nil,
rsrc.RouteType: []string{routeName},
rsrc.RouteType: {routeName},
rsrc.ListenerType: nil,
rsrc.RuntimeType: nil,
}
Expand Down Expand Up @@ -112,11 +112,11 @@ func TestSnapshotCache(t *testing.T) {
value, _ := c.CreateWatch(discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
select {
case out := <-value:
if out.Version != version {
t.Errorf("got version %q, want %q", out.Version, version)
if out.GetVersion() != version {
t.Errorf("got version %q, want %q", out.GetVersion(), version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.Resources), snapshot.GetResources(typ)) {
t.Errorf("get resources %v, want %v", out.Resources, snapshot.GetResources(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(cache.Response).Resources), snapshot.GetResources(typ)) {
t.Errorf("get resources %v, want %v", out.(cache.Response).Resources, snapshot.GetResources(typ))
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand All @@ -137,8 +137,8 @@ func TestSnapshotCacheFetch(t *testing.T) {
if err != nil || resp == nil {
t.Fatal("unexpected error or null response")
}
if resp.Version != version {
t.Errorf("got version %q, want %q", resp.Version, version)
if resp.GetVersion() != version {
t.Errorf("got version %q, want %q", resp.GetVersion(), version)
}
})
}
Expand All @@ -158,7 +158,7 @@ func TestSnapshotCacheFetch(t *testing.T) {

func TestSnapshotCacheWatch(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
watches := make(map[string]chan cache.Response)
watches := make(map[string]chan cache.ResponseIface)
for _, typ := range testTypes {
watches[typ], _ = c.CreateWatch(discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
}
Expand All @@ -169,11 +169,11 @@ func TestSnapshotCacheWatch(t *testing.T) {
t.Run(typ, func(t *testing.T) {
select {
case out := <-watches[typ]:
if out.Version != version {
t.Errorf("got version %q, want %q", out.Version, version)
if out.GetVersion() != version {
t.Errorf("got version %q, want %q", out.GetVersion(), version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.Resources), snapshot.GetResources(typ)) {
t.Errorf("get resources %v, want %v", out.Resources, snapshot.GetResources(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(cache.Response).Resources), snapshot.GetResources(typ)) {
t.Errorf("get resources %v, want %v", out.(cache.Response).Resources, snapshot.GetResources(typ))
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down Expand Up @@ -202,11 +202,11 @@ func TestSnapshotCacheWatch(t *testing.T) {
// validate response for endpoints
select {
case out := <-watches[rsrc.EndpointType]:
if out.Version != version2 {
t.Errorf("got version %q, want %q", out.Version, version2)
if out.GetVersion() != version2 {
t.Errorf("got version %q, want %q", out.GetVersion(), version2)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.Resources), snapshot2.Resources[types.Endpoint].Items) {
t.Errorf("get resources %v, want %v", out.Resources, snapshot2.Resources[types.Endpoint].Items)
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(cache.Response).Resources), snapshot2.Resources[types.Endpoint].Items) {
t.Errorf("get resources %v, want %v", out.(cache.Response).Resources, snapshot2.Resources[types.Endpoint].Items)
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v2/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type ResponseWatch struct {
Request Request

// Response is the channel to push response to.
Response chan Response
Response chan ResponseIface
}

// newStatusInfo initializes a status info data structure.
Expand Down
Loading