Skip to content

Commit

Permalink
feat: Instrument metastore GRPC calls (#13598)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jul 22, 2024
1 parent 39a5297 commit 04613b4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 7 deletions.
44 changes: 40 additions & 4 deletions pkg/ingester-rf1/metastore/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ import (
"fmt"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"

metastorepb "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/util/constants"
)

type Config struct {
Expand All @@ -35,9 +41,9 @@ type Client struct {
config Config
}

func New(config Config) (c *Client, err error) {
func New(config Config, r prometheus.Registerer) (c *Client, err error) {
c = &Client{config: config}
c.conn, err = dial(c.config)
c.conn, err = dial(c.config, r)
if err != nil {
return nil, err
}
Expand All @@ -50,11 +56,27 @@ func (c *Client) stopping(error) error { return c.conn.Close() }

func (c *Client) Service() services.Service { return c.service }

func dial(cfg Config) (*grpc.ClientConn, error) {
func dial(cfg Config, r prometheus.Registerer) (*grpc.ClientConn, error) {
latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "metastore_request_duration_seconds",
Help: "Time (in seconds) spent serving requests when using the metastore",
Buckets: instrument.DefBuckets,
}, []string{"operation", "status_code"})
if r != nil {
err := r.Register(latency)
if err != nil {
alreadyErr, ok := err.(prometheus.AlreadyRegisteredError)
if !ok {
return nil, err
}
latency = alreadyErr.ExistingCollector.(*prometheus.HistogramVec)
}
}
if err := cfg.Validate(); err != nil {
return nil, err
}
options, err := cfg.GRPCClientConfig.DialOption(nil, nil)
options, err := cfg.GRPCClientConfig.DialOption(instrumentation(latency))
if err != nil {
return nil, err
}
Expand All @@ -80,3 +102,17 @@ const grpcServiceConfig = `{
}
}]
}`

func instrumentation(latency *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(latency))

var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(latency))

return unaryInterceptors, streamInterceptors
}
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,7 +1815,7 @@ func (t *Loki) initMetastore() (services.Service, error) {
}

func (t *Loki) initMetastoreClient() (services.Service, error) {
mc, err := metastoreclient.New(t.Cfg.MetastoreClient)
mc, err := metastoreclient.New(t.Cfg.MetastoreClient, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/bloom/v1/bloom_tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/iter"
v2iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/v3/pkg/util/encoding"

"github.com/grafana/loki/pkg/push"
)

/*
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/bloom/v1/bloom_tokenizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
logger "github.com/go-kit/log"
"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/iter"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"

"github.com/grafana/loki/pkg/push"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

Expand Down

0 comments on commit 04613b4

Please sign in to comment.