Skip to content

Commit

Permalink
internal/xds: convert xDS resources for the envoy xDS server
Browse files Browse the repository at this point in the history
Add a snapshotter interface to paper over the go-control-plane API
bifurcation. Use this to populate v3 and v2 resources in the snapshot
cache.

This updates #1898.

Signed-off-by: James Peach <[email protected]>
  • Loading branch information
jpeach committed Nov 2, 2020
1 parent 59c9e43 commit a6ba9f7
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 65 deletions.
28 changes: 16 additions & 12 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"syscall"
"time"

envoy_api_v2_auth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoy_cache_v2 "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
envoy_auth_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoy_server_v2 "github.com/envoyproxy/go-control-plane/pkg/server/v2"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1"
"github.com/projectcontour/contour/internal/annotation"
"github.com/projectcontour/contour/internal/contour"
Expand Down Expand Up @@ -353,7 +353,7 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// that are explicitly using 1.1 to continue working by default. However, the
// *default* minimum TLS version for proxies/ingresses that don't specify it
// is 1.2, set in the DAG processors.
globalMinTLSVersion := annotation.MinTLSVersion(ctx.Config.TLS.MinimumProtocolVersion, envoy_api_v2_auth.TlsParameters_TLSv1_1)
globalMinTLSVersion := annotation.MinTLSVersion(ctx.Config.TLS.MinimumProtocolVersion, envoy_auth_v2.TlsParameters_TLSv1_1)

listenerConfig := xdscache_v2.ListenerConfig{
UseProxyProto: ctx.useProxyProto,
Expand Down Expand Up @@ -388,13 +388,8 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
endpointHandler,
}

// snapshotCache is used to store the state of what all xDS services should
// contain at any given point in time.
snapshotCache := envoy_cache_v2.NewSnapshotCache(false, xds.DefaultHash,
log.WithField("context", "xDS"))

// snapshotHandler is used to produce new snapshots when the internal state changes for any xDS resource.
snapshotHandler := xdscache.NewSnapshotHandler(snapshotCache, resources, log.WithField("context", "snapshotHandler"))
snapshotHandler := xdscache.NewSnapshotHandler(resources, log.WithField("context", "snapshotHandler"))

// register observer for endpoints updates.
endpointHandler.Observer = contour.ComposeObservers(snapshotHandler)
Expand Down Expand Up @@ -652,17 +647,26 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

switch ctx.Config.Server.XDSServerType {
case config.EnvoyServerType:
contour_xds_v2.RegisterServer(envoy_server_v2.NewServer(context.Background(), snapshotCache, nil), grpcServer)
v3cache := contour_xds_v3.NewSnapshotCache(false, log)
snapshotHandler.AddSnapshotter(v3cache)
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(context.Background(), v3cache, nil), grpcServer)

// Check an internal feature flag to disable xDS v2 endpoints. This is strictly for testing.
if config.GetenvOr("CONTOUR_INTERNAL_DISABLE_XDSV2", "N") == "N" {
v2cache := contour_xds_v2.NewSnapshotCache(false, log)
snapshotHandler.AddSnapshotter(v2cache)
contour_xds_v2.RegisterServer(envoy_server_v2.NewServer(context.Background(), v2cache, nil), grpcServer)
}
case config.ContourServerType:
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)

// Check an internal feature flag to disable xDS v2 endpoints. This is strictly for testing.
if config.GetenvOr("CONTOUR_INTERNAL_DISABLE_XDSV2", "N") != "N" {
if config.GetenvOr("CONTOUR_INTERNAL_DISABLE_XDSV2", "N") == "N" {
contour_xds_v2.RegisterServer(contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)
}
default:
// This can't happen due to config validation.
log.Fatalf("invalid xdsServerType %q configured", ctx.Config.Server.XDSServerType)
log.Fatalf("invalid xDS server type %q", ctx.Config.Server.XDSServerType)
}

addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
Expand Down
14 changes: 14 additions & 0 deletions internal/protobuf/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package protobuf

