diff --git a/pkg/ingester-rf1/metastore/client/client.go b/pkg/ingester-rf1/metastore/client/client.go index 013e0bdb37bc2..044ad613d7ed5 100644 --- a/pkg/ingester-rf1/metastore/client/client.go +++ b/pkg/ingester-rf1/metastore/client/client.go @@ -5,10 +5,16 @@ import ( "fmt" "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/instrument" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/services" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" metastorepb "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" + "github.com/grafana/loki/v3/pkg/util/constants" ) type Config struct { @@ -35,9 +41,9 @@ type Client struct { config Config } -func New(config Config) (c *Client, err error) { +func New(config Config, r prometheus.Registerer) (c *Client, err error) { c = &Client{config: config} - c.conn, err = dial(c.config) + c.conn, err = dial(c.config, r) if err != nil { return nil, err } @@ -50,11 +56,27 @@ func (c *Client) stopping(error) error { return c.conn.Close() } func (c *Client) Service() services.Service { return c.service } -func dial(cfg Config) (*grpc.ClientConn, error) { +func dial(cfg Config, r prometheus.Registerer) (*grpc.ClientConn, error) { + latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "metastore_request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using the metastore", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}) + if r != nil { + err := r.Register(latency) + if err != nil { + alreadyErr, ok := err.(prometheus.AlreadyRegisteredError) + if !ok { + return nil, err + } + latency = alreadyErr.ExistingCollector.(*prometheus.HistogramVec) + } + } if err := cfg.Validate(); err != nil { return nil, err } - options, err := cfg.GRPCClientConfig.DialOption(nil, nil) + options, err := cfg.GRPCClientConfig.DialOption(instrumentation(latency)) if err != nil { return nil, err } @@ -80,3 +102,17 @@ const grpcServiceConfig = `{ } }] }` + +func instrumentation(latency *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + var unaryInterceptors []grpc.UnaryClientInterceptor + unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())) + unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor) + unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(latency)) + + var streamInterceptors []grpc.StreamClientInterceptor + streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())) + streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor) + streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(latency)) + + return unaryInterceptors, streamInterceptors +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 28465b5278c20..0fe3448d2d7f1 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1815,7 +1815,7 @@ func (t *Loki) initMetastore() (services.Service, error) { } func (t *Loki) initMetastoreClient() (services.Service, error) { - mc, err := metastoreclient.New(t.Cfg.MetastoreClient) + mc, err := metastoreclient.New(t.Cfg.MetastoreClient, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index 274f4c37f25dc..e5a71d0aedd4d 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -7,12 +7,13 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/iter" v2iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/encoding" + + "github.com/grafana/loki/pkg/push" ) /* diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index 8f3e4f473e930..29deada6ab82c 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -11,13 +11,14 @@ import ( logger "github.com/go-kit/log" "github.com/grafana/dskit/multierror" - "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/iter" v2 "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/pkg/push" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require"