Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Implement the envoy proxy snapshot cache
Browse files Browse the repository at this point in the history
This alleviates our need to handle the envoy state machine, and know how to respond to envoy requests.
Instead, we can simply populate the values for each proxy (keyed by the NodeID, which is set to the proxy's UUID)
and let the snapshot cache handle all communication with the proxy.

Signed-off-by: Sean Teeling <[email protected]>
  • Loading branch information
steeling authored Sep 7, 2022
1 parent a551827 commit 4d8c632
Show file tree
Hide file tree
Showing 35 changed files with 428 additions and 1,552 deletions.
48 changes: 35 additions & 13 deletions pkg/envoy/ads/cache_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ads
import (
"context"
"fmt"
"sync"
"time"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

Expand Down Expand Up @@ -52,14 +54,16 @@ func (s *Server) OnStreamOpen(ctx context.Context, streamID int64, typ string) e
certRotations, unsubRotations := s.certManager.SubscribeRotations(proxy.Identity.String())
defer unsubRotations()

// schedule one update for this proxy initially.
s.scheduleUpdate(proxy)
for {
select {
case <-proxyUpdateChan:
log.Debug().Str("proxy", proxy.String()).Msg("Broadcast update received")
s.update(proxy)
s.scheduleUpdate(proxy)
case <-certRotations:
log.Debug().Str("proxy", proxy.String()).Msg("Certificate has been updated for proxy")
s.update(proxy)
s.scheduleUpdate(proxy)
case <-ctx.Done():
return
}
Expand All @@ -68,15 +72,33 @@ func (s *Server) OnStreamOpen(ctx context.Context, streamID int64, typ string) e
return nil
}

func (s *Server) update(proxy *envoy.Proxy) {
ch := s.workqueues.AddJob(&proxyResponseJob{
proxy: proxy,
xdsServer: s,
typeURIs: envoy.XDSResponseOrder,
done: make(chan struct{}),
})
<-ch
close(ch)
func (s *Server) scheduleUpdate(proxy *envoy.Proxy) {
var wg sync.WaitGroup
wg.Add(1)
s.workqueues.AddJob(
func() {
t := time.Now()
log.Debug().Msgf("Starting update for proxy %s", proxy.String())

if err := s.update(proxy); err != nil {
log.Error().Err(err).Str("proxy", proxy.String()).Msg("Error generating resources for proxy")
}
log.Debug().Msgf("Update for proxy %s took took %v", proxy.String(), time.Since(t))
wg.Done()
})
wg.Wait()
}

func (s *Server) update(proxy *envoy.Proxy) error {
resources, err := s.GenerateResources(proxy)
if err != nil {
return err
}
if err := s.ServeResources(proxy, resources); err != nil {
return err
}
log.Debug().Msgf("successfully updated resources for proxy %s", proxy.String())
return nil
}

// OnStreamClosed is called on stream closed
Expand All @@ -100,8 +122,8 @@ func (s *Server) OnStreamRequest(streamID int64, req *discovery.DiscoveryRequest
}

// OnStreamResponse is called when a response is being sent to a request
func (s *Server) OnStreamResponse(_ context.Context, aa int64, req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
log.Debug().Msgf("OnStreamResponse RESP: type: %s, v: %s, nonce: %s, NumResources: %d", resp.TypeUrl, resp.VersionInfo, resp.Nonce, len(resp.Resources))
func (s *Server) OnStreamResponse(_ context.Context, streamID int64, req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
log.Debug().Msgf("OnStreamResponse RESP: %d type: %s, v: %s, nonce: %s, NumResources: %d", streamID, resp.TypeUrl, resp.VersionInfo, resp.Nonce, len(resp.Resources))
}

// --- Fetch request types. Callback interfaces still requires these to be defined
Expand Down
16 changes: 11 additions & 5 deletions pkg/envoy/ads/cache_log.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
package ads

import (
"github.com/rs/zerolog"
)

// scLogger implements envoy control plane's log.Logger and delegates calls to the `log` variable defined in
// types.go. It is used for the envoy snapshot cache.
type scLogger struct{}
type scLogger struct {
log zerolog.Logger
}

// Debugf logs a formatted debugging message.
func (l *scLogger) Debugf(format string, args ...interface{}) {
log.Debug().Msgf(format, args...)
l.log.Debug().Msgf(format, args...)
}

// Infof logs a formatted informational message.
func (l *scLogger) Infof(format string, args ...interface{}) {
log.Info().Msgf(format, args...)
l.log.Info().Msgf(format, args...)
}

// Warnf logs a formatted warning message.
func (l *scLogger) Warnf(format string, args ...interface{}) {
log.Warn().Msgf(format, args...)
l.log.Warn().Msgf(format, args...)
}

// Errorf logs a formatted error message.
func (l *scLogger) Errorf(format string, args ...interface{}) {
log.Error().Msgf(format, args...)
l.log.Error().Msgf(format, args...)
}
86 changes: 75 additions & 11 deletions pkg/envoy/ads/cache_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,96 @@ package ads
import (
"context"
"fmt"
"strings"
"time"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/google/uuid"

"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
)

// RecordFullSnapshot stores a group of resources as a new Snapshot with a new version in the cache.
// GenerateResources generates and returns the resources for the given proxy.
func (s *Server) GenerateResources(proxy *envoy.Proxy) (map[string][]types.Resource, error) {
cacheResourceMap := map[string][]types.Resource{}
for _, typeURI := range envoy.XDSResponseOrder {
log.Trace().Str("proxy", proxy.String()).Msgf("Getting resources for type %s", typeURI.Short())

handler, ok := s.xdsHandlers[typeURI]
if !ok {
return nil, errUnknownTypeURL
}

if s.catalog.GetMeshConfig().Spec.Observability.EnableDebugServer {
s.trackXDSLog(proxy.UUID.String(), typeURI)
}

startedAt := time.Now()
resources, err := handler(s.catalog, proxy, s.certManager, s.proxyRegistry)
xdsPathTimeTrack(startedAt, typeURI, proxy, err == nil)
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrGeneratingReqResource)).Str("proxy", proxy.String()).
Msgf("Error generating response for typeURI: %s", typeURI.Short())
xdsPathTimeTrack(time.Now(), envoy.TypeADS, proxy, false)
return nil, err
}

cacheResourceMap[typeURI.String()] = resources
}

xdsPathTimeTrack(time.Now(), envoy.TypeADS, proxy, true)
return cacheResourceMap, nil
}

// ServeResources stores a group of resources as a new Snapshot with a new version in the cache.
// It also runs a consistency check on the snapshot (will warn if there are missing resources referenced in
// the snapshot)
func (s *Server) RecordFullSnapshot(proxy *envoy.Proxy, snapshotResources map[string][]types.Resource) error {
snapshot, err := cache.NewSnapshot(
fmt.Sprintf("%d", s.configVersion[proxy.UUID.String()]),
snapshotResources,
)
func (s *Server) ServeResources(proxy *envoy.Proxy, snapshotResources map[string][]types.Resource) error {
uuid := proxy.UUID.String()

s.configVerMutex.Lock()
s.configVersion[uuid]++
configVersion := s.configVersion[uuid]
s.configVerMutex.Unlock()

snapshot, err := cache.NewSnapshot(fmt.Sprintf("%d", configVersion), snapshotResources)
if err != nil {
return err
}

if err := snapshot.Consistent(); err != nil {
log.Warn().Err(err).Str("proxy", proxy.String()).Msgf("Snapshot for proxy not consistent")
return err
}

s.configVerMutex.Lock()
defer s.configVerMutex.Unlock()
s.configVersion[proxy.UUID.String()]++
return s.snapshotCache.SetSnapshot(context.TODO(), uuid, snapshot)
}

func getCertificateCommonNameMeta(cn certificate.CommonName) (envoy.ProxyKind, uuid.UUID, identity.ServiceIdentity, error) {
// XDS cert CN is of the form <proxy-UUID>.<kind>.<proxy-identity>.<trust-domain>
chunks := strings.SplitN(cn.String(), constants.DomainDelimiter, 5)
if len(chunks) < 4 {
return "", uuid.UUID{}, "", errInvalidCertificateCN
}
proxyUUID, err := uuid.Parse(chunks[0])
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrParsingXDSCertCN)).
Msgf("Error parsing %s into uuid.UUID", chunks[0])
return "", uuid.UUID{}, "", err
}

