diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 1e8ef25d4e..e5847e8e19 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2895,12 +2895,12 @@ }, { "kind": "field", - "name": "limit_inflight_requests_using_grpc_tap_handle", + "name": "limit_inflight_requests_using_grpc_method_limiter", "required": false, - "desc": "Use experimental method of limiting push requests", + "desc": "Use experimental method of limiting push requests.", "fieldValue": null, "fieldDefaultValue": false, - "fieldFlag": "ingester.limit-inflight-requests-using-grpc-handlers", + "fieldFlag": "ingester.limit-inflight-requests-using-grpc-method-limiter", "fieldType": "boolean", "fieldCategory": "experimental" }, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 99af101e42..7b5a689a8c 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1279,8 +1279,8 @@ Usage of ./cmd/mimir/mimir: Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. 0 = unlimited. -ingester.instance-limits.max-tenants int Max tenants that this ingester can hold. Requests from additional tenants will be rejected. 0 = unlimited. - -ingester.limit-inflight-requests-using-grpc-handlers - [experimental] Use experimental method of limiting push requests + -ingester.limit-inflight-requests-using-grpc-method-limiter + [experimental] Use experimental method of limiting push requests. -ingester.log-utilization-based-limiter-cpu-samples [experimental] Enable logging of utilization based limiter CPU samples. 
-ingester.max-global-exemplars-per-user int diff --git a/docs/sources/mimir/references/configuration-parameters/index.md b/docs/sources/mimir/references/configuration-parameters/index.md index 57325f8786..887d0d61fd 100644 --- a/docs/sources/mimir/references/configuration-parameters/index.md +++ b/docs/sources/mimir/references/configuration-parameters/index.md @@ -1039,9 +1039,9 @@ instance_limits: # CLI flag: -ingester.log-utilization-based-limiter-cpu-samples [log_utilization_based_limiter_cpu_samples: | default = false] -# (experimental) Use experimental method of limiting push requests -# CLI flag: -ingester.limit-inflight-requests-using-grpc-handlers -[limit_inflight_requests_using_grpc_tap_handle: | default = false] +# (experimental) Use experimental method of limiting push requests. +# CLI flag: -ingester.limit-inflight-requests-using-grpc-method-limiter +[limit_inflight_requests_using_grpc_method_limiter: | default = false] # (experimental) Each error will be logged once in this many times. Use 0 to log # all of them. 
diff --git a/go.mod b/go.mod index d028346659..e4a3a18687 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd + github.com/grafana/dskit v0.0.0-20230920092933-98f431c981b2 github.com/grafana/e2e v0.1.1-0.20230828072146-510b1706a292 github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 @@ -166,7 +166,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gosimple/slug v1.1.1 // indirect - github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 // indirect + github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1 // indirect github.com/hashicorp/consul/api v1.22.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/go.sum b/go.sum index b76826105b..b3da9b95e1 100644 --- a/go.sum +++ b/go.sum @@ -895,12 +895,12 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4= github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0= github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 h1:LQAhgcUPnzdjU/OjCJaLlPQI7NmQCRlfjMPSA1VegvA= github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= -github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd h1:RHZuBHWNS2HRJ5XhQK7cKP11EMMJPtJO2xKvQ+ws+PU= -github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd/go.mod h1:3u7fr4hmOhuUL9Yc1QP/oa3za73kxvqJnRJH4BA5fOM= +github.com/grafana/dskit v0.0.0-20230920092933-98f431c981b2 h1:vWx9/Q+AB3MNfaIhlpA92wqKbTCgAo/cvt99Jsg9sNI= +github.com/grafana/dskit v0.0.0-20230920092933-98f431c981b2/go.mod h1:mZkAJTODiHxuvqvmwwFvp0ww9XA3mwlGA8Iv0SHP+5c= github.com/grafana/e2e 
v0.1.1-0.20230828072146-510b1706a292 h1:t+QoVWpA+KIgwMmcNI3TDin2CUWFz3f12GdKWIfB1/g= github.com/grafana/e2e v0.1.1-0.20230828072146-510b1706a292/go.mod h1:3UsooRp7yW5/NJQBlXcTsAHOoykEhNUYXkQ3r6ehEEY= -github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 h1:WB3bGH2f1UN6jkd6uAEWfHB8OD7dKJ0v2Oo6SNfhpfQ= -github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= +github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1 h1:MLYY2R60/74hfYl5vRRmC2VDo0Yuql1QQ1ig8hnvgSI= +github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/mimir-prometheus v0.0.0-20230907080713-7c067467a0fd h1:UtvS3ax2aL970lCLX3P88zkhjhYfanDfqx/KxPf0Ulo= diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 78c0b99a7a..6b400f57ac 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -171,7 +171,7 @@ type Config struct { ReadPathMemoryUtilizationLimit uint64 `yaml:"read_path_memory_utilization_limit" category:"experimental"` LogUtilizationBasedLimiterCPUSamples bool `yaml:"log_utilization_based_limiter_cpu_samples" category:"experimental"` - LimitInflightRequestsUsingGrpcTapHandle bool `yaml:"limit_inflight_requests_using_grpc_tap_handle" category:"experimental"` + LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"experimental"` ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"experimental"` } @@ -190,7 +190,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.Float64Var(&cfg.ReadPathCPUUtilizationLimit, "ingester.read-path-cpu-utilization-limit", 0, 
"CPU utilization limit, as CPU cores, for CPU/memory utilization based read request limiting. Use 0 to disable it.") f.Uint64Var(&cfg.ReadPathMemoryUtilizationLimit, "ingester.read-path-memory-utilization-limit", 0, "Memory limit, in bytes, for CPU/memory utilization based read request limiting. Use 0 to disable it.") f.BoolVar(&cfg.LogUtilizationBasedLimiterCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.") - f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcTapHandle, "ingester.limit-inflight-requests-using-grpc-handlers", false, "Use experimental method of limiting push requests") + f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "ingester.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.") f.Int64Var(&cfg.ErrorSampleRate, "ingester.error-sample-rate", 0, "Each error will be logged once in this many times. Use 0 to log all of them.") } @@ -735,6 +735,12 @@ type pushStats struct { // StartPushRequest checks if ingester can start push request, and increments relevant counters. // If new push request cannot be started, errors converible to gRPC status code are returned, and metrics are updated. +// +// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. from gRPC server's method limiter. +// +// In the first case, returned errors can be inspected/logged by middleware. Ingester.PushWithCleanup will wrap the error in util_log.DoNotLogError wrapper. +// +// In the second case, returned errors will not be logged, because request will not reach any middleware. func (i *Ingester) StartPushRequest() error { if err := i.checkRunning(); err != nil { return err @@ -778,7 +784,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( defer pushReq.CleanUp() // If we're using grpc handlers, we don't need to start/finish request here. 
- if !i.cfg.LimitInflightRequestsUsingGrpcTapHandle { + if !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter { if err := i.StartPushRequest(); err != nil { return nil, util_log.DoNotLogError{Err: err} } diff --git a/pkg/mimir/grpc_push_check.go b/pkg/mimir/grpc_push_check.go index 9c2683ff85..e24b188313 100644 --- a/pkg/mimir/grpc_push_check.go +++ b/pkg/mimir/grpc_push_check.go @@ -3,10 +3,8 @@ package mimir import ( - "context" - - "google.golang.org/grpc/stats" - "google.golang.org/grpc/tap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type pushReceiver interface { @@ -14,62 +12,35 @@ type pushReceiver interface { FinishPushRequest() } -func newGrpcPushCheck(getIngester func() pushReceiver) *grpcPushCheck { - return &grpcPushCheck{getIngester: getIngester} +// newGrpcInflightMethodLimiter creates a limiter for ingester push requests. The getIngester function must be constant -- return same value on each call. +func newGrpcInflightMethodLimiter(getIngester func() pushReceiver) *grpcInflightMethodLimiter { + return &grpcInflightMethodLimiter{getIngester: getIngester} } -// grpcPushCheck implements gRPC TapHandle and gRPC stats.Handler. -// grpcPushCheck can track push inflight requests, and reject requests before even reading them into memory. -type grpcPushCheck struct { +// grpcInflightMethodLimiter implements dskit's server.GrpcInflightMethodLimiter interface, starting and finishing ingester push request tracking for the ingester Push method. +type grpcInflightMethodLimiter struct { getIngester func() pushReceiver } -// Custom type to hide it from other packages. -type grpcPushCheckContextKey int - -const ( - pushRequestStartedKey grpcPushCheckContextKey = 1 -) +const ingesterPushMethod = "/cortex.Ingester/Push" -// TapHandle is called after receiving grpc request and headers, but before reading any request data yet. -// If we reject request here, it won't be counted towards any metrics (eg. dskit middleware). -// If we accept request (not return error), eventually HandleRPC with stats.End notification will be called. 
-func (g *grpcPushCheck) TapHandle(ctx context.Context, info *tap.Info) (context.Context, error) { - pushRequestStarted := false +var errNoIngester = status.Error(codes.Unavailable, "no ingester") - var err error - if info.FullMethodName == "/cortex.Ingester/Push" { +func (g *grpcInflightMethodLimiter) RPCCallStarting(methodName string) error { + if methodName == ingesterPushMethod { ing := g.getIngester() - if ing != nil { - // All errors returned by StartPushRequest can be converted to gRPC status.Status. - err = ing.StartPushRequest() - if err == nil { - pushRequestStarted = true - } + if ing == nil { + // We return error here, to make sure that RPCCallFinished doesn't get called for this RPC call. + return errNoIngester } - } - - ctx = context.WithValue(ctx, pushRequestStartedKey, pushRequestStarted) - return ctx, err -} -func (g *grpcPushCheck) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { - return ctx -} - -func (g *grpcPushCheck) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) { - // when request ends, and we started push request tracking for this request, finish it. - if _, ok := rpcStats.(*stats.End); ok { - if b, ok := ctx.Value(pushRequestStartedKey).(bool); ok && b { - g.getIngester().FinishPushRequest() - } + return ing.StartPushRequest() } + return nil } -func (g *grpcPushCheck) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { - return ctx -} - -func (g *grpcPushCheck) HandleConn(_ context.Context, _ stats.ConnStats) { - // Not interested. 
+func (g *grpcInflightMethodLimiter) RPCCallFinished(methodName string) { + if methodName == ingesterPushMethod { + g.getIngester().FinishPushRequest() + } } diff --git a/pkg/mimir/grpc_push_check_test.go b/pkg/mimir/grpc_push_check_test.go index f614195c26..18c692f82b 100644 --- a/pkg/mimir/grpc_push_check_test.go +++ b/pkg/mimir/grpc_push_check_test.go @@ -3,130 +3,62 @@ package mimir import ( - "context" - "net" - "sync" "testing" - "time" - "github.com/grafana/dskit/test" "github.com/stretchr/testify/require" - "go.uber.org/atomic" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" - - ingester_client "github.com/grafana/mimir/pkg/ingester/client" - "github.com/grafana/mimir/pkg/mimirpb" ) -func TestGrpcPushCheck(t *testing.T) { - const limit = 3 - ingServer := &ingesterServer{finishRequest: make(chan struct{}), inflightLimit: limit} - - c := setupGrpcServerWithCheckAndClient(t, ingServer) - - // start push requests in background goroutines - started := sync.WaitGroup{} - finished := sync.WaitGroup{} - for i := 0; i < limit; i++ { - started.Add(1) - finished.Add(1) - - go func() { - started.Done() - defer finished.Done() - - _, err := c.Push(context.Background(), &mimirpb.WriteRequest{}) - require.NoError(t, err) - }() - } - - // Wait until all goroutines start. - started.Wait() - - // Wait until all requests are inflight. - test.Poll(t, 1*time.Second, int64(limit), func() interface{} { - return ingServer.inflight.Load() - }) - - // Try another request. This one should fail with too many inflight requests error. 
- _, err := c.Push(context.Background(), &mimirpb.WriteRequest{}) - require.Error(t, err) - s, ok := status.FromError(err) - require.True(t, ok) - require.Equal(t, codes.Code(123), s.Code()) - require.Equal(t, "too many requests", s.Message()) - - require.Equal(t, int64(limit), ingServer.inflight.Load()) - - // unblock all requests - for i := 0; i < limit; i++ { - ingServer.finishRequest <- struct{}{} - } - finished.Wait() +func TestGrpcInflightMethodLimiter(t *testing.T) { + t.Run("nil push receiver", func(t *testing.T) { + l := newGrpcInflightMethodLimiter(func() pushReceiver { return nil }) - require.Equal(t, int64(0), ingServer.inflight.Load()) + require.NoError(t, l.RPCCallStarting("test")) + require.NotPanics(t, func() { + l.RPCCallFinished("test") + }) - // unblock subsequent requests immediately - close(ingServer.finishRequest) - - // another push request should succeed. - _, err = c.Push(context.Background(), &mimirpb.WriteRequest{}) - require.NoError(t, err) -} - -func setupGrpcServerWithCheckAndClient(t *testing.T, ingServer *ingesterServer) ingester_client.IngesterClient { - g := newGrpcPushCheck(func() pushReceiver { - return ingServer + require.ErrorIs(t, l.RPCCallStarting(ingesterPushMethod), errNoIngester) + require.Panics(t, func() { + // In practice, this will not be called, since l.RPCCallStarting(ingesterPushMethod) returns error. 
+ l.RPCCallFinished(ingesterPushMethod) + }) }) - server := grpc.NewServer(grpc.InTapHandle(g.TapHandle), grpc.StatsHandler(g)) - ingester_client.RegisterIngesterServer(server, ingServer) - - l, err := net.Listen("tcp", "localhost:0") - require.NoError(t, err) - - go func() { - _ = server.Serve(l) - }() - - t.Cleanup(func() { - _ = l.Close() + t.Run("push receiver, no error", func(t *testing.T) { + m := &mockPushReceiver{startErr: nil} + + l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }) + require.NoError(t, l.RPCCallStarting("test")) + require.NotPanics(t, func() { + l.RPCCallFinished("test") + }) + require.Equal(t, 0, m.startCalls) + require.Equal(t, 0, m.finishCalls) + + require.NoError(t, l.RPCCallStarting(ingesterPushMethod)) + require.Equal(t, 1, m.startCalls) + require.Equal(t, 0, m.finishCalls) + + require.NotPanics(t, func() { + l.RPCCallFinished(ingesterPushMethod) + }) + require.Equal(t, 1, m.startCalls) + require.Equal(t, 1, m.finishCalls) }) - - cc, err := grpc.Dial(l.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - - t.Cleanup(func() { - _ = cc.Close() - }) - - return ingester_client.NewIngesterClient(cc) } -type ingesterServer struct { - ingester_client.IngesterServer - inflightLimit int64 - finishRequest chan struct{} - inflight atomic.Int64 -} +type mockPushReceiver struct { + startCalls int + finishCalls int -func (i *ingesterServer) Push(_ context.Context, _ *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) { - <-i.finishRequest - return &mimirpb.WriteResponse{}, nil + startErr error } -func (i *ingesterServer) StartPushRequest() error { - v := i.inflight.Inc() - if v > i.inflightLimit { - i.inflight.Dec() - return status.Error(123, "too many requests") - } - return nil +func (i *mockPushReceiver) StartPushRequest() error { + i.startCalls++ + return i.startErr } -func (i *ingesterServer) FinishPushRequest() { - i.inflight.Dec() +func (i *mockPushReceiver) 
FinishPushRequest() { + i.finishCalls++ } diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 5f1333828e..aa32055285 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -32,7 +32,6 @@ import ( "github.com/prometheus/prometheus/rules" prom_storage "github.com/prometheus/prometheus/storage" prom_remote "github.com/prometheus/prometheus/storage/remote" - "google.golang.org/grpc" "github.com/grafana/mimir/pkg/alertmanager" "github.com/grafana/mimir/pkg/alertmanager/alertstore" @@ -221,11 +220,11 @@ func (t *Mimir) initSanityCheck() (services.Service, error) { } func (t *Mimir) initServer() (services.Service, error) { - if t.Cfg.Ingester.LimitInflightRequestsUsingGrpcTapHandle { - // We can't inject t.Ingester directly, because it may not be set yet. However by the time when grpcPushCheck runs + if t.Cfg.Ingester.LimitInflightRequestsUsingGrpcMethodLimiter { + // We can't inject t.Ingester directly, because it may not be set yet. However by the time when grpcInflightMethodLimiter runs // t.Ingester will be available. There's no race condition here, because gRPC server (service returned by this method, ie. initServer) // is started only after t.Ingester is set in initIngester. - g := newGrpcPushCheck(func() pushReceiver { + g := newGrpcInflightMethodLimiter(func() pushReceiver { // Return explicit nil, if there's no ingester. We don't want to return typed-nil as interface value. if t.Ingester == nil { return nil @@ -234,10 +233,7 @@ func (t *Mimir) initServer() (services.Service, error) { }) // Installing this allows us to reject push requests received via gRPC early -- before they are fully read into memory. - t.Cfg.Server.GRPCOptions = append(t.Cfg.Server.GRPCOptions, - grpc.InTapHandle(g.TapHandle), - grpc.StatsHandler(g), - ) + t.Cfg.Server.GrpcMethodLimiter = g } // Mimir handles signals on its own. 
diff --git a/vendor/github.com/grafana/dskit/middleware/http_tracing.go b/vendor/github.com/grafana/dskit/middleware/http_tracing.go index e914c10ffb..989f50fe1e 100644 --- a/vendor/github.com/grafana/dskit/middleware/http_tracing.go +++ b/vendor/github.com/grafana/dskit/middleware/http_tracing.go @@ -12,7 +12,6 @@ import ( "github.com/opentracing-contrib/go-stdlib/nethttp" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" - "github.com/uber/jaeger-client-go" ) // Dummy dependency to enforce that we have a nethttp version newer @@ -47,8 +46,6 @@ func (t Tracer) Wrap(next http.Handler) http.Handler { return nethttp.Middleware(opentracing.GlobalTracer(), next, options...) } -const httpGRPCHandleMethod = "/httpgrpc.HTTP/Handle" - // HTTPGRPCTracer is a middleware which traces incoming httpgrpc requests. type HTTPGRPCTracer struct { RouteMatcher RouteMatcher @@ -85,25 +82,13 @@ func InitHTTPGRPCMiddleware(router *mux.Router) *mux.Router { // // opentracing-contrib/go-stdlib/nethttp.Middleware could not be used here // as it does not expose options to access and tag the incoming parent span. -// -// Parent span tagging depends on using a Jaeger tracer for now to check the parent span's -// OperationName(), which is not available on the generic opentracing Tracer interface. 
func (hgt HTTPGRPCTracer) Wrap(next http.Handler) http.Handler { httpOperationNameFunc := makeHTTPOperationNameFunc(hgt.RouteMatcher) fn := func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() tracer := opentracing.GlobalTracer() - // skip spans which were not forwarded from httpgrpc.HTTP/Handle spans; - // standard http spans started directly from the HTTP server are presumed to - // already be instrumented by Tracer.Wrap parentSpan := opentracing.SpanFromContext(ctx) - if parentSpan, ok := parentSpan.(*jaeger.Span); ok { - if parentSpan.OperationName() != httpGRPCHandleMethod { - next.ServeHTTP(w, r) - return - } - } // extract relevant span & tag data from request method := r.Method diff --git a/vendor/github.com/grafana/dskit/server/limits.go b/vendor/github.com/grafana/dskit/server/limits.go new file mode 100644 index 0000000000..6b18bb1cb0 --- /dev/null +++ b/vendor/github.com/grafana/dskit/server/limits.go @@ -0,0 +1,89 @@ +package server + +import ( + "context" + "strings" + + "google.golang.org/grpc/stats" + "google.golang.org/grpc/tap" +) + +type GrpcInflightMethodLimiter interface { + // RPCCallStarting is called before request has been read into memory. + // All that's known about the request at this point is grpc method name. + // Returned error should be convertible to gRPC Status via status.FromError, + // otherwise gRPC-server implementation-specific error will be returned to the client (codes.PermissionDenied in grpc@v1.55.0). + RPCCallStarting(methodName string) error + RPCCallFinished(methodName string) +} + +// Custom type to hide it from other packages. +type grpcLimitCheckContextKey int + +// Presence of this key in the context indicates that inflight request counter was increased for this request, and needs to be decreased when request ends. 
+const ( + requestFullMethod grpcLimitCheckContextKey = 1 +) + +func newGrpcInflightLimitCheck(methodLimiter GrpcInflightMethodLimiter) *grpcInflightLimitCheck { + return &grpcInflightLimitCheck{ + methodLimiter: methodLimiter, + } +} + +// grpcInflightLimitCheck implements gRPC TapHandle and gRPC stats.Handler. +// grpcInflightLimitCheck can track inflight requests, and reject requests before even reading them into memory. +type grpcInflightLimitCheck struct { + methodLimiter GrpcInflightMethodLimiter +} + +// TapHandle is called after receiving grpc request and headers, but before reading any request data yet. +// If we reject request here, it won't be counted towards any metrics (eg. in middleware.grpcStatsHandler). +// If we accept request (not return error), eventually HandleRPC with stats.End notification will be called. +func (g *grpcInflightLimitCheck) TapHandle(ctx context.Context, info *tap.Info) (context.Context, error) { + if !isMethodNameValid(info.FullMethodName) { + // If method name is not valid, we let the request continue, but not call method limiter. + // Otherwise, we would not be able to call method limiter again when the call finishes, because in this case grpc server will not call stat handler. + return ctx, nil + } + + if err := g.methodLimiter.RPCCallStarting(info.FullMethodName); err != nil { + return ctx, err + } + + ctx = context.WithValue(ctx, requestFullMethod, info.FullMethodName) + return ctx, nil +} + +func (g *grpcInflightLimitCheck) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + return ctx +} + +func (g *grpcInflightLimitCheck) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) { + // when request ends, and we started "inflight" request tracking for it, finish it. 
+ if _, ok := rpcStats.(*stats.End); !ok { + return + } + + if name, ok := ctx.Value(requestFullMethod).(string); ok { + g.methodLimiter.RPCCallFinished(name) + } +} + +func (g *grpcInflightLimitCheck) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +func (g *grpcInflightLimitCheck) HandleConn(_ context.Context, _ stats.ConnStats) { + // Not interested. +} + +// This function mimics the check in grpc library, server.go, handleStream method. handleStream method can stop processing early, +// without calling stat handler if the method name is invalid. +func isMethodNameValid(method string) bool { + if method != "" && method[0] == '/' { + method = method[1:] + } + pos := strings.LastIndex(method, "/") + return pos >= 0 +} diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index 726d62b488..ff1769cc2b 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -137,6 +137,9 @@ type Config struct { Gatherer prometheus.Gatherer `yaml:"-"` PathPrefix string `yaml:"http_path_prefix"` + + // This limiter is called for every started and finished gRPC request. 
+ GrpcMethodLimiter GrpcInflightMethodLimiter `yaml:"-"` } var infinty = time.Duration(math.MaxInt64) @@ -375,12 +378,21 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { grpc.MaxRecvMsgSize(cfg.GPRCServerMaxRecvMsgSize), grpc.MaxSendMsgSize(cfg.GRPCServerMaxSendMsgSize), grpc.MaxConcurrentStreams(uint32(cfg.GPRCServerMaxConcurrentStreams)), + } + + if cfg.GrpcMethodLimiter != nil { + grpcServerLimit := newGrpcInflightLimitCheck(cfg.GrpcMethodLimiter) + grpcOptions = append(grpcOptions, grpc.InTapHandle(grpcServerLimit.TapHandle), grpc.StatsHandler(grpcServerLimit)) + } + + grpcOptions = append(grpcOptions, grpc.StatsHandler(middleware.NewStatsHandler( metrics.ReceivedMessageSize, metrics.SentMessageSize, metrics.InflightRequests, )), - } + ) + grpcOptions = append(grpcOptions, cfg.GRPCOptions...) if grpcTLSConfig != nil { grpcCreds := credentials.NewTLS(grpcTLSConfig) diff --git a/vendor/github.com/grafana/gomemcache/memcache/memcache.go b/vendor/github.com/grafana/gomemcache/memcache/memcache.go index 0855df163e..f6b9cfdbf4 100644 --- a/vendor/github.com/grafana/gomemcache/memcache/memcache.go +++ b/vendor/github.com/grafana/gomemcache/memcache/memcache.go @@ -135,7 +135,10 @@ func New(server ...string) *Client { // NewFromSelector returns a new Client using the provided ServerSelector. 
func NewFromSelector(ss ServerSelector) *Client { - c := &Client{selector: ss, closed: make(chan struct{})} + c := &Client{ + selector: ss, + closed: make(chan struct{}), + } go c.releaseIdleConnectionsUntilClosed() @@ -418,7 +421,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) { return cn, nil } -func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error { +func (c *Client) onItem(item *Item, operation string, fn func(*Client, *bufio.ReadWriter, *Item) error) error { addr, err := c.selector.PickServer(item.Key) if err != nil { return err @@ -606,7 +609,8 @@ func (c *Client) GetMulti(keys []string, opts ...Option) (map[string]*Item, erro ch := make(chan error, buffered) for addr, keys := range keyMap { go func(addr net.Addr, keys []string) { - ch <- c.getFromAddr(addr, keys, options, addItemToMap) + err := c.getFromAddr(addr, keys, options, addItemToMap) + ch <- err }(addr, keys) } @@ -702,7 +706,7 @@ func cut(s string, sep byte) (before, after string, found bool) { // Set writes the given item, unconditionally. func (c *Client) Set(item *Item) error { - return c.onItem(item, (*Client).set) + return c.onItem(item, "set", (*Client).set) } func (c *Client) set(rw *bufio.ReadWriter, item *Item) error { @@ -712,7 +716,7 @@ func (c *Client) set(rw *bufio.ReadWriter, item *Item) error { // Add writes the given item, if no value already exists for its // key. ErrNotStored is returned if that condition is not met. 
func (c *Client) Add(item *Item) error { - return c.onItem(item, (*Client).add) + return c.onItem(item, "add", (*Client).add) } func (c *Client) add(rw *bufio.ReadWriter, item *Item) error { @@ -722,7 +726,7 @@ func (c *Client) add(rw *bufio.ReadWriter, item *Item) error { // Replace writes the given item, but only if the server *does* // already hold data for this key func (c *Client) Replace(item *Item) error { - return c.onItem(item, (*Client).replace) + return c.onItem(item, "replace", (*Client).replace) } func (c *Client) replace(rw *bufio.ReadWriter, item *Item) error { @@ -737,7 +741,7 @@ func (c *Client) replace(rw *bufio.ReadWriter, item *Item) error { // calls. ErrNotStored is returned if the value was evicted in between // the calls. func (c *Client) CompareAndSwap(item *Item) error { - return c.onItem(item, (*Client).cas) + return c.onItem(item, "cas", (*Client).cas) } func (c *Client) cas(rw *bufio.ReadWriter, item *Item) error { diff --git a/vendor/modules.txt b/vendor/modules.txt index 9c3657a0db..7bdf2535f5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -567,7 +567,7 @@ github.com/gosimple/slug # github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 ## explicit; go 1.13 github.com/grafana-tools/sdk -# github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd +# github.com/grafana/dskit v0.0.0-20230920092933-98f431c981b2 ## explicit; go 1.19 github.com/grafana/dskit/backoff github.com/grafana/dskit/cache @@ -620,7 +620,7 @@ github.com/grafana/e2e github.com/grafana/e2e/cache github.com/grafana/e2e/db github.com/grafana/e2e/images -# github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 +# github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1 ## explicit; go 1.18 github.com/grafana/gomemcache/memcache # github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd => github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6