Skip to content

Commit

Permalink
Merge pull request #118407 from erikgrinaker/backport23.2-117787-1179…
Browse files Browse the repository at this point in the history
…67-117968

release-23.2: batcheval: add `BarrierRequest.WithLeaseAppliedIndex`
  • Loading branch information
erikgrinaker authored Feb 1, 2024
2 parents d8ebc77 + 954f001 commit 5c319a1
Show file tree
Hide file tree
Showing 22 changed files with 658 additions and 22 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
func (b *Batch) barrier(s, e interface{}, withLAI bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) {
Key: begin,
EndKey: end,
},
WithLeaseAppliedIndex: withLAI,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
44 changes: 34 additions & 10 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp(
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
b := &Batch{}
b.barrier(begin, end)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
b.barrier(begin, end, false /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return hlc.Timestamp{}, err
}
responses := b.response.Responses
if len(responses) == 0 {
return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
if l := len(b.response.Responses); l != 1 {
return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l)
}
resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse)
if !ok {
return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
responses[0].GetInner())
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.Timestamp, nil
}

// BarrierWithLAI behaves like Barrier, but returns the lease applied index and
// the range descriptor at which the barrier was applied. A barrier issued this
// way must fit within a single range; if it spans several ranges, a
// RangeKeyMismatchError is returned instead.
//
// NB: protocol support for this was introduced in a patch release, so nodes
// prior to 24.1 are not guaranteed to have it. When talking to such a node,
// the request returns an empty result.
func (db *DB) BarrierWithLAI(
	ctx context.Context, begin, end interface{},
) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) {
	// Zero-valued descriptor returned alongside every error.
	var noDesc roachpb.RangeDescriptor

	b := &Batch{}
	b.barrier(begin, end, true /* withLAI */)
	err := getOneErr(db.Run(ctx, b), b)
	if err != nil {
		return 0, noDesc, err
	}

	// Exactly one response is expected, and it must be a BarrierResponse.
	responses := b.response.Responses
	if len(responses) != 1 {
		return 0, noDesc, errors.Errorf("got %d responses for Barrier", len(responses))
	}
	barrierResp := responses[0].GetBarrier()
	if barrierResp == nil {
		return 0, noDesc, errors.Errorf("unexpected response %T for Barrier",
			responses[0].GetInner())
	}
	return barrierResp.LeaseAppliedIndex, barrierResp.RangeDesc, nil
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
21 changes: 21 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultInit(ctx, err)
case *BarrierOperation:
var err error
if o.WithLeaseAppliedIndex {
_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
} else {
_, err = db.Barrier(ctx, o.Key, o.EndKey)
}
o.Result = resultInit(ctx, err)
case *ClosureTxnOperation:
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
// epochs of the same transaction to avoid waiting while holding locks.
Expand Down Expand Up @@ -435,6 +443,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
return
}
o.Result.OptionalTimestamp = ts
case *BarrierOperation:
_, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.AddRawRequest(&kvpb.BarrierRequest{
RequestHeader: kvpb.RequestHeader{
Key: o.Key,
EndKey: o.EndKey,
},
WithLeaseAppliedIndex: o.WithLeaseAppliedIndex,
})
})
o.Result = resultInit(ctx, err)
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o)
Expand Down Expand Up @@ -510,6 +529,8 @@ func applyBatchOp(
setLastReqSeq(b, subO.Seq)
case *AddSSTableOperation:
panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
case *BarrierOperation:
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ type ClientOperationConfig struct {
DeleteRangeUsingTombstone int
// AddSSTable is an operation that ingests an SSTable with random KV pairs.
AddSSTable int
// Barrier is an operation that waits for in-flight writes to complete.
Barrier int
}

// BatchOperationConfig configures the relative probability of generating a
Expand Down Expand Up @@ -373,6 +375,7 @@ func newAllOperationsConfig() GeneratorConfig {
DeleteRange: 1,
DeleteRangeUsingTombstone: 1,
AddSSTable: 1,
Barrier: 1,
}
batchOpConfig := BatchOperationConfig{
Batch: 4,
Expand Down Expand Up @@ -491,6 +494,12 @@ func NewDefaultConfig() GeneratorConfig {
config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0
// Barrier cannot be used in batches, and we omit it in txns too because it
// can result in spurious RangeKeyMismatchErrors that fail the txn operation.
config.Ops.Batch.Ops.Barrier = 0
config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
return config
}

Expand Down Expand Up @@ -766,6 +775,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
addOpGen(allowed, randDelRange, c.DeleteRange)
addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone)
addOpGen(allowed, randAddSSTable, c.AddSSTable)
addOpGen(allowed, randBarrier, c.Barrier)
}

