Skip to content

Commit

Permalink
[v10] Tag forwarded spans with custom attributes (#15215)
Browse files Browse the repository at this point in the history
Tag forwarded spans with custom attributes (#14706)

* Tag forwarded spans with custom attributes

Adds a `teleport.forwarded.for` attribute to a resource or
all spans that are forwarded to the auth server. This allows
consumers of the spans to identify where the spans are coming
from and take possible action. In some scenarios it may
be desirable to drop forwarded spans along the collection
process, by tagging them we can provide a way for those
consumers to identify them. It also allows for potentially
identifying a malicious user that may be trying to spam the
telemetry backend with spans.

Part of #12241
  • Loading branch information
rosstimothy authored Aug 9, 2022
1 parent 78cb698 commit bdc1a7f
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 41 deletions.
10 changes: 10 additions & 0 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
saml2 "github.com/russellhaering/gosaml2"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport"
Expand All @@ -71,6 +72,7 @@ import (
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/services/local"
"github.com/gravitational/teleport/lib/session"
Expand Down Expand Up @@ -164,6 +166,9 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
if cfg.KeyStoreConfig.HostUUID == "" {
cfg.KeyStoreConfig.HostUUID = cfg.HostUUID
}
if cfg.TraceClient == nil {
cfg.TraceClient = tracing.NewNoopClient()
}

limiter, err := limiter.NewConnectionsLimiter(limiter.Config{
MaxConnections: defaults.LimiterMaxConcurrentSignatures,
Expand Down Expand Up @@ -215,6 +220,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) {
keyStore: keyStore,
getClaimsFun: getClaims,
inventory: inventory.NewController(cfg.Presence),
traceClient: cfg.TraceClient,
}
for _, o := range opts {
if err := o(&as); err != nil {
Expand Down Expand Up @@ -390,6 +396,10 @@ type Server struct {
getClaimsFun func(closeCtx context.Context, oidcClient *oidc.Client, connector types.OIDCConnector, code string) (jose.Claims, error)

inventory *inventory.Controller

// traceClient is used to forward spans to the upstream collector for components
// within the cluster that don't have a direct connection to said collector
traceClient otlptrace.Client
}

func (a *Server) CloseContext() context.Context {
Expand Down
103 changes: 100 additions & 3 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import (
"context"
"fmt"
"net/url"
"strings"
"time"

"github.com/coreos/go-semver/semver"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
Expand All @@ -41,9 +40,11 @@ import (
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/trace"

"github.com/sirupsen/logrus"
collectortracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
otlpcommonv1 "go.opentelemetry.io/proto/otlp/common/v1"
)

// ServerWithRoles is a wrapper around auth service
Expand Down Expand Up @@ -314,6 +315,102 @@ func (a *ServerWithRoles) filterSessionTracker(ctx context.Context, joinerRoles
return true
}

const (
forwardedTag = "teleport.forwarded.for"
)

// Export forwards OTLP traces to the upstream collector configured in the tracing service. This allows for
// tsh, tctl, etc to be able to export traces without having to know how to connect to the upstream collector
// for the cluster.
//
// All spans received will have a `teleport.forwarded.for` attribute added to them with the value being one of
// two things depending on the role of the forwarder:
// 1) User forwarded: `teleport.forwarded.for: alice`
// 2) Instance forwarded: `teleport.forwarded.for: Proxy.clustername:Proxy,Node,Instance`
//
// This allows upstream consumers of the spans to be able to identify forwarded spans and act on them accordingly.
func (a *ServerWithRoles) Export(ctx context.Context, req *collectortracev1.ExportTraceServiceRequest) (*collectortracev1.ExportTraceServiceResponse, error) {
var sb strings.Builder

sb.WriteString(a.context.User.GetName())

// if forwarded on behalf of a Teleport service add its system roles
if role, ok := a.context.Identity.(BuiltinRole); ok {
sb.WriteRune(':')
sb.WriteString(role.Role.String())
if len(role.AdditionalSystemRoles) > 0 {
sb.WriteRune(',')
sb.WriteString(role.AdditionalSystemRoles.String())
}
}

// the forwarded attribute to add
value := &otlpcommonv1.KeyValue{
Key: forwardedTag,
Value: &otlpcommonv1.AnyValue{
Value: &otlpcommonv1.AnyValue_StringValue{
StringValue: sb.String(),
},
},
}

// returns the index at which the attribute with
// the forwardedTag key exists, -1 if not found
tagIndex := func(attrs []*otlpcommonv1.KeyValue) int {
for i, attr := range attrs {
if attr.Key == forwardedTag {
return i
}
}

return -1
}

for _, resourceSpans := range req.ResourceSpans {
// if there is a resource, tag it with the
// forwarded attribute instead of each of tagging
// each span
if resourceSpans.Resource != nil {
if index := tagIndex(resourceSpans.Resource.Attributes); index != -1 {
resourceSpans.Resource.Attributes[index] = value
} else {
resourceSpans.Resource.Attributes = append(resourceSpans.Resource.Attributes, value)
}

// override any span attributes with a forwarded tag,
// but we don't need to add one if the span isn't already
// tagged since we just tagged the resource
for _, scopeSpans := range resourceSpans.ScopeSpans {
for _, span := range scopeSpans.Spans {
if index := tagIndex(span.Attributes); index != -1 {
span.Attributes[index] = value
}
}
}

continue
}

// there was no resource, so we must now tag all the
// individual spans with the forwarded tag
for _, scopeSpans := range resourceSpans.ScopeSpans {
for _, span := range scopeSpans.Spans {
if index := tagIndex(span.Attributes); index != -1 {
span.Attributes[index] = value
} else {
span.Attributes = append(span.Attributes, value)
}
}
}
}

if err := a.authServer.traceClient.UploadTraces(ctx, req.ResourceSpans); err != nil {
return &collectortracev1.ExportTraceServiceResponse{}, trace.Wrap(err)
}

return &collectortracev1.ExportTraceServiceResponse{}, nil
}

// GetSessionTracker returns the current state of a session tracker for an active session.
func (a *ServerWithRoles) GetSessionTracker(ctx context.Context, sessionID string) (types.SessionTracker, error) {
tracker, err := a.authServer.GetSessionTracker(ctx, sessionID)
Expand Down
20 changes: 7 additions & 13 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
collectortracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -94,9 +93,6 @@ type GRPCServer struct {
APIConfig
server *grpc.Server

// traceClient is used to forward spans to the upstream collector for components
// within the cluster that don't have a direct connection to said collector
traceClient otlptrace.Client
// TraceServiceServer exposes the exporter server so that the auth server may
// collect and forward spans
collectortracepb.TraceServiceServer
Expand All @@ -110,15 +106,16 @@ func (g *GRPCServer) serverContext() context.Context {
// tsh, tctl, etc to be able to export traces without having to know how to connect to the upstream collector
// for the cluster.
func (g *GRPCServer) Export(ctx context.Context, req *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) {
if len(req.ResourceSpans) == 0 {
return &collectortracepb.ExportTraceServiceResponse{}, nil
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trace.Wrap(err)
}

if err := g.traceClient.UploadTraces(ctx, req.ResourceSpans); err != nil {
return &collectortracepb.ExportTraceServiceResponse{}, trace.Wrap(err)
if len(req.ResourceSpans) == 0 {
return &collectortracepb.ExportTraceServiceResponse{}, nil
}

return &collectortracepb.ExportTraceServiceResponse{}, nil
return auth.Export(ctx, req)
}

// GetServer returns an instance of grpc server
Expand Down Expand Up @@ -4164,8 +4161,6 @@ type GRPCServerConfig struct {
// UnaryInterceptor intercepts GRPC streams
// for authentication and rate limiting
StreamInterceptor grpc.StreamServerInterceptor
// TraceClient is used to forward spans to the upstream telemetry collector
TraceClient otlptrace.Client
}

// CheckAndSetDefaults checks and sets default values
Expand Down Expand Up @@ -4218,8 +4213,7 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) {
Entry: logrus.WithFields(logrus.Fields{
trace.Component: teleport.Component(teleport.ComponentAuth, teleport.ComponentGRPC),
}),
server: server,
traceClient: cfg.TraceClient,
server: server,
}
proto.RegisterAuthServiceServer(server, authServer)
collectortracepb.RegisterTraceServiceServer(server, authServer)
Expand Down
Loading

0 comments on commit bdc1a7f

Please sign in to comment.