import (
"fmt"
"reflect"
"time"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
Expand Down Expand Up @@ -101,3 +103,15 @@ func AnyMessageTypeOf(msg proto.Message) string {
a := MustMarshalAny(msg)
return a.TypeUrl
}

/// MustMarshalJSON marshals msg to indented JSON.
func MustMarshalJSON(msg proto.Message) string {
m := jsonpb.Marshaler{Indent: " "}

str, err := m.MarshalToString(msg)
if err != nil {
panic(fmt.Sprintf("failed to marshal %T: %s", msg, err))
}

return str
}
29 changes: 19 additions & 10 deletions internal/xds/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,32 @@
package xds

import (
envoy_api_v2_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
cache "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
envoy_api_core_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_config_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
)

// ConstantHash is a specialized node ID hasher used to allow
const CONSTANT_HASH_VALUE = "contour"

// ConstantHashV2 is a specialized node ID hasher used to allow
// any instance of Envoy to connect to Contour regardless of the
// service-node flag configured on Envoy.
type ConstantHash string
type ConstantHashV2 struct{}

func (c ConstantHash) ID(*envoy_api_v2_core.Node) string {
return string(c)
func (c ConstantHashV2) ID(*envoy_api_core_v2.Node) string {
return CONSTANT_HASH_VALUE
}

func (c ConstantHash) String() string {
return string(c)
func (c ConstantHashV2) String() string {
return CONSTANT_HASH_VALUE
}

var _ cache.NodeHash = ConstantHash("")
// ConstantHashV3 is the same as ConstantHashV2 but for xDS v3.
type ConstantHashV3 struct{}

func (c ConstantHashV3) ID(*envoy_config_v3.Node) string {
return CONSTANT_HASH_VALUE
}

var DefaultHash = ConstantHash("contour")
func (c ConstantHashV3) String() string {
return CONSTANT_HASH_VALUE
}
55 changes: 55 additions & 0 deletions internal/xds/v2/snapshotter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache_v2 "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
envoy_log "github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/projectcontour/contour/internal/xds"
"github.com/projectcontour/contour/internal/xdscache"
)

var Hash = xds.ConstantHashV2{}

// Snapshotter is a v2 Snapshot cache that implements the xds.Snapshotter interface.
type Snapshotter interface {
xdscache.Snapshotter
envoy_cache_v2.SnapshotCache
}

type snapshotter struct {
envoy_cache_v2.SnapshotCache
}

func (s *snapshotter) Generate(version string, resources map[envoy_types.ResponseType][]envoy_types.Resource) error {
// Create a snapshot with all xDS resources.
snapshot := envoy_cache_v2.NewSnapshot(
version,
resources[envoy_types.Endpoint],
resources[envoy_types.Cluster],
resources[envoy_types.Route],
resources[envoy_types.Listener],
nil,
resources[envoy_types.Secret],
)

return s.SetSnapshot(Hash.String(), snapshot)
}

func NewSnapshotCache(ads bool, logger envoy_log.Logger) Snapshotter {
return &snapshotter{
SnapshotCache: envoy_cache_v2.NewSnapshotCache(ads, &Hash, logger),
}
}
77 changes: 77 additions & 0 deletions internal/xds/v3/snapshotter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3

import (
envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache_v2 "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
envoy_cache_v3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
envoy_log "github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/golang/protobuf/proto"
"github.com/projectcontour/contour/internal/xds"
"github.com/projectcontour/contour/internal/xdscache"
)

var Hash = xds.ConstantHashV3{}

// Snapshotter is a v3 Snapshot cache that implements the xds.Snapshotter interface.
type Snapshotter interface {
xdscache.Snapshotter
envoy_cache_v3.SnapshotCache
}

type snapshotter struct {
envoy_cache_v3.SnapshotCache
}

func (s *snapshotter) Generate(version string, resources map[envoy_types.ResponseType][]envoy_types.Resource) error {
// Create a snapshot with all xDS resources.
snapshot := envoy_cache_v3.Snapshot{}

snapshot.Resources[envoy_types.Endpoint] = rewriteResources(version, resources[envoy_types.Endpoint])
snapshot.Resources[envoy_types.Cluster] = rewriteResources(version, resources[envoy_types.Cluster])
snapshot.Resources[envoy_types.Route] = rewriteResources(version, resources[envoy_types.Route])
snapshot.Resources[envoy_types.Listener] = rewriteResources(version, resources[envoy_types.Listener])
snapshot.Resources[envoy_types.Secret] = rewriteResources(version, resources[envoy_types.Secret])

return s.SetSnapshot(Hash.String(), snapshot)
}

func NewSnapshotCache(ads bool, logger envoy_log.Logger) Snapshotter {
return &snapshotter{
SnapshotCache: envoy_cache_v3.NewSnapshotCache(ads, &Hash, logger),
}
}

func rewriteResources(version string, items []envoy_types.Resource) envoy_cache_v3.Resources {
// Since we are using the xDS v2 types internally, create
// the resources with the v2 package so that it indexes them
// by name using the correct v2 type switch.
v2 := envoy_cache_v2.NewResources(version, rewrite(items))

return envoy_cache_v3.Resources{
Version: v2.Version,
Items: v2.Items,
}
}

func rewrite(resources []envoy_types.Resource) []envoy_types.Resource {
rewritten := make([]envoy_types.Resource, len(resources))

for i, r := range resources {
rewritten[i] = xds.Rewrite(proto.Clone(r))
}

return rewritten
}
Loading

0 comments on commit a6ba9f7

Please sign in to comment.