Skip to content

Commit

Permalink
batcheval: respect bytes/key limits in GetRequests
Browse files Browse the repository at this point in the history
Previously, bytes/key limits were ignored in GetRequest. Now, they are
respected.

Release note: None
  • Loading branch information
jordanlewis committed Apr 14, 2021
1 parent b2a6c7d commit aed8f19
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ go_test(
"cmd_add_sstable_test.go",
"cmd_clear_range_test.go",
"cmd_end_transaction_test.go",
"cmd_get_test.go",
"cmd_lease_test.go",
"cmd_recover_txn_test.go",
"cmd_refresh_range_test.go",
Expand Down
21 changes: 20 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,31 @@ func Get(
h := cArgs.Header
reply := resp.(*roachpb.GetResponse)

val, intent, err := storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
var val *roachpb.Value
var intent *roachpb.Intent
var err error
if h.MaxSpanRequestKeys < 0 || h.TargetBytes < 0 {
// Receipt of a GetRequest with negative MaxSpanRequestKeys or TargetBytes
// indicates that the request was part of a batch that has already exhausted
// its limit, which means that we should *not* serve the request and return
// a ResumeSpan for this GetRequest.
//
// This mirrors the logic in MVCCScan, though the logic in MVCCScan is
// slightly lower in the stack.
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
return result.Result{}, nil
}
val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
LocalUncertaintyLimit: cArgs.LocalUncertaintyLimit,
})
if val != nil {
reply.NumKeys = 1
reply.NumBytes = int64(len(val.RawBytes))
}
if err != nil {
return result.Result{}, err
}
Expand Down
94 changes: 94 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_get_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
)

// TestGetResumeSpan tests that a GetRequest with a target bytes or max span
// request keys is properly handled by returning no result with a resume span.
func TestGetResumeSpan(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
resp := &roachpb.GetResponse{}
key := roachpb.Key([]byte{'a'})
value := roachpb.MakeValueFromString("woohoo")

db := storage.NewDefaultInMemForTesting()
defer db.Close()

_, err := Put(ctx, db, CommandArgs{
Header: roachpb.Header{TargetBytes: -1},
Args: &roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
Value: value,
},
}, resp)
assert.NoError(t, err)

// Case 1: Check that a negative TargetBytes causes a resume span.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{TargetBytes: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
},
}, resp)
assert.NoError(t, err)

assert.NotNil(t, resp.ResumeSpan)
assert.Equal(t, key, resp.ResumeSpan.Key)
assert.Nil(t, resp.ResumeSpan.EndKey)
assert.Nil(t, resp.Value)

resp = &roachpb.GetResponse{}
// Case 2: Check that a negative MaxSpanRequestKeys causes a resume span.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{MaxSpanRequestKeys: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
},
}, resp)
assert.NoError(t, err)

assert.NotNil(t, resp.ResumeSpan)
assert.Equal(t, key, resp.ResumeSpan.Key)
assert.Nil(t, resp.ResumeSpan.EndKey)
assert.Nil(t, resp.Value)

resp = &roachpb.GetResponse{}
// Case 3: Check that a positive limit causes a normal return.
_, err = Get(ctx, db, CommandArgs{
Header: roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
},
}, resp)
assert.NoError(t, err)

