Skip to content

Commit

Permalink
kvnemsis: add support for Barrier operations
Browse files Browse the repository at this point in the history
This only executes random `Barrier` requests, but does not verify that
the barrier guarantees are actually satisfied (i.e. that all past and
concurrent writes are applied before it returns). At least we get some
execution coverage, and verify that it does not have negative
interactions with other operations.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 20, 2024
1 parent e494351 commit d4e4dac
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 1 deletion.
21 changes: 21 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultInit(ctx, err)
case *BarrierOperation:
var err error
if o.WithLeaseAppliedIndex {
_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
} else {
_, err = db.Barrier(ctx, o.Key, o.EndKey)
}
o.Result = resultInit(ctx, err)
case *ClosureTxnOperation:
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
// epochs of the same transaction to avoid waiting while holding locks.
Expand Down Expand Up @@ -446,6 +454,17 @@ func applyClientOp(
return
}
o.Result.OptionalTimestamp = ts
case *BarrierOperation:
_, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.AddRawRequest(&kvpb.BarrierRequest{
RequestHeader: kvpb.RequestHeader{
Key: o.Key,
EndKey: o.EndKey,
},
WithLeaseAppliedIndex: o.WithLeaseAppliedIndex,
})
})
o.Result = resultInit(ctx, err)
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o)
Expand Down Expand Up @@ -564,6 +583,8 @@ func applyBatchOp(
setLastReqSeq(b, subO.Seq)
case *AddSSTableOperation:
panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
case *BarrierOperation:
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ type ClientOperationConfig struct {
DeleteRangeUsingTombstone int
// AddSSTable is an operations that ingests an SSTable with random KV pairs.
AddSSTable int
// Barrier is an operation that waits for in-flight writes to complete.
Barrier int
}

// BatchOperationConfig configures the relative probability of generating a
Expand Down Expand Up @@ -395,6 +397,7 @@ func newAllOperationsConfig() GeneratorConfig {
DeleteRange: 1,
DeleteRangeUsingTombstone: 1,
AddSSTable: 1,
Barrier: 1,
}
batchOpConfig := BatchOperationConfig{
Batch: 4,
Expand Down Expand Up @@ -521,6 +524,12 @@ func NewDefaultConfig() GeneratorConfig {
config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0
// Barrier cannot be used in batches, and we omit it in txns too because it
// can result in spurious RangeKeyMismatchErrors that fail the txn operation.
config.Ops.Batch.Ops.Barrier = 0
config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
return config
}

Expand Down Expand Up @@ -816,6 +825,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
addOpGen(allowed, randDelRange, c.DeleteRange)
addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone)
addOpGen(allowed, randAddSSTable, c.AddSSTable)
addOpGen(allowed, randBarrier, c.Barrier)
}

func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
Expand Down Expand Up @@ -1106,6 +1116,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation {
return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites)
}

func randBarrier(g *generator, rng *rand.Rand) Operation {
withLAI := rng.Float64() < 0.5
var key, endKey string
if withLAI {
// Barriers requesting LAIs can't span multiple ranges, so we try to fit
// them within an existing range. This may race with a concurrent split, in
// which case the Barrier will fail, but that's ok -- most should still
// succeed. These errors are ignored by the validator.
key, endKey = randRangeSpan(rng, g.currentSplits)
} else {
key, endKey = randSpan(rng)
}
return barrier(key, endKey, withLAI)
}

func randScan(g *generator, rng *rand.Rand) Operation {
key, endKey := randSpan(rng)
return scan(key, endKey)
Expand Down Expand Up @@ -1924,6 +1949,14 @@ func addSSTable(
}}
}

func barrier(key, endKey string, withLAI bool) Operation {
return Operation{Barrier: &BarrierOperation{
Key: []byte(key),
EndKey: []byte(endKey),
WithLeaseAppliedIndex: withLAI,
}}
}

func createSavepoint(id int) Operation {
return Operation{SavepointCreate: &SavepointCreateOperation{ID: int32(id)}}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ func TestRandStep(t *testing.T) {
client.DeleteRangeUsingTombstone++
case *AddSSTableOperation:
client.AddSSTable++
case *BarrierOperation:
client.Barrier++
case *BatchOperation:
batch.Batch++
countClientOps(&batch.Ops, nil, o.Ops...)
Expand Down Expand Up @@ -286,7 +288,8 @@ func TestRandStep(t *testing.T) {
*DeleteOperation,
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation,
*AddSSTableOperation:
*AddSSTableOperation,
*BarrierOperation:
countClientOps(&counts.DB, &counts.Batch, step.Op)
case *ClosureTxnOperation:
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (op Operation) Result() *Result {
return &o.Result
case *AddSSTableOperation:
return &o.Result
case *BarrierOperation:
return &o.Result
case *SplitOperation:
return &o.Result
case *MergeOperation:
Expand Down Expand Up @@ -137,6 +139,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
o.format(w, fctx)
case *AddSSTableOperation:
o.format(w, fctx)
case *BarrierOperation:
o.format(w, fctx)
case *SplitOperation:
o.format(w, fctx)
case *MergeOperation:
Expand Down Expand Up @@ -351,6 +355,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) {
}
}

func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
if op.WithLeaseAppliedIndex {
fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`,
fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
} else {
fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
}
op.Result.format(w)
}

func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key))
op.Result.format(w)
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ message AddSSTableOperation {
Result result = 6 [(gogoproto.nullable) = false];
}

message BarrierOperation {
bytes key = 1;
bytes end_key = 2;
bool with_lease_applied_index = 3;
Result result = 4 [(gogoproto.nullable) = false];
}

message SplitOperation {
bytes key = 1;
Result result = 2 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -168,6 +175,7 @@ message Operation {
SavepointCreateOperation savepoint_create = 19;
SavepointReleaseOperation savepoint_release = 20;
SavepointRollbackOperation savepoint_rollback = 21;
BarrierOperation barrier = 22;
}

enum ResultType {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvnemesis/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func TestOperationsFormat(t *testing.T) {
createSavepoint(4), del(k9, 1), rollbackSavepoint(4),
)),
},
{step: step(barrier(k1, k2, false /* withLAI */))},
{step: step(barrier(k3, k4, true /* withLAI */))},
}

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/6
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.Barrier(ctx, tk(1), tk(2))
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/7
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.BarrierWithLAI(ctx, tk(3), tk(4))
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,20 @@ func (v *validator) processOp(op Operation) {
if v.buffering == bufferingSingle {
v.checkAtomic(`addSSTable`, t.Result)
}
case *BarrierOperation:
execTimestampStrictlyOptional = true
if op.Barrier.WithLeaseAppliedIndex &&
resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) {
// Barriers requesting LAIs can't span ranges. The generator will
// optimistically try to fit the barrier inside one of the current ranges,
// but this may race with a split, so we ignore the error in this case and
// try again later.
} else {
// Fail or retry on other errors, depending on type.
v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
}
// We don't yet actually check the barrier guarantees here, i.e. that all
// concurrent writes are applied by the time it completes. Maybe later.
case *ScanOperation:
if _, isErr := v.checkError(op, t.Result); isErr {
break
Expand Down

0 comments on commit d4e4dac

Please sign in to comment.