From d4e4dac2f1325b0e5cce9d0ad2efdbcfd9aa76cb Mon Sep 17 00:00:00 2001
From: Erik Grinaker <grinaker@cockroachlabs.com>
Date: Fri, 19 Jan 2024 10:28:30 +0000
Subject: [PATCH] kvnemsis: add support for `Barrier` operations

This only executes random `Barrier` requests, but does not verify that
the barrier guarantees are actually satisfied (i.e. that all past and
concurrent writes are applied before it returns). At least we get some
execution coverage, and verify that it does not have negative
interactions with other operations.

Epic: none
Release note: None
---
 pkg/kv/kvnemesis/applier.go                   | 21 ++++++++++++
 pkg/kv/kvnemesis/generator.go                 | 33 +++++++++++++++++++
 pkg/kv/kvnemesis/generator_test.go            |  5 ++-
 pkg/kv/kvnemesis/operations.go                | 14 ++++++++
 pkg/kv/kvnemesis/operations.proto             |  8 +++++
 pkg/kv/kvnemesis/operations_test.go           |  2 ++
 .../kvnemesis/testdata/TestOperationsFormat/6 |  3 ++
 .../kvnemesis/testdata/TestOperationsFormat/7 |  3 ++
 pkg/kv/kvnemesis/validator.go                 | 14 ++++++++
 9 files changed, 102 insertions(+), 1 deletion(-)
 create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/6
 create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/7

diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go
index bee6951cad21..39f9a2ebe0f7 100644
--- a/pkg/kv/kvnemesis/applier.go
+++ b/pkg/kv/kvnemesis/applier.go
@@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
 	case *ChangeZoneOperation:
 		err := updateZoneConfigInEnv(ctx, env, o.Type)
 		o.Result = resultInit(ctx, err)
+	case *BarrierOperation:
+		var err error
+		if o.WithLeaseAppliedIndex {
+			_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
+		} else {
+			_, err = db.Barrier(ctx, o.Key, o.EndKey)
+		}
+		o.Result = resultInit(ctx, err)
 	case *ClosureTxnOperation:
 		// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
 		// epochs of the same transaction to avoid waiting while holding locks.
@@ -446,6 +454,17 @@ func applyClientOp(
 			return
 		}
 		o.Result.OptionalTimestamp = ts
+	case *BarrierOperation:
+		_, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
+			b.AddRawRequest(&kvpb.BarrierRequest{
+				RequestHeader: kvpb.RequestHeader{
+					Key:    o.Key,
+					EndKey: o.EndKey,
+				},
+				WithLeaseAppliedIndex: o.WithLeaseAppliedIndex,
+			})
+		})
+		o.Result = resultInit(ctx, err)
 	case *BatchOperation:
 		b := &kv.Batch{}
 		applyBatchOp(ctx, b, db.Run, o)
@@ -564,6 +583,8 @@ func applyBatchOp(
 			setLastReqSeq(b, subO.Seq)
 		case *AddSSTableOperation:
 			panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
+		case *BarrierOperation:
+			panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
 		default:
 			panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
 		}
diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go
index 1d7432f14b2f..b6695215bb39 100644
--- a/pkg/kv/kvnemesis/generator.go
+++ b/pkg/kv/kvnemesis/generator.go
@@ -263,6 +263,8 @@ type ClientOperationConfig struct {
 	DeleteRangeUsingTombstone int
 	// AddSSTable is an operations that ingests an SSTable with random KV pairs.
 	AddSSTable int
+	// Barrier is an operation that waits for in-flight writes to complete.
+	Barrier int
 }
 
 // BatchOperationConfig configures the relative probability of generating a
@@ -395,6 +397,7 @@ func newAllOperationsConfig() GeneratorConfig {
 		DeleteRange:                                        1,
 		DeleteRangeUsingTombstone:                          1,
 		AddSSTable:                                         1,
+		Barrier:                                            1,
 	}
 	batchOpConfig := BatchOperationConfig{
 		Batch: 4,
@@ -521,6 +524,12 @@ func NewDefaultConfig() GeneratorConfig {
 	config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0
 	config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0
 	config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0
+	// Barrier cannot be used in batches, and we omit it in txns too because it
+	// can result in spurious RangeKeyMismatchErrors that fail the txn operation.
+	config.Ops.Batch.Ops.Barrier = 0
+	config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
+	config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
+	config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
 	return config
 }
 
@@ -816,6 +825,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
 	addOpGen(allowed, randDelRange, c.DeleteRange)
 	addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone)
 	addOpGen(allowed, randAddSSTable, c.AddSSTable)
+	addOpGen(allowed, randBarrier, c.Barrier)
 }
 
 func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
@@ -1106,6 +1116,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation {
 	return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites)
 }
 
+func randBarrier(g *generator, rng *rand.Rand) Operation {
+	withLAI := rng.Float64() < 0.5
+	var key, endKey string
+	if withLAI {
+		// Barriers requesting LAIs can't span multiple ranges, so we try to fit
+		// them within an existing range. This may race with a concurrent split, in
+		// which case the Barrier will fail, but that's ok -- most should still
+		// succeed. These errors are ignored by the validator.
+		key, endKey = randRangeSpan(rng, g.currentSplits)
+	} else {
+		key, endKey = randSpan(rng)
+	}
+	return barrier(key, endKey, withLAI)
+}
+
 func randScan(g *generator, rng *rand.Rand) Operation {
 	key, endKey := randSpan(rng)
 	return scan(key, endKey)
@@ -1924,6 +1949,14 @@ func addSSTable(
 	}}
 }
 
