Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
pkg/envoy/ads: add error codes
Browse files Browse the repository at this point in the history
Adds error codes for errors related to Envoy ADS.

Part of #2866

Signed-off-by: jaellio <[email protected]>
  • Loading branch information
jaellio committed Jul 4, 2021
1 parent ead5c9e commit c41708d
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 22 deletions.
4 changes: 3 additions & 1 deletion pkg/envoy/ads/cache_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/k8s/events"
)

Expand All @@ -33,7 +34,8 @@ func (s *Server) allPodUpdater() {
for _, pod := range allpods {
proxy, err := GetProxyFromPod(pod)
if err != nil {
log.Error().Err(err).Msgf("Could not get proxy from pod %s/%s", pod.Namespace, pod.Name)
log.Error().Err(err).Str(errcode.Kind, errcode.ErrGettingProxyFromPod.String()).
Msgf("Could not get proxy from pod %s/%s", pod.Namespace, pod.Name)
continue
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/envoy/ads/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/envoy/registry"
"github.com/openservicemesh/osm/pkg/errcode"
)

func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, proxy *envoy.Proxy, quit chan struct{}, proxyRegistry *registry.ProxyRegistry) {
Expand All @@ -23,7 +24,8 @@ func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery
log.Debug().Err(recvErr).Msgf("[grpc] Connection terminated")
return
}
log.Error().Err(recvErr).Msgf("[grpc] Connection error")
log.Error().Err(recvErr).Str(errcode.Kind, errcode.ErrGRPCConnectionFailed.String()).
Msgf("[grpc] Connection error")
return
}
log.Trace().Msgf("[grpc] Received DiscoveryRequest from Envoy with certificate SerialNumber %s", proxy.GetCertificateSerialNumber())
Expand Down
13 changes: 9 additions & 4 deletions pkg/envoy/ads/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
)

