Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
PodMetadata: Use kubecontroller to gather PodMetadata
Browse files Browse the repository at this point in the history
Moves using Proxy UUID, and instead uses available kubecontroller
interface to gather information about the pod for a given proxy.

This is needed to be able to reuse the Node ID for snapshot cache.

Signed-off-by: Eduard Serra [email protected]
  • Loading branch information
eduser25 committed Jun 1, 2021
1 parent ae17465 commit c46db6f
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 44 deletions.
2 changes: 1 addition & 1 deletion cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func main() {
}

// Create and start the ADS gRPC service
xdsServer := ads.NewADSServer(meshCatalog, proxyRegistry, cfg.IsDebugServerEnabled(), osmNamespace, cfg, certManager)
xdsServer := ads.NewADSServer(meshCatalog, proxyRegistry, cfg.IsDebugServerEnabled(), osmNamespace, cfg, certManager, kubernetesClient)
if err := xdsServer.Start(ctx, cancel, constants.ADSServerPort, adsCert); err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error initializing ADS server")
}
Expand Down
39 changes: 0 additions & 39 deletions pkg/envoy/ads/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,46 +26,7 @@ func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery
log.Error().Err(recvErr).Msgf("[grpc] Connection error")
return
}
if !proxy.HasPodMetadata() {
// Set the Pod metadata on the given proxy only once. This could arrive with the first few XDS requests.
if err := recordEnvoyPodMetadata(request, proxy, proxyRegistry); err != nil {
log.Err(err).Msgf("Error recording Pod metadata")
// this terminates the gRPC stream
return
}
}
log.Trace().Msgf("[grpc] Received DiscoveryRequest from Envoy with certificate SerialNumber %s", proxy.GetCertificateSerialNumber())
requests <- *request
}
}

func recordEnvoyPodMetadata(request *xds_discovery.DiscoveryRequest, proxy *envoy.Proxy, proxyRegistry *registry.ProxyRegistry) error {
if request != nil && request.Node != nil {
if meta, err := envoy.ParseEnvoyServiceNodeID(request.Node.Id); err != nil {
log.Error().Err(err).Msgf("Error parsing Envoy Node ID: %s", request.Node.Id)
} else {
log.Trace().Msgf("Recorded metadata for Envoy with xDS Certificate SerialNumber=%s: podUID=%s, podNamespace=%s, serviceAccountName=%s, envoyNodeID=%s",
proxy.GetCertificateSerialNumber(), meta.UID, meta.Namespace, meta.ServiceAccount, meta.EnvoyNodeID)

// Verify that the ServiceAccount from the NodeID is the same as the one in the mTLS cert's CN
cn := proxy.GetCertificateCommonName()
certSA, err := envoy.GetServiceAccountFromProxyCertificate(cn)
if err != nil {
log.Err(err).Msgf("Error getting service account from XDS certificate with CommonName=%s", cn)
return err
}

if certSA != meta.ServiceAccount {
log.Error().Msgf("Service Account referenced in NodeID (%s) does not match Service Account in Certificate (%s). This proxy is not allowed to join the mesh.", meta.ServiceAccount, certSA)
return errServiceAccountMismatch
}

// Set the Pod Metadata, which will be used in the RegisterProxy() invocation below!
proxy.PodMetadata = meta

// We call RegisterProxy again, for a second time, on the ProxyRegistry to update the index on pod metadata
proxyRegistry.RegisterProxy(proxy) // Second of Two invocations. First one was on establishing the gRPC stream.
}
}
return nil
}
7 changes: 5 additions & 2 deletions pkg/envoy/ads/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/openservicemesh/osm/pkg/auth"
configFake "github.com/openservicemesh/osm/pkg/gen/client/config/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/kubernetes"

