Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dskit, modify gRPC method limiting #6073

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2895,12 +2895,12 @@
},
{
"kind": "field",
"name": "limit_inflight_requests_using_grpc_tap_handle",
"name": "limit_inflight_requests_using_grpc_method_limiter",
"required": false,
"desc": "Use experimental method of limiting push requests",
"desc": "Use experimental method of limiting push requests.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "ingester.limit-inflight-requests-using-grpc-handlers",
"fieldFlag": "ingester.limit-inflight-requests-using-grpc-method-limiter",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1279,8 +1279,8 @@ Usage of ./cmd/mimir/mimir:
Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. 0 = unlimited.
-ingester.instance-limits.max-tenants int
Max tenants that this ingester can hold. Requests from additional tenants will be rejected. 0 = unlimited.
-ingester.limit-inflight-requests-using-grpc-handlers
[experimental] Use experimental method of limiting push requests
-ingester.limit-inflight-requests-using-grpc-method-limiter
[experimental] Use experimental method of limiting push requests.
-ingester.log-utilization-based-limiter-cpu-samples
[experimental] Enable logging of utilization based limiter CPU samples.
-ingester.max-global-exemplars-per-user int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1039,9 +1039,9 @@ instance_limits:
# CLI flag: -ingester.log-utilization-based-limiter-cpu-samples
[log_utilization_based_limiter_cpu_samples: <boolean> | default = false]

# (experimental) Use experimental method of limiting push requests
# CLI flag: -ingester.limit-inflight-requests-using-grpc-handlers
[limit_inflight_requests_using_grpc_tap_handle: <boolean> | default = false]
# (experimental) Use experimental method of limiting push requests.
# CLI flag: -ingester.limit-inflight-requests-using-grpc-method-limiter
[limit_inflight_requests_using_grpc_method_limiter: <boolean> | default = false]

# (experimental) Each error will be logged once in this many times. Use 0 to log
# all of them.
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/golang/snappy v0.0.4
github.com/google/gopacket v1.1.19
github.com/gorilla/mux v1.8.0
github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd
github.com/grafana/dskit v0.0.0-20230920092933-98f431c981b2
github.com/grafana/e2e v0.1.1-0.20230828072146-510b1706a292
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/json-iterator/go v1.1.12
Expand Down Expand Up @@ -166,7 +166,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gosimple/slug v1.1.1 // indirect
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 // indirect
github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1 // indirect
github.com/hashicorp/consul/api v1.22.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -895,12 +895,12 @@ github.com/gosimple/slug v1.1.1 h1:fRu/digW+NMwBIP+RmviTK97Ho/bEj/C9swrCspN3D4=
github.com/gosimple/slug v1.1.1/go.mod h1:ER78kgg1Mv0NQGlXiDe57DpCyfbNywXXZ9mIorhxAf0=
github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9 h1:LQAhgcUPnzdjU/OjCJaLlPQI7NmQCRlfjMPSA1VegvA=
github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd h1:RHZuBHWNS2HRJ5XhQK7cKP11EMMJPtJO2xKvQ+ws+PU=
github.com/grafana/dskit v0.0.0-20230829141140-06955c011ffd/go.mod h1:3u7fr4hmOhuUL9Yc1QP/oa3za73kxvqJnRJH4BA5fOM=
github.com/grafana/dskit v0.0.0-20230920092933-98f431c981b2 h1:vWx9/Q+AB3MNfaIhlpA92wqKbTCgAo/cvt99Jsg9sNI=
github.com/grafana/dskit v0.0.0-20230920092933-98f431c981b2/go.mod h1:mZkAJTODiHxuvqvmwwFvp0ww9XA3mwlGA8Iv0SHP+5c=
github.com/grafana/e2e v0.1.1-0.20230828072146-510b1706a292 h1:t+QoVWpA+KIgwMmcNI3TDin2CUWFz3f12GdKWIfB1/g=
github.com/grafana/e2e v0.1.1-0.20230828072146-510b1706a292/go.mod h1:3UsooRp7yW5/NJQBlXcTsAHOoykEhNUYXkQ3r6ehEEY=
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 h1:WB3bGH2f1UN6jkd6uAEWfHB8OD7dKJ0v2Oo6SNfhpfQ=
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1 h1:MLYY2R60/74hfYl5vRRmC2VDo0Yuql1QQ1ig8hnvgSI=
github.com/grafana/gomemcache v0.0.0-20230914135007-70d78eaabfe1/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/mimir-prometheus v0.0.0-20230907080713-7c067467a0fd h1:UtvS3ax2aL970lCLX3P88zkhjhYfanDfqx/KxPf0Ulo=
Expand Down
12 changes: 9 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ type Config struct {
ReadPathMemoryUtilizationLimit uint64 `yaml:"read_path_memory_utilization_limit" category:"experimental"`
LogUtilizationBasedLimiterCPUSamples bool `yaml:"log_utilization_based_limiter_cpu_samples" category:"experimental"`

LimitInflightRequestsUsingGrpcTapHandle bool `yaml:"limit_inflight_requests_using_grpc_tap_handle" category:"experimental"`
LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"experimental"`

ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"experimental"`
}
Expand All @@ -190,7 +190,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.Float64Var(&cfg.ReadPathCPUUtilizationLimit, "ingester.read-path-cpu-utilization-limit", 0, "CPU utilization limit, as CPU cores, for CPU/memory utilization based read request limiting. Use 0 to disable it.")
f.Uint64Var(&cfg.ReadPathMemoryUtilizationLimit, "ingester.read-path-memory-utilization-limit", 0, "Memory limit, in bytes, for CPU/memory utilization based read request limiting. Use 0 to disable it.")
f.BoolVar(&cfg.LogUtilizationBasedLimiterCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcTapHandle, "ingester.limit-inflight-requests-using-grpc-handlers", false, "Use experimental method of limiting push requests")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "ingester.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.Int64Var(&cfg.ErrorSampleRate, "ingester.error-sample-rate", 0, "Each error will be logged once in this many times. Use 0 to log all of them.")
}