// getTypeResource invokes the XDS handler (LDS, CDS etc.) to respond to the XDS request containing the requests' type and associated resources
Expand Down Expand Up @@ -73,7 +74,8 @@ func (s *Server) sendResponse(proxy *envoy.Proxy, server *xds_discovery.Aggregat
// Generate the resources for this request
resources, err := s.getTypeResources(proxy, finalReq)
if err != nil {
log.Error().Err(err).Msgf("Creating %s update for Proxy %s", typeURI.Short(), proxy.GetCertificateCommonName())
log.Error().Err(err).Str(errcode.Kind, errcode.ErrGeneratingReqResource.String()).
Msgf("Error generating response for typeURI: %s, proxy %s", typeURI.Short(), proxy.String())
thereWereErrors = true
continue
}
Expand All @@ -93,7 +95,8 @@ func (s *Server) sendResponse(proxy *envoy.Proxy, server *xds_discovery.Aggregat
if s.cacheEnabled {
// Store the aggregated resources as a full snapshot
if err := s.RecordFullSnapshot(proxy, cacheResourceMap); err != nil {
log.Error().Err(err).Msgf("Failed to record snapshot for proxy %s: %v", proxy.GetCertificateCommonName(), err)
log.Error().Err(err).Str(errcode.Kind, errcode.ErrRecordingSnapshot.String()).
Msgf("Failed to record snapshot for proxy %s: %v", proxy.GetCertificateCommonName(), err)
thereWereErrors = true
}
}
Expand Down Expand Up @@ -122,7 +125,8 @@ func (s *Server) SendDiscoveryResponse(proxy *envoy.Proxy, request *xds_discover
for _, res := range resourcesToSend {
proto, err := ptypes.MarshalAny(res)
if err != nil {
log.Error().Err(err).Msgf("Error marshalling resource %s for proxy %s", typeURI, proxy.GetCertificateSerialNumber())
log.Error().Err(err).Str(errcode.Kind, errcode.ErrMarshallingXDSResource.String()).
Msgf("Error marshalling resource %s for proxy %s", typeURI, proxy.GetCertificateSerialNumber())
continue
}
response.Resources = append(response.Resources, proto)
Expand All @@ -137,7 +141,8 @@ func (s *Server) SendDiscoveryResponse(proxy *envoy.Proxy, request *xds_discover

// Send the response
if err := (*server).Send(response); err != nil {
log.Error().Err(err).Msgf("Error sending response for type %s to proxy %s", typeURI.Short(), proxy.String())
log.Error().Err(err).Str(errcode.Kind, errcode.ErrSendingDiscoveryResponse.String()).
Msgf("Error sending response for type %s to proxy %s", typeURI.Short(), proxy.String())
return err
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/envoy/ads/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/envoy/secrets"
"github.com/openservicemesh/osm/pkg/errcode"
)

// makeRequestForAllSecrets constructs an SDS DiscoveryRequest as if an Envoy proxy sent it.
Expand All @@ -25,7 +26,8 @@ func makeRequestForAllSecrets(proxy *envoy.Proxy, meshCatalog catalog.MeshCatalo
// TODO(draychev): The proxy Certificate should revolve around ServiceIdentity, not specific to ServiceAccount [https://github.com/openservicemesh/osm/issues/3186]
proxyIdentity, err := envoy.GetServiceIdentityFromProxyCertificate(proxy.GetCertificateCommonName())
if err != nil {
log.Error().Err(err).Msgf("Error looking up proxy identity for proxy %s", proxy.String())
log.Error().Err(err).Str(errcode.Kind, errcode.ErrGettingServiceIdentity.String()).
Msgf("Error looking up proxy identity for proxy %s", proxy.String())
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/envoy/ads/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/openservicemesh/osm/pkg/envoy/rds"
"github.com/openservicemesh/osm/pkg/envoy/registry"
"github.com/openservicemesh/osm/pkg/envoy/sds"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/utils"
"github.com/openservicemesh/osm/pkg/workerpool"
Expand Down Expand Up @@ -71,7 +72,8 @@ func (s *Server) withXdsLogMutex(f func()) {
func (s *Server) Start(ctx context.Context, cancel context.CancelFunc, port int, adsCert certificate.Certificater) error {
grpcServer, lis, err := utils.NewGrpc(ServerType, port, adsCert.GetCertificateChain(), adsCert.GetPrivateKey(), adsCert.GetIssuingCA())
if err != nil {
log.Error().Err(err).Msg("Error starting ADS server")
log.Error().Err(err).Str(errcode.Kind, errcode.ErrStartingADSServer.String()).
Msg("Error starting ADS server")
return err
}

Expand Down
23 changes: 15 additions & 8 deletions pkg/envoy/ads/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/k8s/events"
"github.com/openservicemesh/osm/pkg/metricsstore"
Expand Down Expand Up @@ -43,7 +44,8 @@ func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscov
// When this arrives we will call RegisterProxy() a second time - this time with Pod context!
proxy, err := envoy.NewProxy(certCommonName, certSerialNumber, utils.GetIPFromContext(server.Context()))
if err != nil {
log.Error().Err(err).Msgf("Error initializing proxy with certificate SerialNumber=%s", certSerialNumber)
log.Error().Err(err).Str(errcode.Kind, errcode.ErrInitializingProxy.String()).
Msgf("Error initializing proxy with certificate SerialNumber=%s", certSerialNumber)
return err
}

Expand Down Expand Up @@ -97,7 +99,8 @@ func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscov

case discoveryRequest, ok := <-requests:
if !ok {
log.Error().Msgf("gRPC stream closed by proxy %s!", proxy.String())
log.Error().Str(errcode.Kind, errcode.ErrGRPCStreamClosedByProxy.String()).
Msgf("gRPC stream closed by proxy %s!", proxy.String())
metricsstore.DefaultMetricsStore.ProxyConnectCount.Dec()
return errGrpcClosed
}
Expand Down Expand Up @@ -148,8 +151,9 @@ func shouldPushUpdate(proxy *envoy.Proxy) bool {
// In ADS, CDS and LDS will come first in all cases. Only allow an control-plane-push update push if
// we have sent either to the proxy already.
if proxy.GetLastSentNonce(envoy.TypeLDS) == "" && proxy.GetLastSentNonce(envoy.TypeCDS) == "" {
log.Error().Msgf("Proxy %s: LDS and CDS unrequested yet, waiting for first request for this proxy to be responded to",
proxy.String())
log.Error().Str(errcode.Kind, errcode.ErrUnexpectedXDSRequest.String()).
Msgf("Proxy %s: LDS and CDS unrequested yet, waiting for first request for this proxy to be responded to",
proxy.String())
return false
}
return true
Expand Down Expand Up @@ -181,8 +185,9 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov
// Parse TypeURL of the request
typeURL, ok := envoy.ValidURI[discoveryRequest.TypeUrl]
if !ok {
log.Error().Msgf("Proxy %s: Unknown/Unsupported URI: %s",
proxy.String(), discoveryRequest.TypeUrl)
log.Error().Str(errcode.Kind, errcode.ErrInvalidXDSTypeURI.String()).
Msgf("Proxy %s: Unknown/Unsupported URI: %s",
proxy.String(), discoveryRequest.TypeUrl)
return false
}

Expand All @@ -195,7 +200,8 @@ func respondToRequest(proxy *envoy.Proxy, discoveryRequest *xds_discovery.Discov
// Parse ACK'd verion on the proxy for this given resource
requestVersion, err = parseRequestVersion(discoveryRequest)
if err != nil {
log.Error().Err(err).Msgf("Proxy %s: Error parsing version %s for type %s", proxy.String(), discoveryRequest.VersionInfo, typeURL)
log.Error().Err(err).Str(errcode.Kind, errcode.ErrParsingDiscoveryReqVersion.String()).
Msgf("Proxy %s: Error parsing version %s for type %s", proxy.String(), discoveryRequest.VersionInfo, typeURL)
return false
}

Expand Down Expand Up @@ -348,7 +354,8 @@ func (s *Server) recordPodMetadata(p *envoy.Proxy) error {
}

if certSA.ToK8sServiceAccount() != p.PodMetadata.ServiceAccount {
log.Error().Msgf("Service Account referenced in NodeID (%s) does not match Service Account in Certificate (%s). This proxy is not allowed to join the mesh.", p.PodMetadata.ServiceAccount, certSA)
log.Error().Str(errcode.Kind, errcode.ErrMismatchedServiceAccount.String()).
Msgf("Service Account referenced in NodeID (%s) does not match Service Account in Certificate (%s). This proxy is not allowed to join the mesh.", p.PodMetadata.ServiceAccount, certSA)
return errServiceAccountMismatch
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/envoy/xdsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy/secrets"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/k8s"
"github.com/openservicemesh/osm/pkg/service"
Expand Down Expand Up @@ -84,7 +85,8 @@ func GetTLSParams() *xds_auth.TlsParameters {
func GetAccessLog() []*xds_accesslog_filter.AccessLog {
accessLog, err := ptypes.MarshalAny(getStdoutAccessLog())
if err != nil {
log.Error().Err(err).Msg("Error marshalling AccessLog object")
log.Error().Err(err).Str(errcode.Kind, errcode.ErrMarshallingXDSResource.String()).
Msgf("Error marshalling AccessLog object")
return nil
}
return []*xds_accesslog_filter.AccessLog{{
Expand Down Expand Up @@ -317,7 +319,8 @@ func getCertificateCommonNameMeta(cn certificate.CommonName) (*certificateCommon
}
proxyUUID, err := uuid.Parse(chunks[0])
if err != nil {
log.Error().Err(err).Msgf("Error parsing %s into uuid.UUID", chunks[0])
log.Error().Err(err).Str(errcode.Kind, errcode.ErrParsingXDSCertCN.String()).
Msgf("Error parsing %s into uuid.UUID", chunks[0])
return nil, err
}

Expand Down Expand Up @@ -348,8 +351,9 @@ func GetPodFromCertificate(cn certificate.CommonName, kubecontroller k8s.Control
}

if len(pods) == 0 {
log.Error().Msgf("Did not find Pod with label %s = %s in namespace %s",
constants.EnvoyUniqueIDLabelName, cnMeta.ProxyUUID, cnMeta.ServiceIdentity.ToK8sServiceAccount().Namespace)
log.Error().Str(errcode.Kind, errcode.ErrFetchingPodFromCert.String()).
Msgf("Did not find Pod with label %s = %s in namespace %s",
constants.EnvoyUniqueIDLabelName, cnMeta.ProxyUUID, cnMeta.ServiceIdentity.ToK8sServiceAccount().Namespace)
return nil, ErrDidNotFindPodForCertificate
}

Expand All @@ -358,8 +362,9 @@ func GetPodFromCertificate(cn certificate.CommonName, kubecontroller k8s.Control
// This is a limitation we set in place in order to make the mesh easy to understand and reason about.
// When a pod belongs to more than one service XDS will not program the Envoy proxy, leaving it out of the mesh.
if len(pods) > 1 {
log.Error().Msgf("Found more than one pod with label %s = %s in namespace %s. There can be only one!",
constants.EnvoyUniqueIDLabelName, cnMeta.ProxyUUID, cnMeta.ServiceIdentity.ToK8sServiceAccount().Namespace)
log.Error().Str(errcode.Kind, errcode.ErrPodBelongsToMultipleServices.String()).
Msgf("Found more than one pod with label %s = %s in namespace %s. There can be only one!",
constants.EnvoyUniqueIDLabelName, cnMeta.ProxyUUID, cnMeta.ServiceIdentity.ToK8sServiceAccount().Namespace)
return nil, ErrMoreThanOnePodForCertificate
}

Expand Down
Loading

0 comments on commit c41708d

Please sign in to comment.