Skip to content

Commit

Permalink
kvnemesis: uniquely identify all versions
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Nov 8, 2022
1 parent 362c461 commit 2b0afee
Show file tree
Hide file tree
Showing 248 changed files with 4,537 additions and 1,894 deletions.
6 changes: 6 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ ALL_TESTS = [
"//pkg/util/admission:admission_test",
"//pkg/util/binfetcher:binfetcher_test",
"//pkg/util/bitarray:bitarray_test",
"//pkg/util/buildutil/testingint:testingint_test",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk_test",
"//pkg/util/cache:cache_test",
Expand Down Expand Up @@ -1114,6 +1115,7 @@ GO_TARGETS = [
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvclient/rangestats:rangestats",
"//pkg/kv/kvclient:kvclient",
"//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil",
"//pkg/kv/kvnemesis:kvnemesis",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober",
Expand Down Expand Up @@ -1927,6 +1929,8 @@ GO_TARGETS = [
"//pkg/util/bitarray:bitarray",
"//pkg/util/bitarray:bitarray_test",
"//pkg/util/bufalloc:bufalloc",
"//pkg/util/buildutil/testingint:testingint",
"//pkg/util/buildutil/testingint:testingint_test",
"//pkg/util/buildutil:buildutil",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk",
Expand Down Expand Up @@ -2461,6 +2465,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvclient/rangefeed/rangefeedcache:get_x_data",
"//pkg/kv/kvclient/rangestats:get_x_data",
"//pkg/kv/kvnemesis:get_x_data",
"//pkg/kv/kvnemesis/kvnemesisutil:get_x_data",
"//pkg/kv/kvprober:get_x_data",
"//pkg/kv/kvserver:get_x_data",
"//pkg/kv/kvserver/abortspan:get_x_data",
Expand Down Expand Up @@ -2964,6 +2969,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/bitarray:get_x_data",
"//pkg/util/bufalloc:get_x_data",
"//pkg/util/buildutil:get_x_data",
"//pkg/util/buildutil/testingint:get_x_data",
"//pkg/util/bulk:get_x_data",
"//pkg/util/cache:get_x_data",
"//pkg/util/caller:get_x_data",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (b *Batch) ApproximateMutationBytes() int {
return b.approxMutationReqBytes
}

// Requests exposes the requests stashed in the batch thus far.
func (b *Batch) Requests() []roachpb.RequestUnion {
return b.reqs
}

// RawResponse returns the BatchResponse which was the result of a successful
// execution of the batch, and nil otherwise.
func (b *Batch) RawResponse() *roachpb.BatchResponse {
Expand Down
26 changes: 24 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ type DistSender struct {
// LatencyFunc is used to estimate the latency to other nodes.
latencyFunc LatencyFunc

onRangeSpanningNonTxnalBatch func(ba *roachpb.BatchRequest) *roachpb.Error

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
Expand Down Expand Up @@ -498,6 +500,11 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
} else {
ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
}

if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
}

return ds
}

Expand Down Expand Up @@ -1243,8 +1250,23 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// If there's no transaction and ba spans ranges, possibly re-run as part of
// a transaction for consistency. The case where we don't need to re-run is
// if the read consistency is not required.
if ba.Txn == nil && ba.IsTransactional() && ba.ReadConsistency == roachpb.CONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{})
//
// NB: this check isn't quite right. If we mixed a DeleteRangeUsingTombstone
// with a Put, for example, we'd restart with a txn, but
// DeleteRangeUsingTombstone does not support txns. Could we instead determine
// the read/write timestamp here? But then the write might not be possible at
// that timestamp, and we need to start retrying the batch as a kind of
// starvable txn (currently the contract is that batches can't return retry
// errors).
if ba.Txn == nil {
if ba.IsTransactional() && ba.ReadConsistency == roachpb.CONSISTENT {
return nil, roachpb.NewError(&roachpb.OpRequiresTxnError{})
}
if fn := ds.onRangeSpanningNonTxnalBatch; fn != nil {
if pErr := fn(ba); pErr != nil {
return nil, pErr
}
}
}
// If the batch contains a non-parallel commit EndTxn and spans ranges then
// we want the caller to come again with the EndTxn in a separate
Expand Down
11 changes: 10 additions & 1 deletion pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

package kvcoord

import "github.com/cockroachdb/cockroach/pkg/base"
import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ClientTestingKnobs contains testing options that dictate the behavior
// of the key-value client.
Expand Down Expand Up @@ -52,6 +55,12 @@ type ClientTestingKnobs struct {
// CommitWaitFilter allows tests to instrument the beginning of a transaction
// commit wait sleep.
CommitWaitFilter func()

// OnRangeSpanningNonTxnalBatch is invoked whenever DistSender attempts to split
// a non-transactional batch across a range boundary. The method may inject an
// error which, if non-nil, becomes the result of the batch. Otherwise, execution
// continues.
OnRangeSpanningNonTxnalBatch func(ba *roachpb.BatchRequest) *roachpb.Error
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
14 changes: 10 additions & 4 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,21 @@ func (gt *grpcTransport) sendBatch(
gt.opts.metrics.LocalSentCount.Inc(1)
}
reply, err := iface.Batch(ctx, ba)

// If we queried a remote node, perform extra validation and
// import trace spans.
if reply != nil && !rpc.IsLocal(iface) {
for i := range reply.Responses {
if err := reply.Responses[i].GetInner().Verify(ba.Requests[i].GetInner()); err != nil {
log.Errorf(ctx, "%v", err)
if err == nil {
for i := range reply.Responses {
if err := reply.Responses[i].GetInner().Verify(ba.Requests[i].GetInner()); err != nil {
log.Errorf(ctx, "%v", err)
return nil, err
}
}
}
// Import the remotely collected spans, if any.

// Import the remotely collected spans, if any. Do this on error too,
// to get traces in that case as well (or to at least have a chance).
if len(reply.CollectedSpans) != 0 {
span := tracing.SpanFromContext(ctx)
if span == nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"generator.go",
"kvnemesis.go",
"operations.go",
"seq_tracker.go",
"validator.go",
"watcher.go",
],
Expand All @@ -24,11 +25,13 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness",
"//pkg/roachpb",
"//pkg/sql/catalog/bootstrap",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/bufalloc",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand All @@ -41,11 +44,11 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_petermattis_goid//:goid",
"@org_golang_google_protobuf//proto",
],
)
Expand All @@ -63,21 +66,28 @@ go_test(
"validator_test.go",
],
args = ["-test.timeout=55s"],
data = glob(["testdata/**"]),
embed = [":kvnemesis"],
deps = [
"//pkg/base",
"//pkg/config/zonepb",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/echotest",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/buildutil",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
60 changes: 40 additions & 20 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -83,8 +84,9 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
*ScanOperation,
*BatchOperation,
*DeleteOperation,
*DeleteRangeOperation:
applyClientOp(ctx, db, op, false /* inTxn */)
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation:
applyClientOp(ctx, db, op, false)
case *SplitOperation:
err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp, roachpb.AdminSplitRequest_INGESTION)
o.Result = resultInit(ctx, err)
Expand All @@ -94,7 +96,6 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeReplicasOperation:
desc := getRangeDesc(ctx, o.Key, db)
_, err := db.AdminChangeReplicas(ctx, o.Key, desc, o.Changes)
// TODO(dan): Save returned desc?
o.Result = resultInit(ctx, err)
case *TransferLeaseOperation:
err := db.AdminTransferLease(ctx, o.Key, o.Target)
Expand All @@ -120,15 +121,15 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
for i := range o.Ops {
op := &o.Ops[i]
op.Result().Reset() // in case we're a retry
applyClientOp(ctx, txn, op, true /* inTxn */)
applyClientOp(ctx, txn, op, true)
// The KV api disallows use of a txn after an operation on it errors.
if r := op.Result(); r.Type == ResultType_Error {
return errors.DecodeError(ctx, *r.Err)
}
}
if o.CommitInBatch != nil {
b := txn.NewBatch()
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch, true)
applyBatchOp(ctx, b, txn.CommitInBatch, o.CommitInBatch)
// The KV api disallows use of a txn after an operation on it errors.
if r := o.CommitInBatch.Result; r.Type == ResultType_Error {
return errors.DecodeError(ctx, *r.Err)
Expand Down Expand Up @@ -221,7 +222,8 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
}
case *PutOperation:
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Put(o.Key, o.Value)
b.Put(o.Key, o.Value())
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
Expand Down Expand Up @@ -250,13 +252,14 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
o.Result.Values = make([]KeyValue, len(kvs))
for i, kv := range kvs {
o.Result.Values[i] = KeyValue{
Key: []byte(kv.Key),
Key: kv.Key,
Value: kv.Value.RawBytes,
}
}
case *DeleteOperation:
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Del(o.Key)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
Expand All @@ -270,11 +273,9 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
o.Result.Keys[i] = deletedKey
}
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional DelRange operations currently unsupported`))
}
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.DelRange(o.Key, o.EndKey, true /* returnKeys */)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
Expand All @@ -287,20 +288,34 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
case *DeleteRangeUsingTombstoneOperation:
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.DelRangeUsingTombstone(o.Key, o.EndKey)
setLastReqSeq(b, o.Seq)
})
o.Result = resultInit(ctx, err)
if err != nil {
return
}
o.Result.OptionalTimestamp = ts
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o, inTxn)
applyBatchOp(ctx, b, db.Run, o)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o))
}
}

func setLastReqSeq(b *kv.Batch, seq kvnemesisutil.Seq) {
sl := b.Requests()
req := sl[len(sl)-1].GetInner()
h := req.Header()
h.KVNemesisSeq.Set(int64(seq))
req.SetHeader(h)
}

func applyBatchOp(
ctx context.Context,
b *kv.Batch,
run func(context.Context, *kv.Batch) error,
o *BatchOperation,
inTxn bool,
ctx context.Context, b *kv.Batch, run func(context.Context, *kv.Batch) error, o *BatchOperation,
) {
for i := range o.Ops {
switch subO := o.Ops[i].GetValue().(type) {
Expand All @@ -311,7 +326,8 @@ func applyBatchOp(
b.Get(subO.Key)
}
case *PutOperation:
b.Put(subO.Key, subO.Value)
b.Put(subO.Key, subO.Value())
setLastReqSeq(b, subO.Seq)
case *ScanOperation:
if subO.Reverse && subO.ForUpdate {
b.ReverseScanForUpdate(subO.Key, subO.EndKey)
Expand All @@ -324,11 +340,13 @@ func applyBatchOp(
}
case *DeleteOperation:
b.Del(subO.Key)
setLastReqSeq(b, subO.Seq)
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional batch DelRange operations currently unsupported`))
}
b.DelRange(subO.Key, subO.EndKey, true /* returnKeys */)
setLastReqSeq(b, subO.Seq)
case *DeleteRangeUsingTombstoneOperation:
b.DelRangeUsingTombstone(subO.Key, subO.EndKey)
setLastReqSeq(b, subO.Seq)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down Expand Up @@ -384,6 +402,8 @@ func applyBatchOp(
subO.Result.Keys[j] = key
}
}
case *DeleteRangeUsingTombstoneOperation:
subO.Result = resultInit(ctx, err)
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down
Loading

0 comments on commit 2b0afee

Please sign in to comment.