distributor: do inflight checks before reading request (for httpgrpc requests) (#6300)

* Allow performing inflight checks before push handler starts.

Signed-off-by: Peter Štibraný <[email protected]>

* Add check for ingestion rate.

Signed-off-by: Peter Štibraný <[email protected]>

* distributor: when using httpgrpc check inflight limits before reading requests.

Signed-off-by: Peter Štibraný <[email protected]>

* CHANGELOG.md entry.

Signed-off-by: Peter Štibraný <[email protected]>

* Use suffix check.

Signed-off-by: Peter Štibraný <[email protected]>

* Don't return gRPC errors from Distributor.

Signed-off-by: Peter Štibraný <[email protected]>

* Call cleanupAfterPushFinished from defer.

Signed-off-by: Peter Štibraný <[email protected]>

* Separate httpgrpcRequestSize and writeRequestSize.

Signed-off-by: Peter Štibraný <[email protected]>

* Mention new flags in docs/sources/mimir/configure/about-versioning.md.

Signed-off-by: Peter Štibraný <[email protected]>

* Extract message size only when handling distributor push request.
Use prefix in tests.

Signed-off-by: Peter Štibraný <[email protected]>

* Update comment for startPushRequest, mention wrapping to util_log.DoNotLogError.

Signed-off-by: Peter Štibraný <[email protected]>

* Fix linter unhappiness.

Signed-off-by: Peter Štibraný <[email protected]>

---------

Signed-off-by: Peter Štibraný <[email protected]>
pstibrany authored Oct 24, 2023
1 parent 83d0d7b commit 272e355
Showing 11 changed files with 586 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -51,6 +51,7 @@
 * [ENHANCEMENT] Querier: reduce memory consumed for queries that hit store-gateways. #6348
 * [ENHANCEMENT] Ruler: include corresponding trace ID with log messages associated with rule evaluation. #6379
 * [ENHANCEMENT] Querier: clarify log messages and span events emitted while querying ingesters, and include both ingester name and address when relevant. #6381
+* [ENHANCEMENT] Ingester, Distributor: added experimental support for rejecting push requests received via gRPC before reading them into memory, if ingester or distributor is unable to accept the request. This is activated by using `-ingester.limit-inflight-requests-using-grpc-method-limiter` for ingester, and `-distributor.limit-inflight-requests-using-grpc-method-limiter` for distributor. #5976 #6300
 * [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
 * [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
 * [BUGFIX] Ingester: prevent query logic from continuing to execute after queries are canceled. #6085
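
The idea behind the new CHANGELOG entry, rejecting a request from its declared size before its payload is read, can be illustrated with a plain `net/http` analog. This is only a sketch of the general pattern and not the gRPC mechanism used by this commit: it relies on the `Content-Length` header instead of gRPC metadata, and `inflightBytes`, `maxInflightBytes`, `rejectBeforeRead` and `push` are hypothetical names that do not exist in Mimir.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync/atomic"
)

// inflightBytes and maxInflightBytes are illustrative only; they are not Mimir identifiers.
var inflightBytes atomic.Int64

const maxInflightBytes = 64

// rejectBeforeRead reserves the declared Content-Length against a byte budget
// and rejects the request without reading its body when the budget is exceeded.
func rejectBeforeRead(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		size := r.ContentLength
		if size > 0 {
			// Reserve first, then check, so the request that crosses the limit is rejected.
			total := inflightBytes.Add(size)
			defer inflightBytes.Add(-size)
			if total > maxInflightBytes {
				http.Error(w, "too many inflight bytes", http.StatusServiceUnavailable)
				return
			}
		}
		next.ServeHTTP(w, r)
	})
}

// push sends a body of n bytes through the middleware and reports the status
// code and whether the wrapped handler got to read the body.
func push(n int) (status int, bodyRead bool) {
	h := rejectBeforeRead(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.Copy(io.Discard, r.Body)
		bodyRead = true
	}))
	rec := httptest.NewRecorder()
	body := strings.NewReader(strings.Repeat("x", n))
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/api/v1/push", body))
	return rec.Code, bodyRead
}

func main() {
	for _, n := range []int{16, 128} {
		status, read := push(n)
		fmt.Printf("size=%d status=%d bodyRead=%v\n", n, status, read)
	}
}
```

The reservation is released in a `defer`, so a rejected request leaves the counter unchanged once the handler returns.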
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1563,6 +1563,17 @@
     "fieldFlag": "distributor.write-requests-buffer-pooling-enabled",
     "fieldType": "boolean",
     "fieldCategory": "experimental"
+  },
+  {
+    "kind": "field",
+    "name": "limit_inflight_requests_using_grpc_method_limiter",
+    "required": false,
+    "desc": "Use experimental method of limiting push requests.",
+    "fieldValue": null,
+    "fieldDefaultValue": false,
+    "fieldFlag": "distributor.limit-inflight-requests-using-grpc-method-limiter",
+    "fieldType": "boolean",
+    "fieldCategory": "experimental"
   }
 ],
 "fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1117,6 +1117,8 @@ Usage of ./cmd/mimir/mimir:
     	The sum of the request sizes in bytes of inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.
   -distributor.instance-limits.max-ingestion-rate float
     	Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.
+  -distributor.limit-inflight-requests-using-grpc-method-limiter
+    	[experimental] Use experimental method of limiting push requests.
   -distributor.max-recv-msg-size int
     	Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected. (default 104857600)
   -distributor.remote-timeout duration
3 changes: 3 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -145,6 +145,9 @@ The following features are currently experimental:
 - Timeseries Unmarshal caching optimization in distributor (`-timeseries-unmarshal-caching-optimization-enabled`)
 - Reusing buffers for marshalling write requests in distributors (`-distributor.write-requests-buffer-pooling-enabled`)
 - Using a worker pool for handling GRPC requests (`-server.grpc.num-workers`)
+- Limiting inflight requests to Distributor and Ingester via gRPC limiter:
+  - `-distributor.limit-inflight-requests-using-grpc-method-limiter`
+  - `-ingester.limit-inflight-requests-using-grpc-method-limiter`

 ## Deprecated features

@@ -882,6 +882,10 @@ instance_limits:
 # (experimental) Enable pooling of buffers used for marshaling write requests.
 # CLI flag: -distributor.write-requests-buffer-pooling-enabled
 [write_requests_buffer_pooling_enabled: <boolean> | default = false]
+
+# (experimental) Use experimental method of limiting push requests.
+# CLI flag: -distributor.limit-inflight-requests-using-grpc-method-limiter
+[limit_inflight_requests_using_grpc_method_limiter: <boolean> | default = false]
 ```

 ### ingester
7 changes: 5 additions & 2 deletions pkg/api/api.go
@@ -227,12 +227,15 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc, userL
 	a.RegisterRoute("/api/v1/user_limits", userLimitsHandler, true, true, "GET")
 }

+const PrometheusPushEndpoint = "/api/v1/push"
+const OTLPPushEndpoint = "/otlp/v1/metrics"
+
 // RegisterDistributor registers the endpoints associated with the distributor.
 func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
 	distributorpb.RegisterDistributorServer(a.server.GRPC, d)

-	a.RegisterRoute("/api/v1/push", distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, d.PushWithMiddlewares), true, false, "POST")
-	a.RegisterRoute("/otlp/v1/metrics", distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, reg, d.PushWithMiddlewares), true, false, "POST")
+	a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, d.PushWithMiddlewares), true, false, "POST")
+	a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, reg, d.PushWithMiddlewares), true, false, "POST")

 	a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
 		{Desc: "Ring status", Path: "/distributor/ring"},
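
The commit message mentions "Use suffix check", and this hunk exports the two push paths as constants, presumably so that other code can recognise push requests by URL. The code that consumes the constants is not part of this excerpt, so the following is only a sketch of what such a check might look like; `isPushURL` is a hypothetical helper, not a function from the commit.

```go
package main

import (
	"fmt"
	"strings"
)

// Same values as the constants exported from pkg/api in this commit.
const (
	PrometheusPushEndpoint = "/api/v1/push"
	OTLPPushEndpoint       = "/otlp/v1/metrics"
)

// isPushURL is a hypothetical helper: it reports whether url ends with one of
// the push endpoints. Matching on the suffix tolerates an extra path prefix
// in front of the endpoint.
func isPushURL(url string) bool {
	return strings.HasSuffix(url, PrometheusPushEndpoint) ||
		strings.HasSuffix(url, OTLPPushEndpoint)
}

func main() {
	for _, u := range []string{"/api/v1/push", "/prefix/otlp/v1/metrics", "/api/v1/query"} {
		fmt.Printf("%-26s push=%v\n", u, isPushURL(u))
	}
}
```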
175 changes: 150 additions & 25 deletions pkg/distributor/distributor.go
@@ -186,7 +186,8 @@ type Config struct {
 	// These functions will only receive samples that don't get dropped by HA deduplication.
 	PushWrappers []PushWrapper `yaml:"-"`

-	WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
+	WriteRequestsBufferPoolingEnabled           bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
+	LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"experimental"`
 }

 // PushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -201,6 +202,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
 	f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
 	f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
 	f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", false, "Enable pooling of buffers used for marshaling write requests.")
+	f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")

 	cfg.DefaultLimits.RegisterFlags(f)
 }
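
To see how the new boolean flag behaves when parsed, here is a minimal standalone sketch that uses only the standard `flag` package. The flag name, default and help text are copied from the hunk above; the `config` type and the `parse` helper are hypothetical stand-ins for Mimir's `Config` and its startup code.

```go
package main

import (
	"flag"
	"fmt"
)

// config mirrors only the single field added in this hunk.
type config struct {
	LimitInflightRequestsUsingGrpcMethodLimiter bool
}

// registerFlags registers the flag with the same name, default and help text as the commit.
func (c *config) registerFlags(f *flag.FlagSet) {
	f.BoolVar(&c.LimitInflightRequestsUsingGrpcMethodLimiter,
		"distributor.limit-inflight-requests-using-grpc-method-limiter",
		false, "Use experimental method of limiting push requests.")
}

// parse is a hypothetical helper that parses args into a fresh config.
func parse(args []string) (config, error) {
	var c config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	c.registerFlags(fs)
	err := fs.Parse(args)
	return c, err
}

func main() {
	off, _ := parse(nil)
	on, _ := parse([]string{"-distributor.limit-inflight-requests-using-grpc-method-limiter"})
	fmt.Println("default:", off.LimitInflightRequestsUsingGrpcMethodLimiter)
	fmt.Println("enabled:", on.LimitInflightRequestsUsingGrpcMethodLimiter)
}
```

Because the default is `false`, existing deployments keep the previous behaviour unless the flag is passed explicitly.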
@@ -1005,36 +1007,165 @@ func (d *Distributor) metricsMiddleware(next PushFunc) PushFunc {
 	}
 }

+type ctxKey int
+
+const requestStateKey ctxKey = 1
+
+// requestState represents state of checks for given request. If this object is stored in context,
+// it means that request has been checked against inflight requests limit, and FinishPushRequest,
+// cleanupAfterPushFinished (or both) must be called for given context.
+type requestState struct {
+	// If set to true, push request will perform cleanup of inflight metrics after request has actually finished
+	// (which can be after push handler returns).
+	pushHandlerPerformsCleanup bool
+
+	// If positive, it means that size of httpgrpc.HTTPRequest has been checked and added to inflightPushRequestsBytes.
+	httpgrpcRequestSize int64
+
+	// If positive, it means that size of mimirpb.WriteRequest has been checked and added to inflightPushRequestsBytes.
+	writeRequestSize int64
+}
+
+func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error) {
+	ctx, _, err := d.startPushRequest(ctx, httpgrpcRequestSize)
+	return ctx, err
+}
+
+// startPushRequest does limits checks at the beginning of Push request in distributor.
+// This can be called from different places, even multiple times for the same request:
+//
+//   - from gRPC Method limit check. This only applies if request arrived as httpgrpc.HTTPRequest, and request metadata
+//     in gRPC request provided enough information to do the check. Errors are not logged on this path, only returned to client.
+//
+//   - from Distributor's limitsMiddleware method. If error is returned, limitsMiddleware will wrap the error using util_log.DoNotLogError.
+//
+// This method creates requestState object and stores it in the context.
+// This object describes which checks were already performed on the request,
+// and which component is responsible for doing a cleanup.
+func (d *Distributor) startPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, *requestState, error) {
+	// If requestState is already in context, it means that StartPushRequest already ran for this request.
+	rs, alreadyInContext := ctx.Value(requestStateKey).(*requestState)
+	if alreadyInContext {
+		return ctx, rs, nil
+	}
+
+	rs = &requestState{}
+
+	cleanupInDefer := true
+
+	// Increment number of requests and bytes before doing the checks, so that we hit error if this request crosses the limits.
+	inflight := d.inflightPushRequests.Inc()
+	defer func() {
+		if cleanupInDefer {
+			d.cleanupAfterPushFinished(rs)
+		}
+	}()
+
+	il := d.getInstanceLimits()
+	if il.MaxInflightPushRequests > 0 && inflight > int64(il.MaxInflightPushRequests) {
+		d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequests).Inc()
+		return ctx, nil, errMaxInflightRequestsReached
+	}
+
+	if il.MaxIngestionRate > 0 {
+		if rate := d.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
+			d.rejectedRequests.WithLabelValues(reasonDistributorMaxIngestionRate).Inc()
+			return ctx, nil, errMaxIngestionRateReached
+		}
+	}
+
+	// If we know the httpgrpcRequestSize, we can check it.
+	if httpgrpcRequestSize > 0 {
+		if err := d.checkHttpgrpcRequestSize(rs, httpgrpcRequestSize); err != nil {
+			return ctx, nil, err
+		}
+	}
+
+	ctx = context.WithValue(ctx, requestStateKey, rs)
+
+	cleanupInDefer = false
+	return ctx, rs, nil
+}
+
+func (d *Distributor) checkHttpgrpcRequestSize(rs *requestState, httpgrpcRequestSize int64) error {
+	// If httpgrpcRequestSize was already checked, don't check it again.
+	if rs.httpgrpcRequestSize > 0 {
+		return nil
+	}
+
+	rs.httpgrpcRequestSize = httpgrpcRequestSize
+	inflightBytes := d.inflightPushRequestsBytes.Add(httpgrpcRequestSize)
+	return d.checkInflightBytes(inflightBytes)
+}
+
+func (d *Distributor) checkWriteRequestSize(rs *requestState, writeRequestSize int64) error {
+	// If writeRequestSize was already checked, don't check it again.
+	if rs.writeRequestSize > 0 {
+		return nil
+	}
+
+	rs.writeRequestSize = writeRequestSize
+	inflightBytes := d.inflightPushRequestsBytes.Add(writeRequestSize)
+	return d.checkInflightBytes(inflightBytes)
+}
+
+func (d *Distributor) checkInflightBytes(inflightBytes int64) error {
+	il := d.getInstanceLimits()
+
+	if il.MaxInflightPushRequestsBytes > 0 && inflightBytes > int64(il.MaxInflightPushRequestsBytes) {
+		d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequestsBytes).Inc()
+		return errMaxInflightRequestsBytesReached
+	}
+	return nil
+}
+
+// FinishPushRequest is a counter-part to StartPushRequest, and must be called exactly once while handling the push request,
+// on the same goroutine as push method itself.
+func (d *Distributor) FinishPushRequest(ctx context.Context) {
+	rs, ok := ctx.Value(requestStateKey).(*requestState)
+	if !ok {
+		return
+	}
+
+	if rs.pushHandlerPerformsCleanup {
+		return
+	}
+
+	d.cleanupAfterPushFinished(rs)
+}
+
+func (d *Distributor) cleanupAfterPushFinished(rs *requestState) {
+	d.inflightPushRequests.Dec()
+	if rs.httpgrpcRequestSize > 0 {
+		d.inflightPushRequestsBytes.Sub(rs.httpgrpcRequestSize)
+	}
+	if rs.writeRequestSize > 0 {
+		d.inflightPushRequestsBytes.Sub(rs.writeRequestSize)
+	}
+}
+
 // limitsMiddleware checks for instance limits and rejects request if this instance cannot process it at the moment.
 func (d *Distributor) limitsMiddleware(next PushFunc) PushFunc {
 	return func(ctx context.Context, pushReq *Request) error {
-		// Increment number of requests and bytes before doing the checks, so that we hit error if this request crosses the limits.
-		inflight := d.inflightPushRequests.Inc()
+		// We don't know request size yet, will check it later.
+		ctx, rs, err := d.startPushRequest(ctx, -1)
+		if err != nil {
+			return util_log.DoNotLogError{Err: err}
+		}
+
+		rs.pushHandlerPerformsCleanup = true
 		// Decrement counter after all ingester calls have finished or been cancelled.
 		pushReq.AddCleanup(func() {
-			d.inflightPushRequests.Dec()
+			d.cleanupAfterPushFinished(rs)
 		})

 		cleanupInDefer := true
 		defer func() {
 			if cleanupInDefer {
 				pushReq.CleanUp()
 			}
 		}()

-		il := d.getInstanceLimits()
-		if il.MaxInflightPushRequests > 0 && inflight > int64(il.MaxInflightPushRequests) {
-			d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequests).Inc()
-			return util_log.DoNotLogError{Err: errMaxInflightRequestsReached}
-		}
-
-		if il.MaxIngestionRate > 0 {
-			if rate := d.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
-				d.rejectedRequests.WithLabelValues(reasonDistributorMaxIngestionRate).Inc()
-				return errMaxIngestionRateReached
-			}
-		}
-
 		userID, err := tenant.TenantID(ctx)
 		if err != nil {
 			return err
@@ -1054,15 +1185,9 @@ func (d *Distributor) limitsMiddleware(next PushFunc) PushFunc {
 		if err != nil {
 			return err
 		}
-		reqSize := int64(req.Size())
-		inflightBytes := d.inflightPushRequestsBytes.Add(reqSize)
-		pushReq.AddCleanup(func() {
-			d.inflightPushRequestsBytes.Sub(reqSize)
-		})

-		if il.MaxInflightPushRequestsBytes > 0 && inflightBytes > int64(il.MaxInflightPushRequestsBytes) {
-			d.rejectedRequests.WithLabelValues(reasonDistributorMaxInflightPushRequestsBytes).Inc()
-			return errMaxInflightRequestsBytesReached
+		if err := d.checkWriteRequestSize(rs, int64(req.Size())); err != nil {
+			return err
 		}

 		cleanupInDefer = false
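
The start/finish bookkeeping introduced in this file can be reduced to a small standalone sketch. It assumes a simplified limiter with only two limits and uses the standard library's `sync/atomic` counters; `limiter`, `state`, `start`, `finish`, `release` and `scenario` are hypothetical names, and the real `Distributor` additionally checks the ingestion rate, updates metrics and tracks two separate request sizes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

type ctxKey int

const stateKey ctxKey = 1

// state records what was reserved for one request so it can be released exactly once.
type state struct{ size int64 }

var (
	errTooManyRequests = errors.New("too many inflight requests")
	errTooManyBytes    = errors.New("too many inflight bytes")
)

type limiter struct {
	maxRequests, maxBytes int64 // 0 = unlimited
	requests, bytes       atomic.Int64
}

// start reserves capacity for a request. Calling it again with the returned
// context is a no-op, so several layers can call it for the same request.
func (l *limiter) start(ctx context.Context, size int64) (context.Context, error) {
	if _, ok := ctx.Value(stateKey).(*state); ok {
		return ctx, nil
	}
	st := &state{}
	undo := true
	n := l.requests.Add(1) // increment first so the request crossing the limit fails
	defer func() {
		if undo {
			l.release(st)
		}
	}()
	if l.maxRequests > 0 && n > l.maxRequests {
		return ctx, errTooManyRequests
	}
	if size > 0 {
		st.size = size
		if b := l.bytes.Add(size); l.maxBytes > 0 && b > l.maxBytes {
			return ctx, errTooManyBytes
		}
	}
	undo = false
	return context.WithValue(ctx, stateKey, st), nil
}

// finish releases whatever start reserved for the request carried by ctx.
func (l *limiter) finish(ctx context.Context) {
	if st, ok := ctx.Value(stateKey).(*state); ok {
		l.release(st)
	}
}

func (l *limiter) release(st *state) {
	l.requests.Add(-1)
	if st.size > 0 {
		l.bytes.Add(-st.size)
	}
}

// scenario admits one request, repeats the check for it, then tries a second request.
func scenario() (secondErr error, inflight, inflightBytes, remaining int64) {
	l := &limiter{maxRequests: 1, maxBytes: 100}
	ctx, _ := l.start(context.Background(), 60)
	ctx, _ = l.start(ctx, 60)                        // same request again: nothing is counted twice
	_, secondErr = l.start(context.Background(), 10) // different request: over the request limit
	inflight, inflightBytes = l.requests.Load(), l.bytes.Load()
	l.finish(ctx)
	remaining = l.requests.Load() + l.bytes.Load()
	return
}

func main() {
	err, n, b, rest := scenario()
	fmt.Println(err, n, b, rest)
}
```

Storing the state in the context is what makes the check safe to call from more than one place for the same request: whichever caller runs first does the accounting, and later callers see the state and return immediately.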