+func barrier(key, endKey string, withLAI bool) Operation {
+	return Operation{Barrier: &BarrierOperation{
+		Key:                   []byte(key),
+		EndKey:                []byte(endKey),
+		WithLeaseAppliedIndex: withLAI,
+	}}
+}
+
 func createSavepoint(id int) Operation {
 	return Operation{SavepointCreate: &SavepointCreateOperation{ID: int32(id)}}
 }
diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go
index 2a29087dc506..a4fb32947c90 100644
--- a/pkg/kv/kvnemesis/generator_test.go
+++ b/pkg/kv/kvnemesis/generator_test.go
@@ -251,6 +251,8 @@ func TestRandStep(t *testing.T) {
 				client.DeleteRangeUsingTombstone++
 			case *AddSSTableOperation:
 				client.AddSSTable++
+			case *BarrierOperation:
+				client.Barrier++
 			case *BatchOperation:
 				batch.Batch++
 				countClientOps(&batch.Ops, nil, o.Ops...)
@@ -286,7 +288,8 @@ func TestRandStep(t *testing.T) {
 			*DeleteOperation,
 			*DeleteRangeOperation,
 			*DeleteRangeUsingTombstoneOperation,
-			*AddSSTableOperation:
+			*AddSSTableOperation,
+			*BarrierOperation:
 			countClientOps(&counts.DB, &counts.Batch, step.Op)
 		case *ClosureTxnOperation:
 			countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go
index 1ed357a36564..315f2d13017d 100644
--- a/pkg/kv/kvnemesis/operations.go
+++ b/pkg/kv/kvnemesis/operations.go
@@ -41,6 +41,8 @@ func (op Operation) Result() *Result {
 		return &o.Result
 	case *AddSSTableOperation:
 		return &o.Result
+	case *BarrierOperation:
+		return &o.Result
 	case *SplitOperation:
 		return &o.Result
 	case *MergeOperation:
@@ -137,6 +139,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
 		o.format(w, fctx)
 	case *AddSSTableOperation:
 		o.format(w, fctx)
+	case *BarrierOperation:
+		o.format(w, fctx)
 	case *SplitOperation:
 		o.format(w, fctx)
 	case *MergeOperation:
@@ -351,6 +355,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) {
 	}
 }
 
+func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
+	if op.WithLeaseAppliedIndex {
+		fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`,
+			fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
+	} else {
+		fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
+	}
+	op.Result.format(w)
+}
+
 func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
 	fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key))
 	op.Result.format(w)
diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto
index cd980b3bdfdb..e9e19431ea1b 100644
--- a/pkg/kv/kvnemesis/operations.proto
+++ b/pkg/kv/kvnemesis/operations.proto
@@ -95,6 +95,13 @@ message AddSSTableOperation {
   Result result = 6 [(gogoproto.nullable) = false];
 }
 
+message BarrierOperation {
+  bytes key = 1;
+  bytes end_key = 2;
+  bool with_lease_applied_index = 3;
+  Result result = 4 [(gogoproto.nullable) = false];
+}
+
 message SplitOperation {
   bytes key = 1;
   Result result = 2 [(gogoproto.nullable) = false];
@@ -168,6 +175,7 @@ message Operation {
   SavepointCreateOperation savepoint_create = 19;
   SavepointReleaseOperation savepoint_release = 20;
   SavepointRollbackOperation savepoint_rollback = 21;
+  BarrierOperation barrier = 22;
 }
 
 enum ResultType {
diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go
index 1968dcbc94b3..0a311df33383 100644
--- a/pkg/kv/kvnemesis/operations_test.go
+++ b/pkg/kv/kvnemesis/operations_test.go
@@ -96,6 +96,8 @@ func TestOperationsFormat(t *testing.T) {
 					createSavepoint(4), del(k9, 1), rollbackSavepoint(4),
 				)),
 		},
+		{step: step(barrier(k1, k2, false /* withLAI */))},
+		{step: step(barrier(k3, k4, true /* withLAI */))},
 	}
 
 	w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6
new file mode 100644
index 000000000000..b848afebfc9b
--- /dev/null
+++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6
@@ -0,0 +1,3 @@
+echo
+----
+···db0.Barrier(ctx, tk(1), tk(2))
diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/7 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/7
new file mode 100644
index 000000000000..92c0790f55de
--- /dev/null
+++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/7
@@ -0,0 +1,3 @@
+echo
+----
+···db0.BarrierWithLAI(ctx, tk(3), tk(4))
diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go
index 8563c8488207..267cfb025e4c 100644
--- a/pkg/kv/kvnemesis/validator.go
+++ b/pkg/kv/kvnemesis/validator.go
@@ -707,6 +707,20 @@ func (v *validator) processOp(op Operation) {
 		if v.buffering == bufferingSingle {
 			v.checkAtomic(`addSSTable`, t.Result)
 		}
+	case *BarrierOperation:
+		execTimestampStrictlyOptional = true
+		if op.Barrier.WithLeaseAppliedIndex &&
+			resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) {
+			// Barriers requesting LAIs can't span ranges. The generator will
+			// optimistically try to fit the barrier inside one of the current ranges,
+			// but this may race with a split, so we ignore the error in this case and
+			// try again later.
+		} else {
+			// Fail or retry on other errors, depending on type.
+			v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
+		}
+		// We don't yet actually check the barrier guarantees here, i.e. that all
+		// concurrent writes are applied by the time it completes. Maybe later.
 	case *ScanOperation:
 		if _, isErr := v.checkError(op, t.Result); isErr {
 			break