func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
Expand Down Expand Up @@ -1056,6 +1066,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation {
return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites)
}

// randBarrier generates a random Barrier operation. Roughly half of the
// generated barriers request the lease applied index.
func randBarrier(g *generator, rng *rand.Rand) Operation {
	if withLAI := rng.Float64() < 0.5; withLAI {
		// A barrier that requests the LAI can't span multiple ranges, so pick a
		// span inside one of the currently known ranges. A concurrent split may
		// still invalidate that choice and make the Barrier fail; that is fine
		// since most should still succeed, and the validator ignores these
		// errors.
		key, endKey := randRangeSpan(rng, g.currentSplits)
		return barrier(key, endKey, true /* withLAI */)
	}
	key, endKey := randSpan(rng)
	return barrier(key, endKey, false /* withLAI */)
}

func randScan(g *generator, rng *rand.Rand) Operation {
key, endKey := randSpan(rng)
return scan(key, endKey)
Expand Down Expand Up @@ -1735,3 +1760,11 @@ func addSSTable(
AsWrites: asWrites,
}}
}

// barrier wraps a BarrierOperation for the span given by key and endKey in an
// Operation. withLAI sets the operation's WithLeaseAppliedIndex field.
func barrier(key, endKey string, withLAI bool) Operation {
	barrierOp := &BarrierOperation{
		WithLeaseAppliedIndex: withLAI,
		Key:                   []byte(key),
		EndKey:                []byte(endKey),
	}
	return Operation{Barrier: barrierOp}
}
5 changes: 4 additions & 1 deletion pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func TestRandStep(t *testing.T) {
client.DeleteRangeUsingTombstone++
case *AddSSTableOperation:
client.AddSSTable++
case *BarrierOperation:
client.Barrier++
case *BatchOperation:
batch.Batch++
countClientOps(&batch.Ops, nil, o.Ops...)
Expand All @@ -270,7 +272,8 @@ func TestRandStep(t *testing.T) {
*DeleteOperation,
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation,
*AddSSTableOperation:
*AddSSTableOperation,
*BarrierOperation:
countClientOps(&counts.DB, &counts.Batch, step.Op)
case *ClosureTxnOperation:
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (op Operation) Result() *Result {
return &o.Result
case *AddSSTableOperation:
return &o.Result
case *BarrierOperation:
return &o.Result
case *SplitOperation:
return &o.Result
case *MergeOperation:
Expand Down Expand Up @@ -131,6 +133,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
o.format(w, fctx)
case *AddSSTableOperation:
o.format(w, fctx)
case *BarrierOperation:
o.format(w, fctx)
case *SplitOperation:
o.format(w, fctx)
case *MergeOperation:
Expand Down Expand Up @@ -339,6 +343,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) {
}
}

// format writes the barrier as a call on fctx.receiver, either BarrierWithLAI
// or Barrier depending on WithLeaseAppliedIndex, followed by the formatted
// result of the operation.
func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
	key, endKey := fmtKey(op.Key), fmtKey(op.EndKey)
	switch {
	case op.WithLeaseAppliedIndex:
		fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`, fctx.receiver, key, endKey)
	default:
		fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, key, endKey)
	}
	op.Result.format(w)
}

func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key))
op.Result.format(w)
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ message AddSSTableOperation {
Result result = 6 [(gogoproto.nullable) = false];
}

// BarrierOperation describes a Barrier issued over the span given by key and
// end_key, together with the recorded result of running it.
message BarrierOperation {
bytes key = 1;
bytes end_key = 2;
// with_lease_applied_index is passed through as
// BarrierRequest.WithLeaseAppliedIndex when the operation is applied.
bool with_lease_applied_index = 3;
Result result = 4 [(gogoproto.nullable) = false];
}

message SplitOperation {
bytes key = 1;
Result result = 2 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -150,6 +157,7 @@ message Operation {
TransferLeaseOperation transfer_lease = 16;
ChangeZoneOperation change_zone = 17;
AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"];
BarrierOperation barrier = 22;
}

enum ResultType {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvnemesis/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func TestOperationsFormat(t *testing.T) {
{
step: step(addSSTable(sstFile.Data(), sstSpan, sstTS, sstValueHeader.KVNemesisSeq.Get(), true)),
},
{step: step(barrier(k1, k2, false /* withLAI */))},
{step: step(barrier(k3, k4, true /* withLAI */))},
}

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/5
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.Barrier(ctx, tk(1), tk(2))
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/6
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.BarrierWithLAI(ctx, tk(3), tk(4))
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,20 @@ func (v *validator) processOp(op Operation) {
if v.buffering == bufferingSingle {
v.checkAtomic(`addSSTable`, t.Result)
}
case *BarrierOperation:
execTimestampStrictlyOptional = true
if op.Barrier.WithLeaseAppliedIndex &&
resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) {
// Barriers requesting LAIs can't span ranges. The generator will
// optimistically try to fit the barrier inside one of the current ranges,
// but this may race with a split, so we ignore the error in this case and
// try again later.
} else {
// Fail or retry on other errors, depending on type.
v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
}
// We don't yet actually check the barrier guarantees here, i.e. that all
// concurrent writes are applied by the time it completes. Maybe later.
case *ScanOperation:
if _, isErr := v.checkError(op, t.Result); isErr {
break
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque
return err
}
r.Timestamp.Forward(otherR.Timestamp)
if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex")
}
if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc")
}
}
return nil
}
Expand Down Expand Up @@ -1767,7 +1773,13 @@ func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag {
return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot
}
func (*BarrierRequest) flags() flag { return isWrite | isRange }
// flags returns the request flags for a Barrier. Every barrier is an isolated
// ranged write; one that requests the lease applied index is additionally
// unsplittable.
func (r *BarrierRequest) flags() flag {
	if r.WithLeaseAppliedIndex {
		// the LAI is only valid for a single range
		return isWrite | isRange | isAlone | isUnsplittable
	}
	return isWrite | isRange | isAlone
}
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
Expand Down
31 changes: 28 additions & 3 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2432,11 +2432,28 @@ message QueryResolvedTimestampResponse {
(gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"];
}

// BarrierRequest is the request for a Barrier operation. This goes through Raft
// and has the purpose of waiting until all conflicting in-flight operations on
// this range have completed, without blocking any new operations.
// BarrierRequest is the request for a Barrier operation. This guarantees that
// all past and ongoing writes to a key span have completed and applied on the
// leaseholder. It does this by waiting for all conflicting write latches and
// then submitting a noop write through Raft, waiting for it to apply. Later
// writes are not affected -- in particular, it does not actually take out a
// latch, so writers don't have to wait for it to complete and can write below
// the barrier.
message BarrierRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier
// command in the response, allowing the caller to wait for the barrier to
// apply on an arbitrary replica. It also returns the range descriptor, so the
// caller can detect any unexpected range changes.
//
// When enabled, the barrier request can no longer span multiple ranges, and
// will instead return RangeKeyMismatchError. The caller must be prepared to
// handle this.
//
// NB: This field was added in a patch release. Nodes prior to 24.1 are not
// guaranteed to support it, returning a zero LeaseAppliedIndex instead.
bool with_lease_applied_index = 2;
}

// BarrierResponse is the response for a Barrier operation.
Expand All @@ -2446,6 +2463,14 @@ message BarrierResponse {
// Timestamp at which this Barrier was evaluated. Can be used to guarantee
// future operations happen on the same or newer leaseholders.
util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];

// LeaseAppliedIndex at which this Barrier was applied. Only returned when
// requested via WithLeaseAppliedIndex.
uint64 lease_applied_index = 3 [(gogoproto.casttype) = "LeaseAppliedIndex"];

// RangeDesc at the time the barrier was applied. Only returned when requested
// via WithLeaseAppliedIndex.
RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false];
}

// A RequestUnion contains exactly one of the requests.
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ func (ba *BatchRequest) IsSingleRequest() bool {
return len(ba.Requests) == 1
}

// IsSingleBarrierRequest reports whether the batch contains exactly one
// request and that request is a Barrier.
func (ba *BatchRequest) IsSingleBarrierRequest() bool {
return ba.isSingleRequestWithMethod(Barrier)
}

// IsSingleSkipsLeaseCheckRequest returns true iff the batch contains a single
// request, and that request has the skipsLeaseCheck flag set.
func (ba *BatchRequest) IsSingleSkipsLeaseCheckRequest() bool {
Expand Down Expand Up @@ -349,7 +355,7 @@ func (ba *BatchRequest) IsSingleExportRequest() bool {
// a no-op. The Barrier request requires consensus even though its evaluation
// is a no-op.
func (ba *BatchRequest) RequiresConsensus() bool {
return ba.isSingleRequestWithMethod(Barrier) || ba.isSingleRequestWithMethod(Probe)
return ba.IsSingleBarrierRequest() || ba.IsSingleProbeRequest()
}

// IsCompleteTransaction determines whether a batch contains every write in a
Expand Down
Loading

0 comments on commit 5c319a1

Please sign in to comment.