Skip to content

Commit

Permalink
Separate httpgrpcRequestSize and writeRequestSize.
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany committed Oct 24, 2023
1 parent 8caa52b commit da5c593
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 109 deletions.
53 changes: 37 additions & 16 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,8 +1019,11 @@ type requestState struct {
// (which can be after push handler returns).
pushHandlerPerformsCleanup bool

// If positive, it means that request size has been checked and added to inflightPushRequestsBytes.
requestSize int64
// If positive, it means that size of httpgrpc.HTTPRequest has been checked and added to inflightPushRequestsBytes.
httpgrpcRequestSize int64

// If positive, it means that size of mimirpb.WriteRequest has been checked and added to inflightPushRequestsBytes.
writeRequestSize int64
}

// StartPushRequest does limits checks at the beginning of Push request in distributor.
Expand All @@ -1033,12 +1036,12 @@ type requestState struct {
//
// This method creates requestState object and stores it in the context. This object describes which checks were already performed on the request,
// and which component is responsible for doing a cleanup.
func (d *Distributor) StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error) {
ctx, _, err := d.startPushRequest(ctx, requestSize)
func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error) {
ctx, _, err := d.startPushRequest(ctx, httpgrpcRequestSize)
return ctx, err
}

func (d *Distributor) startPushRequest(ctx context.Context, requestSize int64) (context.Context, *requestState, error) {
func (d *Distributor) startPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, *requestState, error) {
// If requestState is already in context, it means that StartPushRequest already ran for this request.
rs, alreadyInContext := ctx.Value(requestStateKey).(*requestState)
if alreadyInContext {
Expand Down Expand Up @@ -1070,9 +1073,9 @@ func (d *Distributor) startPushRequest(ctx context.Context, requestSize int64) (
}
}

// If we know the request size already, we can check it.
if requestSize > 0 {
if err := d.checkRequestSize(rs, requestSize); err != nil {
// If we know the httpgrpcRequestSize, we can check it.
if httpgrpcRequestSize > 0 {
if err := d.checkHttpgrpcRequestSize(rs, httpgrpcRequestSize); err != nil {
return ctx, nil, err
}
}
Expand All @@ -1083,14 +1086,29 @@ func (d *Distributor) startPushRequest(ctx context.Context, requestSize int64) (
return ctx, rs, nil
}

func (d *Distributor) checkRequestSize(rs *requestState, requestSize int64) error {
// If request size was already checked, don't check it again.
if rs.requestSize > 0 {
func (d *Distributor) checkHttpgrpcRequestSize(rs *requestState, httpgrpcRequestSize int64) error {
// If httpgrpcRequestSize was already checked, don't check it again.
if rs.httpgrpcRequestSize > 0 {
return nil
}

rs.requestSize = requestSize
inflightBytes := d.inflightPushRequestsBytes.Add(requestSize)
rs.httpgrpcRequestSize = httpgrpcRequestSize
inflightBytes := d.inflightPushRequestsBytes.Add(httpgrpcRequestSize)
return d.checkInflightBytes(inflightBytes)
}

func (d *Distributor) checkWriteRequestSize(rs *requestState, writeRequestSize int64) error {
// If writeRequestSize was already checked, don't check it again.
if rs.writeRequestSize > 0 {
return nil
}

rs.writeRequestSize = writeRequestSize
inflightBytes := d.inflightPushRequestsBytes.Add(writeRequestSize)
return d.checkInflightBytes(inflightBytes)
}

func (d *Distributor) checkInflightBytes(inflightBytes int64) error {
il := d.getInstanceLimits()

if il.MaxInflightPushRequestsBytes > 0 && inflightBytes > int64(il.MaxInflightPushRequestsBytes) {
Expand All @@ -1117,8 +1135,11 @@ func (d *Distributor) FinishPushRequest(ctx context.Context) {

func (d *Distributor) cleanupAfterPushFinished(rs *requestState) {
d.inflightPushRequests.Dec()
if rs.requestSize > 0 {
d.inflightPushRequestsBytes.Sub(rs.requestSize)
if rs.httpgrpcRequestSize > 0 {
d.inflightPushRequestsBytes.Sub(rs.httpgrpcRequestSize)
}
if rs.writeRequestSize > 0 {
d.inflightPushRequestsBytes.Sub(rs.writeRequestSize)
}
}

Expand Down Expand Up @@ -1164,7 +1185,7 @@ func (d *Distributor) limitsMiddleware(next PushFunc) PushFunc {
return err
}

if err := d.checkRequestSize(rs, int64(req.Size())); err != nil {
if err := d.checkWriteRequestSize(rs, int64(req.Size())); err != nil {
return err
}

Expand Down
210 changes: 118 additions & 92 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5024,13 +5024,15 @@ func TestStartFinishRequest(t *testing.T) {
}

type testCase struct {
externalCheck bool // Start request "externally", from outside of distributor.
httpgrpcRequestSize int64 // only used for external check.

inflightRequestsBeforePush int
inflightRequestsSizeBeforePush int64
addIngestionRateBeforePush int64

expectedStartSizeKnownError error
expectedStartSizeUnknownError error
expectedPushError error
expectedStartError error
expectedPushError error
}

const (
Expand All @@ -5040,118 +5042,142 @@ func TestStartFinishRequest(t *testing.T) {
)

testcases := map[string]testCase{
"request succeeds": {
expectedStartSizeKnownError: nil,
expectedStartSizeUnknownError: nil,
expectedPushError: nil,
"request succeeds, internal": {
expectedStartError: nil,
expectedPushError: nil,
},

"request succeeds, external": {
externalCheck: true,
expectedStartError: nil,
expectedPushError: nil,
},

"too many inflight requests": {
"request succeeds, external, with httpgrpc size": {
externalCheck: true,
httpgrpcRequestSize: 100,
expectedStartError: nil,
expectedPushError: nil,
},

"too many inflight requests, internal": {
inflightRequestsBeforePush: inflightLimit,
inflightRequestsSizeBeforePush: 0,
expectedStartSizeKnownError: errMaxInflightRequestsReached,
expectedStartSizeUnknownError: errMaxInflightRequestsReached,
expectedStartError: errMaxInflightRequestsReached,
expectedPushError: errMaxInflightRequestsReached,
},

"too many inflight bytes requests": {
"too many inflight requests, external": {
externalCheck: true,
inflightRequestsBeforePush: inflightLimit,
inflightRequestsSizeBeforePush: 0,
expectedStartError: errMaxInflightRequestsReached,
expectedPushError: errMaxInflightRequestsReached,
},

"too many inflight bytes requests, internal": {
inflightRequestsBeforePush: 1,
inflightRequestsSizeBeforePush: 2 * inflightBytesLimit,
expectedStartError: errMaxInflightRequestsBytesReached,
expectedPushError: errMaxInflightRequestsBytesReached,
},

"too many inflight bytes requests, external": {
externalCheck: true,
inflightRequestsBeforePush: 1,
inflightRequestsSizeBeforePush: 2 * inflightBytesLimit,
expectedStartSizeKnownError: errMaxInflightRequestsBytesReached,
expectedStartSizeUnknownError: nil,
expectedStartError: nil, // httpgrpc request size is not set when calling StartPushRequest, so it's not checked.
expectedPushError: errMaxInflightRequestsBytesReached,
},

"high ingestion rate": {
addIngestionRateBeforePush: 100 * ingestionRateLimit,
expectedStartSizeKnownError: errMaxIngestionRateReached,
expectedStartSizeUnknownError: errMaxIngestionRateReached,
expectedPushError: errMaxIngestionRateReached,
"too many inflight bytes requests, external with httpgrpc size within limit": {
externalCheck: true,
httpgrpcRequestSize: 500,
inflightRequestsBeforePush: 1,
inflightRequestsSizeBeforePush: inflightBytesLimit - 500,
expectedStartError: nil, // httpgrpc request size fits into inflight request size limit.
expectedPushError: errMaxInflightRequestsBytesReached,
},

"too many inflight bytes requests, external with httpgrpc size outside limit": {
externalCheck: true,
httpgrpcRequestSize: 500,
inflightRequestsBeforePush: 1,
inflightRequestsSizeBeforePush: inflightBytesLimit,
expectedStartError: errMaxInflightRequestsBytesReached,
expectedPushError: errMaxInflightRequestsBytesReached,
},

"high ingestion rate, internal": {
addIngestionRateBeforePush: 100 * ingestionRateLimit,
expectedStartError: errMaxIngestionRateReached,
expectedPushError: errMaxIngestionRateReached,
},

"high ingestion rate, external": {
externalCheck: true,
addIngestionRateBeforePush: 100 * ingestionRateLimit,
expectedStartError: errMaxIngestionRateReached,
expectedPushError: errMaxIngestionRateReached,
},
}

for name, tc := range testcases {
for _, externalCheck := range []bool{false, true} {
for _, externalSizeKnown := range []bool{false, true} {
if !externalCheck && externalSizeKnown {
continue
}
t.Run(name, func(t *testing.T) {
pushReq := makeWriteRequestForGenerators(1, uniqueMetricsGen, nil, nil)

tname := name
if externalCheck {
if externalSizeKnown {
tname = tname + ",external,size known"
} else {
tname = tname + ",external,size unknown"
}
} else {
tname = tname + ",internal"
}
var limits validation.Limits
flagext.DefaultValues(&limits)

t.Run(tname, func(t *testing.T) {
pushReq := makeWriteRequestForGenerators(1, uniqueMetricsGen, nil, nil)
// Prepare distributor and wrap the mock push function with its middlewares.
ds, _, _ := prepare(t, prepConfig{
numDistributors: 1,
limits: &limits,
enableTracker: true,
maxInflightRequests: inflightLimit,
maxInflightRequestsBytes: inflightBytesLimit,
maxIngestionRate: ingestionRateLimit,
})
wrappedPush := ds[0].wrapPushWithMiddlewares(finishPush)

var limits validation.Limits
flagext.DefaultValues(&limits)
// Setup inflight values before calling push.
ds[0].inflightPushRequests.Add(int64(tc.inflightRequestsBeforePush))
ds[0].inflightPushRequestsBytes.Add(tc.inflightRequestsSizeBeforePush)
ds[0].ingestionRate.Add(tc.addIngestionRateBeforePush)
ds[0].ingestionRate.Tick()

// Prepare distributor and wrap the mock push function with its middlewares.
ds, _, _ := prepare(t, prepConfig{
numDistributors: 1,
limits: &limits,
enableTracker: true,
maxInflightRequests: inflightLimit,
maxInflightRequestsBytes: inflightBytesLimit,
maxIngestionRate: ingestionRateLimit,
})
wrappedPush := ds[0].wrapPushWithMiddlewares(finishPush)

// Setup inflight values before calling push.
ds[0].inflightPushRequests.Add(int64(tc.inflightRequestsBeforePush))
ds[0].inflightPushRequestsBytes.Add(tc.inflightRequestsSizeBeforePush)
ds[0].ingestionRate.Add(tc.addIngestionRateBeforePush)
ds[0].ingestionRate.Tick()

ctx := user.InjectOrgID(context.Background(), "user")
if externalCheck {
var err error
size := int64(0)
expectedStartError := tc.expectedStartSizeUnknownError
if externalSizeKnown {
size = int64(pushReq.Size())
expectedStartError = tc.expectedStartSizeKnownError
}

ctx, err = ds[0].StartPushRequest(ctx, size)

if expectedStartError == nil {
require.NoError(t, err)
} else {
require.ErrorIs(t, err, expectedStartError)

// Verify that errors returned by StartPushRequest method are NOT gRPC status errors.
// They will be converted to gRPC status by grpcInflightMethodLimiter.
require.Error(t, err)
_, ok := status.FromError(err)
require.False(t, ok)
}
}
ctx := user.InjectOrgID(context.Background(), "user")
if tc.externalCheck {
var err error
ctx, err = ds[0].StartPushRequest(ctx, tc.httpgrpcRequestSize)

err := wrappedPush(ctx, NewParsedRequest(pushReq))
if tc.expectedPushError == nil {
require.NoError(t, err)
} else {
require.ErrorIs(t, err, tc.expectedPushError)
}
if tc.expectedStartError == nil {
require.NoError(t, err)
} else {
require.ErrorIs(t, err, tc.expectedStartError)

if externalCheck {
ds[0].FinishPushRequest(ctx)
}
// Verify that errors returned by StartPushRequest method are NOT gRPC status errors.
// They will be converted to gRPC status by grpcInflightMethodLimiter.
require.Error(t, err)
_, ok := status.FromError(err)
require.False(t, ok)
}
}

// Verify that inflight metrics are the same as before the request.
require.Equal(t, int64(tc.inflightRequestsBeforePush), ds[0].inflightPushRequests.Load())
require.Equal(t, tc.inflightRequestsSizeBeforePush, ds[0].inflightPushRequestsBytes.Load())
})
err := wrappedPush(ctx, NewParsedRequest(pushReq))
if tc.expectedPushError == nil {
require.NoError(t, err)
} else {
require.ErrorIs(t, err, tc.expectedPushError)
}
}

if tc.externalCheck {
ds[0].FinishPushRequest(ctx)
}

// Verify that inflight metrics are the same as before the request.
require.Equal(t, int64(tc.inflightRequestsBeforePush), ds[0].inflightPushRequests.Load())
require.Equal(t, tc.inflightRequestsSizeBeforePush, ds[0].inflightPushRequestsBytes.Load())
})
}
}
2 changes: 1 addition & 1 deletion pkg/mimir/grpc_push_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ingesterPushReceiver interface {

// Interface exposed by Distributor.
type distributorPushReceiver interface {
StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error)
StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error)
FinishPushRequest(ctx context.Context)
}

Expand Down

0 comments on commit da5c593

Please sign in to comment.