diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 99d367c51eb..8e87d6dc286 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1019,8 +1019,11 @@ type requestState struct { // (which can be after push handler returns). pushHandlerPerformsCleanup bool - // If positive, it means that request size has been checked and added to inflightPushRequestsBytes. - requestSize int64 + // If positive, it means that size of httpgrpc.HTTPRequest has been checked and added to inflightPushRequestsBytes. + httpgrpcRequestSize int64 + + // If positive, it means that size of mimirpb.WriteRequest has been checked and added to inflightPushRequestsBytes. + writeRequestSize int64 } // StartPushRequest does limits checks at the beginning of Push request in distributor. @@ -1033,12 +1036,12 @@ type requestState struct { // // This method creates requestState object and stores it in the context. This object describes which checks were already performed on the request, // and which component is responsible for doing a cleanup. -func (d *Distributor) StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error) { - ctx, _, err := d.startPushRequest(ctx, requestSize) +func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error) { + ctx, _, err := d.startPushRequest(ctx, httpgrpcRequestSize) return ctx, err } -func (d *Distributor) startPushRequest(ctx context.Context, requestSize int64) (context.Context, *requestState, error) { +func (d *Distributor) startPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, *requestState, error) { // If requestState is already in context, it means that StartPushRequest already ran for this request. rs, alreadyInContext := ctx.Value(requestStateKey).(*requestState) if alreadyInContext { @@ -1070,9 +1073,9 @@ func (d *Distributor) startPushRequest(ctx context.Context, requestSize int64) ( } } - // If we know the request size already, we can check it. - if requestSize > 0 { - if err := d.checkRequestSize(rs, requestSize); err != nil { + // If we know the httpgrpcRequestSize, we can check it. + if httpgrpcRequestSize > 0 { + if err := d.checkHttpgrpcRequestSize(rs, httpgrpcRequestSize); err != nil { return ctx, nil, err } } @@ -1083,14 +1086,29 @@ func (d *Distributor) startPushRequest(ctx context.Context, requestSize int64) ( return ctx, rs, nil } -func (d *Distributor) checkRequestSize(rs *requestState, requestSize int64) error { - // If request size was already checked, don't check it again. - if rs.requestSize > 0 { +func (d *Distributor) checkHttpgrpcRequestSize(rs *requestState, httpgrpcRequestSize int64) error { + // If httpgrpcRequestSize was already checked, don't check it again. + if rs.httpgrpcRequestSize > 0 { return nil } - rs.requestSize = requestSize - inflightBytes := d.inflightPushRequestsBytes.Add(requestSize) + rs.httpgrpcRequestSize = httpgrpcRequestSize + inflightBytes := d.inflightPushRequestsBytes.Add(httpgrpcRequestSize) + return d.checkInflightBytes(inflightBytes) +} + +func (d *Distributor) checkWriteRequestSize(rs *requestState, writeRequestSize int64) error { + // If writeRequestSize was already checked, don't check it again. + if rs.writeRequestSize > 0 { + return nil + } + + rs.writeRequestSize = writeRequestSize + inflightBytes := d.inflightPushRequestsBytes.Add(writeRequestSize) + return d.checkInflightBytes(inflightBytes) +} + +func (d *Distributor) checkInflightBytes(inflightBytes int64) error { il := d.getInstanceLimits() if il.MaxInflightPushRequestsBytes > 0 && inflightBytes > int64(il.MaxInflightPushRequestsBytes) { @@ -1117,8 +1135,11 @@ func (d *Distributor) FinishPushRequest(ctx context.Context) { func (d *Distributor) cleanupAfterPushFinished(rs *requestState) { d.inflightPushRequests.Dec() - if rs.requestSize > 0 { - d.inflightPushRequestsBytes.Sub(rs.requestSize) + if rs.httpgrpcRequestSize > 0 { + d.inflightPushRequestsBytes.Sub(rs.httpgrpcRequestSize) + } + if rs.writeRequestSize > 0 { + d.inflightPushRequestsBytes.Sub(rs.writeRequestSize) } } @@ -1164,7 +1185,7 @@ func (d *Distributor) limitsMiddleware(next PushFunc) PushFunc { return err } - if err := d.checkRequestSize(rs, int64(req.Size())); err != nil { + if err := d.checkWriteRequestSize(rs, int64(req.Size())); err != nil { return err } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index b7f7d135939..415f5bfdaee 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -5024,13 +5024,15 @@ func TestStartFinishRequest(t *testing.T) { } type testCase struct { + externalCheck bool // Start request "externally", from outside of distributor. + httpgrpcRequestSize int64 // only used for external check. + inflightRequestsBeforePush int inflightRequestsSizeBeforePush int64 addIngestionRateBeforePush int64 - expectedStartSizeKnownError error - expectedStartSizeUnknownError error - expectedPushError error + expectedStartError error + expectedPushError error } const ( @@ -5040,118 +5042,142 @@ func TestStartFinishRequest(t *testing.T) { ) testcases := map[string]testCase{ - "request succeeds": { - expectedStartSizeKnownError: nil, - expectedStartSizeUnknownError: nil, - expectedPushError: nil, + "request succeeds, internal": { + expectedStartError: nil, + expectedPushError: nil, + }, + + "request succeeds, external": { + externalCheck: true, + expectedStartError: nil, + expectedPushError: nil, }, - "too many inflight requests": { + "request succeeds, external, with httpgrpc size": { + externalCheck: true, + httpgrpcRequestSize: 100, + expectedStartError: nil, + expectedPushError: nil, + }, + + "too many inflight requests, internal": { inflightRequestsBeforePush: inflightLimit, inflightRequestsSizeBeforePush: 0, - expectedStartSizeKnownError: errMaxInflightRequestsReached, - expectedStartSizeUnknownError: errMaxInflightRequestsReached, + expectedStartError: errMaxInflightRequestsReached, expectedPushError: errMaxInflightRequestsReached, }, - "too many inflight bytes requests": { + "too many inflight requests, external": { + externalCheck: true, + inflightRequestsBeforePush: inflightLimit, + inflightRequestsSizeBeforePush: 0, + expectedStartError: errMaxInflightRequestsReached, + expectedPushError: errMaxInflightRequestsReached, + }, + + "too many inflight bytes requests, internal": { + inflightRequestsBeforePush: 1, + inflightRequestsSizeBeforePush: 2 * inflightBytesLimit, + expectedStartError: errMaxInflightRequestsBytesReached, + expectedPushError: errMaxInflightRequestsBytesReached, + }, + + "too many inflight bytes requests, external": { + externalCheck: true, inflightRequestsBeforePush: 1, inflightRequestsSizeBeforePush: 2 * inflightBytesLimit, - expectedStartSizeKnownError: errMaxInflightRequestsBytesReached, - expectedStartSizeUnknownError: nil, + expectedStartError: nil, // httpgrpc request size is not set when calling StartPushRequest, so it's not checked. expectedPushError: errMaxInflightRequestsBytesReached, }, - "high ingestion rate": { - addIngestionRateBeforePush: 100 * ingestionRateLimit, - expectedStartSizeKnownError: errMaxIngestionRateReached, - expectedStartSizeUnknownError: errMaxIngestionRateReached, - expectedPushError: errMaxIngestionRateReached, + "too many inflight bytes requests, external with httpgrpc size within limit": { + externalCheck: true, + httpgrpcRequestSize: 500, + inflightRequestsBeforePush: 1, + inflightRequestsSizeBeforePush: inflightBytesLimit - 500, + expectedStartError: nil, // httpgrpc request size fits into inflight request size limit. + expectedPushError: errMaxInflightRequestsBytesReached, + }, + + "too many inflight bytes requests, external with httpgrpc size outside limit": { + externalCheck: true, + httpgrpcRequestSize: 500, + inflightRequestsBeforePush: 1, + inflightRequestsSizeBeforePush: inflightBytesLimit, + expectedStartError: errMaxInflightRequestsBytesReached, + expectedPushError: errMaxInflightRequestsBytesReached, + }, + + "high ingestion rate, internal": { + addIngestionRateBeforePush: 100 * ingestionRateLimit, + expectedStartError: errMaxIngestionRateReached, + expectedPushError: errMaxIngestionRateReached, + }, + + "high ingestion rate, external": { + externalCheck: true, + addIngestionRateBeforePush: 100 * ingestionRateLimit, + expectedStartError: errMaxIngestionRateReached, + expectedPushError: errMaxIngestionRateReached, }, } for name, tc := range testcases { - for _, externalCheck := range []bool{false, true} { - for _, externalSizeKnown := range []bool{false, true} { - if !externalCheck && externalSizeKnown { - continue - } + t.Run(name, func(t *testing.T) { + pushReq := makeWriteRequestForGenerators(1, uniqueMetricsGen, nil, nil) - tname := name - if externalCheck { - if externalSizeKnown { - tname = tname + ",external,size known" - } else { - tname = tname + ",external,size unknown" - } - } else { - tname = tname + ",internal" - } + var limits validation.Limits + flagext.DefaultValues(&limits) - t.Run(tname, func(t *testing.T) { - pushReq := makeWriteRequestForGenerators(1, uniqueMetricsGen, nil, nil) + // Prepare distributor and wrap the mock push function with its middlewares. + ds, _, _ := prepare(t, prepConfig{ + numDistributors: 1, + limits: &limits, + enableTracker: true, + maxInflightRequests: inflightLimit, + maxInflightRequestsBytes: inflightBytesLimit, + maxIngestionRate: ingestionRateLimit, + }) + wrappedPush := ds[0].wrapPushWithMiddlewares(finishPush) - var limits validation.Limits - flagext.DefaultValues(&limits) + // Setup inflight values before calling push. + ds[0].inflightPushRequests.Add(int64(tc.inflightRequestsBeforePush)) + ds[0].inflightPushRequestsBytes.Add(tc.inflightRequestsSizeBeforePush) + ds[0].ingestionRate.Add(tc.addIngestionRateBeforePush) + ds[0].ingestionRate.Tick() - // Prepare distributor and wrap the mock push function with its middlewares. - ds, _, _ := prepare(t, prepConfig{ - numDistributors: 1, - limits: &limits, - enableTracker: true, - maxInflightRequests: inflightLimit, - maxInflightRequestsBytes: inflightBytesLimit, - maxIngestionRate: ingestionRateLimit, - }) - wrappedPush := ds[0].wrapPushWithMiddlewares(finishPush) - - // Setup inflight values before calling push. - ds[0].inflightPushRequests.Add(int64(tc.inflightRequestsBeforePush)) - ds[0].inflightPushRequestsBytes.Add(tc.inflightRequestsSizeBeforePush) - ds[0].ingestionRate.Add(tc.addIngestionRateBeforePush) - ds[0].ingestionRate.Tick() - - ctx := user.InjectOrgID(context.Background(), "user") - if externalCheck { - var err error - size := int64(0) - expectedStartError := tc.expectedStartSizeUnknownError - if externalSizeKnown { - size = int64(pushReq.Size()) - expectedStartError = tc.expectedStartSizeKnownError - } - - ctx, err = ds[0].StartPushRequest(ctx, size) - - if expectedStartError == nil { - require.NoError(t, err) - } else { - require.ErrorIs(t, err, expectedStartError) - - // Verify that errors returned by StartPushRequest method are NOT gRPC status errors. - // They will be converted to gRPC status by grpcInflightMethodLimiter. - require.Error(t, err) - _, ok := status.FromError(err) - require.False(t, ok) - } - } + ctx := user.InjectOrgID(context.Background(), "user") + if tc.externalCheck { + var err error + ctx, err = ds[0].StartPushRequest(ctx, tc.httpgrpcRequestSize) - err := wrappedPush(ctx, NewParsedRequest(pushReq)) - if tc.expectedPushError == nil { - require.NoError(t, err) - } else { - require.ErrorIs(t, err, tc.expectedPushError) - } + if tc.expectedStartError == nil { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, tc.expectedStartError) - if externalCheck { - ds[0].FinishPushRequest(ctx) - } + // Verify that errors returned by StartPushRequest method are NOT gRPC status errors. + // They will be converted to gRPC status by grpcInflightMethodLimiter. + require.Error(t, err) + _, ok := status.FromError(err) + require.False(t, ok) + } + } - // Verify that inflight metrics are the same as before the request. - require.Equal(t, int64(tc.inflightRequestsBeforePush), ds[0].inflightPushRequests.Load()) - require.Equal(t, tc.inflightRequestsSizeBeforePush, ds[0].inflightPushRequestsBytes.Load()) - }) + err := wrappedPush(ctx, NewParsedRequest(pushReq)) + if tc.expectedPushError == nil { + require.NoError(t, err) + } else { + require.ErrorIs(t, err, tc.expectedPushError) } - } + + if tc.externalCheck { + ds[0].FinishPushRequest(ctx) + } + + // Verify that inflight metrics are the same as before the request. + require.Equal(t, int64(tc.inflightRequestsBeforePush), ds[0].inflightPushRequests.Load()) + require.Equal(t, tc.inflightRequestsSizeBeforePush, ds[0].inflightPushRequestsBytes.Load()) + }) } } diff --git a/pkg/mimir/grpc_push_check.go b/pkg/mimir/grpc_push_check.go index ae2d164c996..c8a4a82c140 100644 --- a/pkg/mimir/grpc_push_check.go +++ b/pkg/mimir/grpc_push_check.go @@ -24,7 +24,7 @@ type ingesterPushReceiver interface { // Interface exposed by Distributor. type distributorPushReceiver interface { - StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error) + StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error) FinishPushRequest(ctx context.Context) }