assert.Nil(t, resp.ResumeSpan)
assert.NotNil(t, resp.Value)
assert.Equal(t, resp.Value.RawBytes, value.RawBytes)
}
120 changes: 76 additions & 44 deletions pkg/kv/kvserver/replica_evaluate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,9 @@ func TestEvaluateBatch(t *testing.T) {
verifyResumeSpans(t, r, "", "")
},
}, {
// A batch limited to return only one key. Throw in a Get which is
// not subject to limitation and should thus have returned a value.
// However, the second scan comes up empty because there's no quota left.
//
// Note that there is currently a lot of undesirable behavior in the KV
// API for pretty much any batch that's not a nonoverlapping sorted run
// of only scans or only reverse scans. For example, in the example
// below, one would get a response for get(f) even though the resume
// span on the first scan is `[c,...)`. The higher layers of KV don't
// handle that correctly. Right now we just trust that nobody will
// send such requests.
// A batch limited to return only one key. Throw in a Get which will come
// up empty because there's no quota left. The second scan will also
// return nothing due to lack of quota.
name: "scans with MaxSpanRequestKeys=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
Expand All @@ -147,11 +139,8 @@ func TestEvaluateBatch(t *testing.T) {
d.ba.MaxSpanRequestKeys = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, []string{"f"}, nil)
verifyResumeSpans(t, r, "b-c", "", "d-f")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-f", string(b))
verifyScanResult(t, r, []string{"a"}, nil, nil)
verifyResumeSpans(t, r, "b-c", "f-", "d-f")
},
}, {
// Ditto in reverse.
Expand All @@ -164,23 +153,20 @@ func TestEvaluateBatch(t *testing.T) {
d.ba.MaxSpanRequestKeys = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"e"}, []string{"f"}, nil)
verifyResumeSpans(t, r, "d-d\x00", "", "a-c")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-f", string(b))
verifyScanResult(t, r, []string{"e"}, nil, nil)
verifyResumeSpans(t, r, "d-d\x00", "f-", "a-c")
},
}, {
// Similar, but this time the request allows the second scan to
// return one (but not more) remaining key. Again there's a Get
// that isn't counted against the limit.
name: "scans with MaxSpanRequestKeys=3",
// return one (but not more) remaining key. Again there's a Get, which
// uses up one quota.
name: "scans with MaxSpanRequestKeys=4",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(scanArgsString("a", "c"))
d.ba.Add(getArgsString("e"))
d.ba.Add(scanArgsString("c", "e"))
d.ba.MaxSpanRequestKeys = 3
d.ba.MaxSpanRequestKeys = 4
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a", "b"}, []string{"e"}, []string{"c"})
Expand All @@ -191,13 +177,13 @@ func TestEvaluateBatch(t *testing.T) {
},
}, {
// Ditto in reverse.
name: "reverse scans with MaxSpanRequestKeys=3",
name: "reverse scans with MaxSpanRequestKeys=4",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(revScanArgsString("c", "e"))
d.ba.Add(getArgsString("e"))
d.ba.Add(revScanArgsString("a", "c"))
d.ba.MaxSpanRequestKeys = 3
d.ba.MaxSpanRequestKeys = 4
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d", "c"}, []string{"e"}, []string{"b"})
Expand All @@ -206,6 +192,36 @@ func TestEvaluateBatch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
},
}, {
// GetRequests that come before scans.
name: "gets and scans with MaxSpanRequestKeys=2, the second uses up the limit.",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(getArgsString("a"))
d.ba.Add(getArgsString("b"))
d.ba.Add(getArgsString("c"))
d.ba.Add(scanArgsString("d", "e"))
d.ba.MaxSpanRequestKeys = 2
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, []string{"b"}, nil, nil)
verifyResumeSpans(t, r, "", "", "c-", "d-e")
},
}, {
// GetRequests that come before revscans.
name: "gets and revscans with MaxSpanRequestKeys=2, the second uses up the limit.",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(getArgsString("a"))
d.ba.Add(getArgsString("b"))
d.ba.Add(getArgsString("c"))
d.ba.Add(revScanArgsString("d", "e"))
d.ba.MaxSpanRequestKeys = 2
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, []string{"b"}, nil, nil)
verifyResumeSpans(t, r, "", "", "c-", "d-e")
},
},
//
// Test suite for TargetBytes.
Expand All @@ -226,11 +242,8 @@ func TestEvaluateBatch(t *testing.T) {
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, []string{"e"}, nil)
verifyResumeSpans(t, r, "b-c", "", "c-e")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
verifyScanResult(t, r, []string{"a"}, nil, nil)
verifyResumeSpans(t, r, "b-c", "e-", "c-e")
},
}, {
// Ditto in reverse.
Expand All @@ -244,11 +257,36 @@ func TestEvaluateBatch(t *testing.T) {
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d"}, []string{"e"}, nil)
verifyResumeSpans(t, r, "c-c\x00", "", "a-c")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
verifyScanResult(t, r, []string{"d"}, nil, nil)
verifyResumeSpans(t, r, "c-c\x00", "e-", "a-c")
},
}, {
// GetRequests that come before scans.
name: "gets and scans with TargetBytes=1, the first uses up the limit.",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(getArgsString("a"))
d.ba.Add(getArgsString("b"))
d.ba.Add(scanArgsString("c", "e"))
d.ba.TargetBytes = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, nil, nil)
verifyResumeSpans(t, r, "", "b-", "c-e")
},
}, {
// GetRequests that come before revscans.
name: "gets and revscans with TargetBytes=1, the first uses up the limit.",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(getArgsString("a"))
d.ba.Add(getArgsString("b"))
d.ba.Add(revScanArgsString("c", "e"))
d.ba.TargetBytes = 1
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, nil, nil)
verifyResumeSpans(t, r, "", "b-", "c-e")
},
},
//
Expand Down Expand Up @@ -681,7 +719,6 @@ func verifyScanResult(t *testing.T, r resp, keysPerResp ...[]string) {
require.NotNil(t, r.br)
require.Len(t, r.br.Responses, len(keysPerResp))
for i, keys := range keysPerResp {
var isGet bool
scan := r.br.Responses[i].GetInner()
var rows []roachpb.KeyValue
switch req := scan.(type) {
Expand All @@ -690,7 +727,6 @@ func verifyScanResult(t *testing.T, r resp, keysPerResp ...[]string) {
case *roachpb.ReverseScanResponse:
rows = req.Rows
case *roachpb.GetResponse:
isGet = true
if req.Value != nil {
rows = []roachpb.KeyValue{{
Key: r.d.ba.Requests[i].GetGet().Key,
Expand All @@ -700,11 +736,7 @@ func verifyScanResult(t *testing.T, r resp, keysPerResp ...[]string) {
default:
}

if !isGet {
require.EqualValues(t, len(keys), scan.Header().NumKeys, "in response #%d", i+1)
} else {
require.Zero(t, scan.Header().NumKeys, "in response #%d", i+1)
}
require.EqualValues(t, len(keys), scan.Header().NumKeys, "in response #%d", i+1)
var actKeys []string
for _, row := range rows {
actKeys = append(actKeys, string(row.Key))
Expand Down

0 comments on commit aed8f19

Please sign in to comment.