Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v8] Manually instrument backend.Backend #15446

Merged
merged 3 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions lib/backend/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"

"github.com/gravitational/trace"
lru "github.com/hashicorp/golang-lru"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
)

const reporterDefaultCacheSize = 1000
Expand All @@ -45,6 +48,8 @@ type ReporterConfig struct {
// metric. Higher value means higher memory usage but fewer infrequent
// requests forgotten.
TopRequestsCount int
// Tracer is used to create spans
Tracer oteltrace.Tracer
}

// CheckAndSetDefaults checks and sets
Expand All @@ -58,6 +63,9 @@ func (r *ReporterConfig) CheckAndSetDefaults() error {
if r.TopRequestsCount == 0 {
r.TopRequestsCount = reporterDefaultCacheSize
}
if r.Tracer == nil {
r.Tracer = tracing.NoopTracer(teleport.ComponentBackend)
}
return nil
}

Expand Down Expand Up @@ -108,6 +116,17 @@ func NewReporter(cfg ReporterConfig) (*Reporter, error) {

// GetRange returns query range
func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte, limit int) (*GetResult, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/GetRange",
oteltrace.WithAttributes(
attribute.Int("limit", limit),
attribute.String("start_key", string(startKey)),
attribute.String("end_key", string(endKey)),
),
)
defer span.End()

start := s.Clock().Now()
res, err := s.Backend.GetRange(ctx, startKey, endKey, limit)
batchReadLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -121,6 +140,15 @@ func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte,

// Create creates item if it does not exist
func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Create",
oteltrace.WithAttributes(
attribute.String("key", string(i.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.Create(ctx, i)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -135,6 +163,15 @@ func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) {
// Put puts value into backend (creates if it does not
// exists, updates it otherwise)
func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Put",
oteltrace.WithAttributes(
attribute.String("key", string(i.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.Put(ctx, i)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -148,6 +185,15 @@ func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) {

// Update updates value in the backend
func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Update",
oteltrace.WithAttributes(
attribute.String("key", string(i.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.Update(ctx, i)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -161,6 +207,15 @@ func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) {

// Get returns a single item or not found error
func (s *Reporter) Get(ctx context.Context, key []byte) (*Item, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Get",
oteltrace.WithAttributes(
attribute.String("key", string(key)),
),
)
defer span.End()

start := s.Clock().Now()
readLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
readRequests.WithLabelValues(s.Component).Inc()
Expand All @@ -175,6 +230,15 @@ func (s *Reporter) Get(ctx context.Context, key []byte) (*Item, error) {
// CompareAndSwap compares item with existing item
// and replaces is with replaceWith item
func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWith Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/CompareAndSwap",
oteltrace.WithAttributes(
attribute.String("key", string(expected.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.CompareAndSwap(ctx, expected, replaceWith)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -188,6 +252,15 @@ func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWit

// Delete deletes item by key
func (s *Reporter) Delete(ctx context.Context, key []byte) error {
ctx, span := s.Tracer.Start(
ctx,
"backend/Delete",
oteltrace.WithAttributes(
attribute.String("key", string(key)),
),
)
defer span.End()

start := s.Clock().Now()
err := s.Backend.Delete(ctx, key)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -201,6 +274,16 @@ func (s *Reporter) Delete(ctx context.Context, key []byte) error {

// DeleteRange deletes range of items
func (s *Reporter) DeleteRange(ctx context.Context, startKey []byte, endKey []byte) error {
ctx, span := s.Tracer.Start(
ctx,
"backend/DeleteRange",
oteltrace.WithAttributes(
attribute.String("start_key", string(startKey)),
attribute.String("end_key", string(endKey)),
),
)
defer span.End()

start := s.Clock().Now()
err := s.Backend.DeleteRange(ctx, startKey, endKey)
batchWriteLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -217,6 +300,16 @@ func (s *Reporter) DeleteRange(ctx context.Context, startKey []byte, endKey []by
// some backends may ignore expires based on the implementation
// in case if the lease managed server side
func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time) error {
ctx, span := s.Tracer.Start(
ctx,
"backend/KeepAlive",
oteltrace.WithAttributes(
attribute.Int64("lease", lease.ID),
attribute.String("key", string(lease.Key)),
),
)
defer span.End()

start := s.Clock().Now()
err := s.Backend.KeepAlive(ctx, lease, expires)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
Expand All @@ -230,6 +323,15 @@ func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time

// NewWatcher returns a new event watcher
func (s *Reporter) NewWatcher(ctx context.Context, watch Watch) (Watcher, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/NewWatcher",
oteltrace.WithAttributes(
attribute.String("name", watch.Name),
),
)
defer span.End()

w, err := s.Backend.NewWatcher(ctx, watch)
if err != nil {
return nil, trace.Wrap(err)
Expand Down
5 changes: 5 additions & 0 deletions lib/observability/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ func NoopProvider() *Provider {
)}
}

// NoopTracer creates a new Tracer that never samples any spans.
func NoopTracer(instrumentationName string) oteltrace.Tracer {
return NoopProvider().Tracer(instrumentationName)
}

// NewTraceProvider creates a new Provider that is configured per the provided Config.
func NewTraceProvider(ctx context.Context, cfg Config) (*Provider, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca
reporter, err := backend.NewReporter(backend.ReporterConfig{
Component: teleport.ComponentCache,
Backend: mem,
Tracer: process.TracingProvider.Tracer(teleport.ComponentCache),
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -3848,6 +3849,7 @@ func (process *TeleportProcess) initAuthStorage() (bk backend.Backend, err error
reporter, err := backend.NewReporter(backend.ReporterConfig{
Component: teleport.ComponentBackend,
Backend: backend.NewSanitizer(bk),
Tracer: process.TracingProvider.Tracer(teleport.ComponentBackend),
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down