switch {
case chunks[1] == "":
return "", uuid.UUID{}, "", errInvalidCertificateCN
case chunks[2] == "":
return "", uuid.UUID{}, "", errInvalidCertificateCN
case chunks[3] == "":
return "", uuid.UUID{}, "", errInvalidCertificateCN
}

return s.snapshotCache.SetSnapshot(context.TODO(), proxy.UUID.String(), snapshot)
return envoy.ProxyKind(chunks[1]), proxyUUID, identity.New(chunks[2], chunks[3]), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"testing"

xds_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/google/uuid"
tassert "github.com/stretchr/testify/assert"

Expand All @@ -13,36 +12,6 @@ import (
"github.com/openservicemesh/osm/pkg/identity"
)

func findSliceElem(slice []string, elem string) bool {
for _, v := range slice {
if v == elem {
return true
}
}
return false
}

func TestMapsetToSliceConvFunctions(t *testing.T) {
assert := tassert.New(t)

discRequest := &xds_discovery.DiscoveryRequest{TypeUrl: "TestTypeurl"}
discRequest.ResourceNames = []string{"A", "B", "C"}

nameSet := getRequestedResourceNamesSet(discRequest)

assert.True(nameSet.Contains("A"))
assert.True(nameSet.Contains("B"))
assert.True(nameSet.Contains("C"))
assert.False(nameSet.Contains("D"))

nameSlice := getResourceSliceFromMapset(nameSet)

assert.True(findSliceElem(nameSlice, "A"))
assert.True(findSliceElem(nameSlice, "B"))
assert.True(findSliceElem(nameSlice, "C"))
assert.False(findSliceElem(nameSlice, "D"))
}