"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/certificate"
Expand Down Expand Up @@ -122,6 +123,7 @@ var _ = Describe("Test ADS response functions", func() {
certPEM, _ := certManager.IssueCertificate(certCommonName, certDuration)
cert, _ := certificate.DecodePEMCertificate(certPEM.GetCertificateChain())
server, actualResponses := tests.NewFakeXDSServer(cert, nil, nil)
kubectrlMock := kubernetes.NewMockController(mockCtrl)

mockConfigurator.EXPECT().IsEgressEnabled().Return(false).AnyTimes()
mockConfigurator.EXPECT().IsPrometheusScrapingEnabled().Return(false).AnyTimes()
Expand All @@ -131,7 +133,7 @@ var _ = Describe("Test ADS response functions", func() {
mockConfigurator.EXPECT().IsDebugServerEnabled().Return(true).AnyTimes()

It("returns Aggregated Discovery Service response", func() {
s := NewADSServer(mc, proxyRegistry, true, tests.Namespace, mockConfigurator, mockCertManager)
s := NewADSServer(mc, proxyRegistry, true, tests.Namespace, mockConfigurator, mockCertManager, kubectrlMock)

Expect(s).ToNot(BeNil())

Expand Down Expand Up @@ -202,6 +204,7 @@ var _ = Describe("Test ADS response functions", func() {
certPEM, _ := certManager.IssueCertificate(certCommonName, certDuration)
cert, _ := certificate.DecodePEMCertificate(certPEM.GetCertificateChain())
server, actualResponses := tests.NewFakeXDSServer(cert, nil, nil)
kubectrlMock := kubernetes.NewMockController(mockCtrl)

mockConfigurator.EXPECT().IsEgressEnabled().Return(false).AnyTimes()
mockConfigurator.EXPECT().IsPrometheusScrapingEnabled().Return(false).AnyTimes()
Expand All @@ -214,7 +217,7 @@ var _ = Describe("Test ADS response functions", func() {
}).AnyTimes()

It("returns Aggregated Discovery Service response", func() {
s := NewADSServer(mc, proxyRegistry, true, tests.Namespace, mockConfigurator, mockCertManager)
s := NewADSServer(mc, proxyRegistry, true, tests.Namespace, mockConfigurator, mockCertManager, kubectrlMock)

Expect(s).ToNot(BeNil())

Expand Down
4 changes: 3 additions & 1 deletion pkg/envoy/ads/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/openservicemesh/osm/pkg/envoy/rds"
"github.com/openservicemesh/osm/pkg/envoy/registry"
"github.com/openservicemesh/osm/pkg/envoy/sds"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
"github.com/openservicemesh/osm/pkg/utils"
"github.com/openservicemesh/osm/pkg/workerpool"
)
Expand All @@ -31,7 +32,7 @@ const (
)

// NewADSServer creates a new Aggregated Discovery Service server
func NewADSServer(meshCatalog catalog.MeshCataloger, proxyRegistry *registry.ProxyRegistry, enableDebug bool, osmNamespace string, cfg configurator.Configurator, certManager certificate.Manager) *Server {
func NewADSServer(meshCatalog catalog.MeshCataloger, proxyRegistry *registry.ProxyRegistry, enableDebug bool, osmNamespace string, cfg configurator.Configurator, certManager certificate.Manager, kubecontroller k8s.Controller) *Server {
server := Server{
catalog: meshCatalog,
proxyRegistry: proxyRegistry,
Expand All @@ -48,6 +49,7 @@ func NewADSServer(meshCatalog catalog.MeshCataloger, proxyRegistry *registry.Pro
xdsMapLogMutex: sync.Mutex{},
xdsLog: make(map[certificate.CommonName]map[envoy.TypeURI][]time.Time),
workqueues: workerpool.NewWorkerPool(workerPoolSize),
kubecontroller: kubecontroller,
}

return &server
Expand Down
55 changes: 54 additions & 1 deletion pkg/envoy/ads/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscov
// Details on which Pod this Envoy is fronting will arrive via xDS in the NODE_ID string.
// When this arrives we will call RegisterProxy() a second time - this time with Pod context!
proxy := envoy.NewProxy(certCommonName, certSerialNumber, utils.GetIPFromContext(server.Context()))
s.proxyRegistry.RegisterProxy(proxy) // First of Two invocations. Second one will be during xDS hand-shake!
err = s.recordPodMetadata(proxy)
if err == errServiceAccountMismatch {
// Service Account mismatch
return err
}

s.proxyRegistry.RegisterProxy(proxy)

defer s.proxyRegistry.UnregisterProxy(proxy)

Expand Down Expand Up @@ -293,3 +299,50 @@ func isCNforProxy(proxy *envoy.Proxy, cn certificate.CommonName) bool {
identityForCN := identity.K8sServiceAccount{Name: chunks[0], Namespace: chunks[1]}
return identityForCN == proxyIdentity
}

// recordPodMetadata records pod metadata and verifies the certificate issued for this pod
// is for the same service account as seen on the pod's service account
func (s *Server) recordPodMetadata(p *envoy.Proxy) error {
pod, err := envoy.GetPodFromCertificate(p.GetCertificateCommonName(), s.kubecontroller)
if err != nil {
log.Warn().Msgf("Could not find pod for connecting proxy %s. No metadata was recorded.", p.GetCertificateSerialNumber())
return nil
}

workloadKind := ""
workloadName := ""
for _, ref := range pod.GetOwnerReferences() {
if ref.Controller != nil && *ref.Controller {
workloadKind = ref.Kind
workloadName = ref.Name
break
}
}

p.PodMetadata = &envoy.PodMetadata{
UID: string(pod.UID),
Name: pod.Name,
Namespace: pod.Namespace,
ServiceAccount: identity.K8sServiceAccount{
Namespace: pod.Namespace,
Name: pod.Spec.ServiceAccountName,
},
WorkloadKind: workloadKind,
WorkloadName: workloadName,
}

// Verify Service account matches (cert to pod Service Account)
cn := p.GetCertificateCommonName()
certSA, err := envoy.GetServiceAccountFromProxyCertificate(cn)
if err != nil {
log.Err(err).Msgf("Error getting service account from XDS certificate with CommonName=%s", cn)
return err
}

if certSA != p.PodMetadata.ServiceAccount {
log.Error().Msgf("Service Account referenced in NodeID (%s) does not match Service Account in Certificate (%s). This proxy is not allowed to join the mesh.", p.PodMetadata.ServiceAccount, certSA)
return errServiceAccountMismatch
}

return nil
}
2 changes: 2 additions & 0 deletions pkg/envoy/ads/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/envoy/registry"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/workerpool"
)
Expand All @@ -33,4 +34,5 @@ type Server struct {
certManager certificate.Manager
ready bool
workqueues *workerpool.WorkerPool
kubecontroller k8s.Controller
}

0 comments on commit c46db6f

Please sign in to comment.