Expand Down Expand Up @@ -735,6 +735,12 @@ type pushStats struct {

// StartPushRequest checks if ingester can start push request, and increments relevant counters.
// If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.
//
// This method can be called in two ways: 1. Ingester.PushWithCleanup, or 2. from gRPC server's method limiter.
//
// In the first case, returned errors can be inspected/logged by middleware. Ingester.PushWithCleanup will wrap the error in util_log.DoNotLogError wrapper.
//
// In the second case, returned errors will not be logged, because request will not reach any middleware.
func (i *Ingester) StartPushRequest() error {
if err := i.checkRunning(); err != nil {
return err
Expand Down Expand Up @@ -778,7 +784,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
defer pushReq.CleanUp()

// If we're using the gRPC method limiter, we don't need to start/finish request here.
if !i.cfg.LimitInflightRequestsUsingGrpcTapHandle {
if !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter {
if err := i.StartPushRequest(); err != nil {
return nil, util_log.DoNotLogError{Err: err}
}
Expand Down
69 changes: 20 additions & 49 deletions pkg/mimir/grpc_push_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,73 +3,44 @@
package mimir

import (
"context"

"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// pushReceiver is the pair of push-request tracking calls that the gRPC
// limiter in this file makes on the value returned by getIngester.
type pushReceiver interface {
// StartPushRequest is called when a push RPC is about to start. A non-nil
// error is returned to the gRPC caller and the request is not started.
StartPushRequest() error
// FinishPushRequest is called when a push RPC finishes (see RPCCallFinished).
FinishPushRequest()
}

func newGrpcPushCheck(getIngester func() pushReceiver) *grpcPushCheck {
return &grpcPushCheck{getIngester: getIngester}
// getIngester function must be constant -- it must return the same value on each call.
func newGrpcInflightMethodLimiter(getIngester func() pushReceiver) *grpcInflightMethodLimiter {
return &grpcInflightMethodLimiter{getIngester: getIngester}
}

// grpcPushCheck implements gRPC TapHandle and gRPC stats.Handler.
// grpcPushCheck can track push inflight requests, and reject requests before even reading them into memory.
type grpcPushCheck struct {
// grpcInflightMethodLimiter implements the gRPC server's method limiter callbacks (RPCCallStarting and RPCCallFinished).
type grpcInflightMethodLimiter struct {
getIngester func() pushReceiver
}

// Custom type to hide it from other packages.
type grpcPushCheckContextKey int

const (
pushRequestStartedKey grpcPushCheckContextKey = 1
)
const ingesterPushMethod = "/cortex.Ingester/Push"

// TapHandle is called after receiving grpc request and headers, but before reading any request data yet.
// If we reject request here, it won't be counted towards any metrics (eg. dskit middleware).
// If we accept request (not return error), eventually HandleRPC with stats.End notification will be called.
func (g *grpcPushCheck) TapHandle(ctx context.Context, info *tap.Info) (context.Context, error) {
pushRequestStarted := false
var errNoIngester = status.Error(codes.Unavailable, "no ingester")

var err error
if info.FullMethodName == "/cortex.Ingester/Push" {
func (g *grpcInflightMethodLimiter) RPCCallStarting(methodName string) error {
if methodName == ingesterPushMethod {
ing := g.getIngester()
if ing != nil {
// All errors returned by StartPushRequest can be converted to gRPC status.Status.
err = ing.StartPushRequest()
if err == nil {
pushRequestStarted = true
}
if ing == nil {
// We return error here, to make sure that RPCCallFinished doesn't get called for this RPC call.
return errNoIngester
}
}

ctx = context.WithValue(ctx, pushRequestStartedKey, pushRequestStarted)
return ctx, err
}

func (g *grpcPushCheck) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}

func (g *grpcPushCheck) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
// when request ends, and we started push request tracking for this request, finish it.
if _, ok := rpcStats.(*stats.End); ok {
if b, ok := ctx.Value(pushRequestStartedKey).(bool); ok && b {
g.getIngester().FinishPushRequest()
}
return ing.StartPushRequest()
}
return nil
}

func (g *grpcPushCheck) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

func (g *grpcPushCheck) HandleConn(_ context.Context, _ stats.ConnStats) {
// Not interested.
// RPCCallFinished is the counterpart of RPCCallStarting. For the ingester push
// method it notifies the ingester that the push request has finished; every
// other method is ignored.
//
// It panics if getIngester returns nil. This is expected to be unreachable in
// practice, because RPCCallStarting returns an error when there is no ingester,
// and RPCCallFinished should not be called for an RPC that failed to start.
func (g *grpcInflightMethodLimiter) RPCCallFinished(methodName string) {
	if methodName != ingesterPushMethod {
		// Only push requests are tracked; nothing to undo for other methods.
		return
	}

	g.getIngester().FinishPushRequest()
}
150 changes: 41 additions & 109 deletions pkg/mimir/grpc_push_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,130 +3,62 @@
package mimir

import (
"context"
"net"
"sync"
"testing"
"time"

"github.com/grafana/dskit/test"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

ingester_client "github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
)

func TestGrpcPushCheck(t *testing.T) {
const limit = 3
ingServer := &ingesterServer{finishRequest: make(chan struct{}), inflightLimit: limit}

c := setupGrpcServerWithCheckAndClient(t, ingServer)

// start push requests in background goroutines
started := sync.WaitGroup{}
finished := sync.WaitGroup{}
for i := 0; i < limit; i++ {
started.Add(1)
finished.Add(1)

go func() {
started.Done()
defer finished.Done()

_, err := c.Push(context.Background(), &mimirpb.WriteRequest{})
require.NoError(t, err)
}()
}

// Wait until all goroutines start.
started.Wait()

// Wait until all requests are inflight.
test.Poll(t, 1*time.Second, int64(limit), func() interface{} {
return ingServer.inflight.Load()
})

// Try another request. This one should fail with too many inflight requests error.
_, err := c.Push(context.Background(), &mimirpb.WriteRequest{})
require.Error(t, err)
s, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, codes.Code(123), s.Code())
require.Equal(t, "too many requests", s.Message())

require.Equal(t, int64(limit), ingServer.inflight.Load())

// unblock all requests
for i := 0; i < limit; i++ {
ingServer.finishRequest <- struct{}{}
}
finished.Wait()
func TestGrpcInflightMethodLimiter(t *testing.T) {
t.Run("nil push receiver", func(t *testing.T) {
l := newGrpcInflightMethodLimiter(func() pushReceiver { return nil })

require.Equal(t, int64(0), ingServer.inflight.Load())
require.NoError(t, l.RPCCallStarting("test"))
require.NotPanics(t, func() {
l.RPCCallFinished("test")
})

// unblock subsequent requests immediately
close(ingServer.finishRequest)

// another push request should succeed.
_, err = c.Push(context.Background(), &mimirpb.WriteRequest{})
require.NoError(t, err)
}

func setupGrpcServerWithCheckAndClient(t *testing.T, ingServer *ingesterServer) ingester_client.IngesterClient {
g := newGrpcPushCheck(func() pushReceiver {
return ingServer
require.ErrorIs(t, l.RPCCallStarting(ingesterPushMethod), errNoIngester)
require.Panics(t, func() {
// In practice, this will not be called, since l.RPCCallStarting(ingesterPushMethod) returns error.
l.RPCCallFinished(ingesterPushMethod)
})
})

server := grpc.NewServer(grpc.InTapHandle(g.TapHandle), grpc.StatsHandler(g))
ingester_client.RegisterIngesterServer(server, ingServer)

l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

go func() {
_ = server.Serve(l)
}()

t.Cleanup(func() {
_ = l.Close()
t.Run("push receiver, no error", func(t *testing.T) {
m := &mockPushReceiver{startErr: nil}

l := newGrpcInflightMethodLimiter(func() pushReceiver { return m })
require.NoError(t, l.RPCCallStarting("test"))
require.NotPanics(t, func() {
l.RPCCallFinished("test")
})
require.Equal(t, 0, m.startCalls)
require.Equal(t, 0, m.finishCalls)

require.NoError(t, l.RPCCallStarting(ingesterPushMethod))
require.Equal(t, 1, m.startCalls)
require.Equal(t, 0, m.finishCalls)

require.NotPanics(t, func() {
l.RPCCallFinished(ingesterPushMethod)
})
require.Equal(t, 1, m.startCalls)
require.Equal(t, 1, m.finishCalls)
})

cc, err := grpc.Dial(l.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)

t.Cleanup(func() {
_ = cc.Close()
})

return ingester_client.NewIngesterClient(cc)
}

type ingesterServer struct {
ingester_client.IngesterServer
inflightLimit int64
finishRequest chan struct{}
inflight atomic.Int64
}
type mockPushReceiver struct {
startCalls int
finishCalls int

func (i *ingesterServer) Push(_ context.Context, _ *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
<-i.finishRequest
return &mimirpb.WriteResponse{}, nil
startErr error
}

func (i *ingesterServer) StartPushRequest() error {
v := i.inflight.Inc()
if v > i.inflightLimit {
i.inflight.Dec()
return status.Error(123, "too many requests")
}
return nil
func (i *mockPushReceiver) StartPushRequest() error {
i.startCalls++
return i.startErr
}

func (i *ingesterServer) FinishPushRequest() {
i.inflight.Dec()
func (i *mockPushReceiver) FinishPushRequest() {
i.finishCalls++
}
Loading