func TestGetCertificateCommonNameMeta(t *testing.T) {
testCases := []struct {
name string
Expand Down
1 change: 0 additions & 1 deletion pkg/envoy/ads/cache_test.go

This file was deleted.

2 changes: 0 additions & 2 deletions pkg/envoy/ads/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package ads
import "fmt"

var errUnknownTypeURL = fmt.Errorf("unknown TypeUrl")
var errCreatingResponse = fmt.Errorf("creating response")
var errGrpcClosed = fmt.Errorf("grpc closed")
var errTooManyConnections = fmt.Errorf("too many connections")
var errUnsuportedXDSRequest = fmt.Errorf("Unsupported XDS server connection type")
var errInvalidCertificateCN = fmt.Errorf("invalid cn")
32 changes: 0 additions & 32 deletions pkg/envoy/ads/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,16 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"sync"
"time"

xds_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/k8s/events"
)

Expand Down Expand Up @@ -187,28 +180,3 @@ func (s *GRPCServer) GrpcServe(ctx context.Context, cancel context.CancelFunc, l
}()
return nil
}

func receive(requests chan *xds_discovery.DiscoveryRequest, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, proxy *envoy.Proxy, quit chan struct{}) {
for {
var request *xds_discovery.DiscoveryRequest
request, recvErr := (*server).Recv()
if recvErr != nil {
defer close(requests)
if status.Code(recvErr) == codes.Canceled || recvErr == io.EOF {
log.Debug().Err(recvErr).Str("proxy", proxy.String()).Msg("gRPC Connection terminated")
return
}
log.Error().Err(recvErr).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrGRPCConnectionFailed)).
Str("proxy", proxy.String()).Msg("gRPC Connection error")
return
}
select {
case <-(*server).Context().Done():
log.Trace().Str("proxy", proxy.String()).Msgf("gRPC stream from proxy terminated")
close(quit)
return
case requests <- request:
}
log.Debug().Str("proxy", proxy.String()).Msgf("Received DiscoveryRequest from proxy")
}
}
41 changes: 0 additions & 41 deletions pkg/envoy/ads/jobs.go

This file was deleted.

Loading

0 comments on commit 4d8c632

Please sign in to comment.