diff --git a/lib/backend/report.go b/lib/backend/report.go
index b229dac8a5b07..fa4bf2531903d 100644
--- a/lib/backend/report.go
+++ b/lib/backend/report.go
@@ -24,6 +24,7 @@ import (
 	"github.com/gravitational/teleport"
 	"github.com/gravitational/teleport/api/types"
 	apiutils "github.com/gravitational/teleport/api/utils"
+	"github.com/gravitational/teleport/lib/observability/tracing"
 	"github.com/gravitational/teleport/lib/utils"

 	"github.com/gravitational/trace"
@@ -31,6 +32,8 @@ import (
 	"github.com/jonboulle/clockwork"
 	"github.com/prometheus/client_golang/prometheus"
 	log "github.com/sirupsen/logrus"
+	"go.opentelemetry.io/otel/attribute"
+	oteltrace "go.opentelemetry.io/otel/trace"
 )

 const reporterDefaultCacheSize = 1000
@@ -45,6 +48,8 @@ type ReporterConfig struct {
 	// metric. Higher value means higher memory usage but fewer infrequent
 	// requests forgotten.
 	TopRequestsCount int
+	// Tracer is used to create spans
+	Tracer oteltrace.Tracer
 }

 // CheckAndSetDefaults checks and sets
@@ -58,6 +63,9 @@ func (r *ReporterConfig) CheckAndSetDefaults() error {
 	if r.TopRequestsCount == 0 {
 		r.TopRequestsCount = reporterDefaultCacheSize
 	}
+	if r.Tracer == nil {
+		r.Tracer = tracing.NoopTracer(teleport.ComponentBackend)
+	}
 	return nil
 }

@@ -108,6 +116,17 @@ func NewReporter(cfg ReporterConfig) (*Reporter, error) {

 // GetRange returns query range
 func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte, limit int) (*GetResult, error) {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/GetRange",
+		oteltrace.WithAttributes(
+			attribute.Int("limit", limit),
+			attribute.String("start_key", string(startKey)),
+			attribute.String("end_key", string(endKey)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	res, err := s.Backend.GetRange(ctx, startKey, endKey, limit)
 	batchReadLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -121,6 +140,15 @@ func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte,

 // Create creates item if it does not exist
 func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/Create",
+		oteltrace.WithAttributes(
+			attribute.String("key", string(i.Key)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	lease, err := s.Backend.Create(ctx, i)
 	writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -135,6 +163,15 @@ func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) {
 // Put puts value into backend (creates if it does not
 // exists, updates it otherwise)
 func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/Put",
+		oteltrace.WithAttributes(
+			attribute.String("key", string(i.Key)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	lease, err := s.Backend.Put(ctx, i)
 	writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -148,6 +185,15 @@ func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) {

 // Update updates value in the backend
 func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/Update",
+		oteltrace.WithAttributes(
+			attribute.String("key", string(i.Key)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	lease, err := s.Backend.Update(ctx, i)
 	writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -161,6 +207,15 @@ func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) {

 // Get returns a single item or not found error
 func (s *Reporter) Get(ctx context.Context, key []byte) (*Item, error) {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/Get",
+		oteltrace.WithAttributes(
+			attribute.String("key", string(key)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	readLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
 	readRequests.WithLabelValues(s.Component).Inc()
@@ -175,6 +230,15 @@ func (s *Reporter) Get(ctx context.Context, key []byte) (*Item, error) {
 // CompareAndSwap compares item with existing item
 // and replaces is with replaceWith item
 func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWith Item) (*Lease, error) {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/CompareAndSwap",
+		oteltrace.WithAttributes(
+			attribute.String("key", string(expected.Key)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	lease, err := s.Backend.CompareAndSwap(ctx, expected, replaceWith)
 	writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -188,6 +252,15 @@ func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWit

 // Delete deletes item by key
 func (s *Reporter) Delete(ctx context.Context, key []byte) error {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/Delete",
+		oteltrace.WithAttributes(
+			attribute.String("key", string(key)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	err := s.Backend.Delete(ctx, key)
 	writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -201,6 +274,16 @@ func (s *Reporter) Delete(ctx context.Context, key []byte) error {

 // DeleteRange deletes range of items
 func (s *Reporter) DeleteRange(ctx context.Context, startKey []byte, endKey []byte) error {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/DeleteRange",
+		oteltrace.WithAttributes(
+			attribute.String("start_key", string(startKey)),
+			attribute.String("end_key", string(endKey)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	err := s.Backend.DeleteRange(ctx, startKey, endKey)
 	batchWriteLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -217,6 +300,16 @@ func (s *Reporter) DeleteRange(ctx context.Context, startKey []byte, endKey []by
 // some backends may ignore expires based on the implementation
 // in case if the lease managed server side
 func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time) error {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/KeepAlive",
+		oteltrace.WithAttributes(
+			attribute.Int64("lease", lease.ID),
+			attribute.String("key", string(lease.Key)),
+		),
+	)
+	defer span.End()
+
 	start := s.Clock().Now()
 	err := s.Backend.KeepAlive(ctx, lease, expires)
 	writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -230,6 +323,15 @@ func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time

 // NewWatcher returns a new event watcher
 func (s *Reporter) NewWatcher(ctx context.Context, watch Watch) (Watcher, error) {
+	ctx, span := s.Tracer.Start(
+		ctx,
+		"backend/NewWatcher",
+		oteltrace.WithAttributes(
+			attribute.String("name", watch.Name),
+		),
+	)
+	defer span.End()
+
 	w, err := s.Backend.NewWatcher(ctx, watch)
 	if err != nil {
 		return nil, trace.Wrap(err)
diff --git a/lib/observability/tracing/tracing.go b/lib/observability/tracing/tracing.go
index 3fc63b789c587..25e45cc4f1cb9 100644
--- a/lib/observability/tracing/tracing.go
+++ b/lib/observability/tracing/tracing.go
@@ -283,6 +283,11 @@ func NoopProvider() *Provider {
 	)}
 }

+// NoopTracer creates a new Tracer that never samples any spans.
+func NoopTracer(instrumentationName string) oteltrace.Tracer {
+	return NoopProvider().Tracer(instrumentationName)
+}
+
 // NewTraceProvider creates a new Provider that is configured per the provided Config.
 func NewTraceProvider(ctx context.Context, cfg Config) (*Provider, error) {
 	if err := cfg.CheckAndSetDefaults(); err != nil {
diff --git a/lib/service/service.go b/lib/service/service.go
index 6f793a2b9698d..b794f936867ed 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -1620,6 +1620,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca
 	reporter, err := backend.NewReporter(backend.ReporterConfig{
 		Component: teleport.ComponentCache,
 		Backend:   mem,
+		Tracer:    process.TracingProvider.Tracer(teleport.ComponentCache),
 	})
 	if err != nil {
 		return nil, trace.Wrap(err)
@@ -3848,6 +3849,7 @@ func (process *TeleportProcess) initAuthStorage() (bk backend.Backend, err error
 	reporter, err := backend.NewReporter(backend.ReporterConfig{
 		Component: teleport.ComponentBackend,
 		Backend:   backend.NewSanitizer(bk),
+		Tracer:    process.TracingProvider.Tracer(teleport.ComponentBackend),
 	})
 	if err != nil {
 		return nil, trace.Wrap(err)
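
For reference, a rough sketch of how the new ReporterConfig.Tracer default behaves from a caller's point of view: when Tracer is left unset, CheckAndSetDefaults substitutes tracing.NoopTracer, so every Reporter method still opens and closes a span, just one that is never sampled. The in-memory backend, the example key/value, and the main() wiring below are illustrative assumptions and not part of this change; only the ReporterConfig fields and span names come from the diff itself.

package main

import (
	"context"
	"log"

	"github.com/gravitational/teleport"
	"github.com/gravitational/teleport/lib/backend"
	"github.com/gravitational/teleport/lib/backend/memory"
)

func main() {
	// Assumption: an in-memory backend stands in for a real storage backend.
	mem, err := memory.New(memory.Config{})
	if err != nil {
		log.Fatal(err)
	}

	// Tracer is intentionally left unset; CheckAndSetDefaults falls back to
	// tracing.NoopTracer(teleport.ComponentBackend), so spans are created
	// but never sampled or exported.
	reporter, err := backend.NewReporter(backend.ReporterConfig{
		Component: teleport.ComponentBackend,
		Backend:   mem,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Each wrapped call now starts a span (here "backend/Put" with a "key"
	// attribute) before delegating to the underlying backend.
	if _, err := reporter.Put(context.Background(), backend.Item{
		Key:   []byte("/example/key"),
		Value: []byte("value"),
	}); err != nil {
		log.Fatal(err)
	}
}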