Manually instrument backend.Backend #13268

Merged · 13 commits · Jun 11, 2022
102 changes: 102 additions & 0 deletions lib/backend/report.go
@@ -24,13 +24,16 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"

"github.com/gravitational/trace"
lru "github.com/hashicorp/golang-lru"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
)

const reporterDefaultCacheSize = 1000
@@ -45,6 +48,8 @@ type ReporterConfig struct {
// metric. Higher value means higher memory usage but fewer infrequent
// requests forgotten.
TopRequestsCount int
// Tracer is used to create spans
Tracer oteltrace.Tracer
}

// CheckAndSetDefaults checks and sets
@@ -58,6 +63,9 @@ func (r *ReporterConfig) CheckAndSetDefaults() error {
if r.TopRequestsCount == 0 {
r.TopRequestsCount = reporterDefaultCacheSize
}
if r.Tracer == nil {
r.Tracer = tracing.NoopTracer(teleport.ComponentBackend)
}
return nil
}

@@ -108,6 +116,17 @@ func NewReporter(cfg ReporterConfig) (*Reporter, error) {

// GetRange returns query range
func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte, limit int) (*GetResult, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/GetRange",
oteltrace.WithAttributes(
attribute.Int("limit", limit),
attribute.String("start_key", string(startKey)),
attribute.String("end_key", string(endKey)),
),
)
defer span.End()

start := s.Clock().Now()
res, err := s.Backend.GetRange(ctx, startKey, endKey, limit)
batchReadLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -121,6 +140,15 @@ func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte,

// Create creates item if it does not exist
func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Create",
oteltrace.WithAttributes(
attribute.String("key", string(i.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.Create(ctx, i)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -135,6 +163,15 @@ func (s *Reporter) Create(ctx context.Context, i Item) (*Lease, error) {
// Put puts value into backend (creates if it does not
// exists, updates it otherwise)
func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Put",
oteltrace.WithAttributes(
attribute.String("key", string(i.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.Put(ctx, i)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -148,6 +185,15 @@ func (s *Reporter) Put(ctx context.Context, i Item) (*Lease, error) {

// Update updates value in the backend
func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Update",
oteltrace.WithAttributes(
attribute.String("key", string(i.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.Update(ctx, i)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -161,6 +207,15 @@ func (s *Reporter) Update(ctx context.Context, i Item) (*Lease, error) {

// Get returns a single item or not found error
func (s *Reporter) Get(ctx context.Context, key []byte) (*Item, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/Get",
oteltrace.WithAttributes(
attribute.String("key", string(key)),
),
)
defer span.End()

start := s.Clock().Now()
readLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
readRequests.WithLabelValues(s.Component).Inc()
@@ -175,6 +230,15 @@ func (s *Reporter) Get(ctx context.Context, key []byte) (*Item, error) {
// CompareAndSwap compares item with existing item
// and replaces is with replaceWith item
func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWith Item) (*Lease, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/CompareAndSwap",
oteltrace.WithAttributes(
attribute.String("key", string(expected.Key)),
),
)
defer span.End()

start := s.Clock().Now()
lease, err := s.Backend.CompareAndSwap(ctx, expected, replaceWith)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -188,6 +252,15 @@ func (s *Reporter) CompareAndSwap(ctx context.Context, expected Item, replaceWit

// Delete deletes item by key
func (s *Reporter) Delete(ctx context.Context, key []byte) error {
ctx, span := s.Tracer.Start(
ctx,
"backend/Delete",
oteltrace.WithAttributes(
attribute.String("key", string(key)),
),
)
defer span.End()

start := s.Clock().Now()
err := s.Backend.Delete(ctx, key)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -201,6 +274,16 @@ func (s *Reporter) Delete(ctx context.Context, key []byte) error {

// DeleteRange deletes range of items
func (s *Reporter) DeleteRange(ctx context.Context, startKey []byte, endKey []byte) error {
ctx, span := s.Tracer.Start(
ctx,
"backend/DeleteRange",
oteltrace.WithAttributes(
attribute.String("start_key", string(startKey)),
attribute.String("end_key", string(endKey)),
),
)
defer span.End()

start := s.Clock().Now()
err := s.Backend.DeleteRange(ctx, startKey, endKey)
batchWriteLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -217,6 +300,16 @@ func (s *Reporter) DeleteRange(ctx context.Context, startKey []byte, endKey []by
// some backends may ignore expires based on the implementation
// in case if the lease managed server side
func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time) error {
ctx, span := s.Tracer.Start(
ctx,
"backend/KeepAlive",
oteltrace.WithAttributes(
attribute.Int64("lease", lease.ID),
attribute.String("key", string(lease.Key)),
),
)
defer span.End()

start := s.Clock().Now()
err := s.Backend.KeepAlive(ctx, lease, expires)
writeLatencies.WithLabelValues(s.Component).Observe(time.Since(start).Seconds())
@@ -230,6 +323,15 @@ func (s *Reporter) KeepAlive(ctx context.Context, lease Lease, expires time.Time

// NewWatcher returns a new event watcher
func (s *Reporter) NewWatcher(ctx context.Context, watch Watch) (Watcher, error) {
ctx, span := s.Tracer.Start(
ctx,
"backend/NewWatcher",
oteltrace.WithAttributes(
attribute.String("name", watch.Name),
),
)
defer span.End()

Collaborator:
I'm not sure how tracing fits in with watchers, as they are long running background tasks, right?

Contributor Author:
This is only tracing the creation of a watcher. I was on the fence about including this, as I wasn't sure how useful it would really be, but I figured it was better to have it and not need it than to not have it and wish we did.


w, err := s.Backend.NewWatcher(ctx, watch)
if err != nil {
return nil, trace.Wrap(err)
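A note on the watcher discussion above: the span added in Reporter.NewWatcher only brackets construction of the watcher; the long-running event stream runs outside of it. The following is a minimal sketch, not code from this PR, assuming an arbitrary oteltrace.Tracer and any backend.Backend implementation (the Events/Done/Close methods are the usual backend.Watcher interface, used here only for illustration):

import (
	"context"

	"github.com/gravitational/teleport/lib/backend"
	"github.com/gravitational/trace"
	oteltrace "go.opentelemetry.io/otel/trace"
)

// consumeEvents is a sketch only: the span covers watcher creation,
// mirroring Reporter.NewWatcher above, while the event loop is untraced.
func consumeEvents(ctx context.Context, tracer oteltrace.Tracer, bk backend.Backend) error {
	ctx, span := tracer.Start(ctx, "backend/NewWatcher")
	w, err := bk.NewWatcher(ctx, backend.Watch{Name: "example"})
	span.End() // the span closes here, once the watcher exists
	if err != nil {
		return trace.Wrap(err)
	}
	defer w.Close()

	// The long-running consumption loop happens outside the span.
	for {
		select {
		case event := <-w.Events():
			_ = event // handle the event
		case <-w.Done():
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}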
5 changes: 5 additions & 0 deletions lib/observability/tracing/tracing.go
@@ -283,6 +283,11 @@ func NoopProvider() *Provider {
)}
}

// NoopTracer creates a new Tracer that never samples any spans.
func NoopTracer(instrumentationName string) oteltrace.Tracer {
return NoopProvider().Tracer(instrumentationName)
}

// NewTraceProvider creates a new Provider that is configured per the provided Config.
func NewTraceProvider(ctx context.Context, cfg Config) (*Provider, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
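Not part of the diff, but a quick sketch of the guarantee this helper gives callers: spans from a tracer that never samples can be started and ended unconditionally, so the Reporter code above needs no branching on whether tracing is configured. The sketch uses the upstream OpenTelemetry no-op provider, which behaves equivalently from the caller's point of view:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/trace"
)

func main() {
	// Roughly what tracing.NoopTracer hands back: a tracer whose spans
	// are never recorded or exported.
	tracer := trace.NewNoopTracerProvider().Tracer("backend")

	ctx, span := tracer.Start(context.Background(), "backend/Get")
	defer span.End() // always safe to call

	fmt.Println(span.IsRecording()) // false
	_ = ctx                         // still a valid context to pass downstream
}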
2 changes: 2 additions & 0 deletions lib/service/service.go
@@ -1682,6 +1682,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca
reporter, err := backend.NewReporter(backend.ReporterConfig{
Component: teleport.ComponentCache,
Backend: mem,
Tracer: process.TracingProvider.Tracer(teleport.ComponentCache),
})
if err != nil {
return nil, trace.Wrap(err)
@@ -4094,6 +4095,7 @@ func (process *TeleportProcess) initAuthStorage() (bk backend.Backend, err error
reporter, err := backend.NewReporter(backend.ReporterConfig{
Component: teleport.ComponentBackend,
Backend: backend.NewSanitizer(bk),
Tracer: process.TracingProvider.Tracer(teleport.ComponentBackend),
})
if err != nil {
return nil, trace.Wrap(err)
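Beyond the wiring shown above, here is a hedged sketch (not from the PR) of how the new spans could be verified in a test using the OpenTelemetry SDK's in-memory span recorder. The ReporterConfig fields mirror the diff; the memory backend constructor and import path are assumptions based on the existing lib/backend/memory package:

import (
	"context"
	"testing"

	"github.com/gravitational/teleport"
	"github.com/gravitational/teleport/lib/backend"
	"github.com/gravitational/teleport/lib/backend/memory"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

func TestReporterEmitsSpans(t *testing.T) {
	recorder := tracetest.NewSpanRecorder()
	provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))

	mem, err := memory.New(memory.Config{}) // assumed constructor for the in-memory backend
	if err != nil {
		t.Fatal(err)
	}

	reporter, err := backend.NewReporter(backend.ReporterConfig{
		Component: teleport.ComponentBackend,
		Backend:   mem,
		Tracer:    provider.Tracer(teleport.ComponentBackend),
	})
	if err != nil {
		t.Fatal(err)
	}

	// A read through the Reporter should produce exactly one "backend/Get" span.
	_, _ = reporter.Get(context.Background(), []byte("/example"))

	spans := recorder.Ended()
	if len(spans) != 1 || spans[0].Name() != "backend/Get" {
		t.Fatalf("expected a single backend/Get span, got %d", len(spans))
	}
}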