Skip to content

Commit

Permalink
kv: don't perform async intent resolution on 1PC with point lock spans
Browse files Browse the repository at this point in the history
Fixes #98571.

This commit fixes the regression detected in #98571. In that issue, we saw that
the bug fix in 86a5852 (#98044) caused a regression in kv0 (and other benchmarks).
This was due to a bug in `kvserverbase.IntersectSpan`, which was considering
local point spans to be external to the provided range span.

This commit fixes the bug by not calling `kvserverbase.IntersectSpan` for point lock
spans.

The commit also makes the utility panic instead of silently returning incorrect results.
There's an existing TODO on the utility to generalize it. For now, we just make it harder
to misuse.

Finally, we add a test that asserts against the presence of async intent resolution after
one-phase commits when external intent resolution is not needed.

```
name                            old time/op    new time/op    delta
KV/Insert/Native/rows=1-10        61.2µs ± 3%    48.9µs ± 3%  -20.10%  (p=0.000 n=8+9)
KV/Insert/Native/rows=10-10       93.3µs ±15%    76.2µs ± 3%  -18.34%  (p=0.000 n=9+9)
KV/Insert/Native/rows=1000-10     2.84ms ±12%    2.42ms ± 4%  -14.97%  (p=0.000 n=9+9)
KV/Insert/Native/rows=100-10       365µs ± 5%     320µs ± 8%  -12.40%  (p=0.000 n=10+9)
KV/Insert/Native/rows=10000-10    27.6ms ± 6%    24.4ms ± 3%  -11.53%  (p=0.000 n=9+9)

name                            old alloc/op   new alloc/op   delta
KV/Insert/Native/rows=1000-10     4.66MB ± 1%    2.76MB ± 1%  -40.89%  (p=0.000 n=9+9)
KV/Insert/Native/rows=100-10       478kB ± 1%     287kB ± 1%  -39.90%  (p=0.000 n=10+10)
KV/Insert/Native/rows=10000-10    54.2MB ± 2%    34.3MB ± 3%  -36.73%  (p=0.000 n=10+10)
KV/Insert/Native/rows=10-10       64.2kB ± 1%    42.1kB ± 1%  -34.39%  (p=0.000 n=10+9)
KV/Insert/Native/rows=1-10        22.1kB ± 1%    17.3kB ± 1%  -21.56%  (p=0.000 n=9+10)

name                            old allocs/op  new allocs/op  delta
KV/Insert/Native/rows=1000-10      21.5k ± 0%     14.7k ± 0%  -31.70%  (p=0.000 n=8+9)
KV/Insert/Native/rows=10000-10      212k ± 0%      146k ± 0%  -31.31%  (p=0.000 n=9+10)
KV/Insert/Native/rows=100-10       2.34k ± 1%     1.61k ± 0%  -31.31%  (p=0.000 n=10+10)
KV/Insert/Native/rows=10-10          392 ± 1%       276 ± 0%  -29.59%  (p=0.000 n=8+8)
KV/Insert/Native/rows=1-10           173 ± 1%       123 ± 0%  -29.04%  (p=0.000 n=9+8)
```

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 14, 2023
1 parent 19e5845 commit d70ec06
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 29 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/kvserverbase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ func ContainsKeyRange(desc *roachpb.RangeDescriptor, start, end roachpb.Key) boo
// into up to three pieces: A first piece which is contained in the Range,
// and a slice of up to two further spans which are outside of the key
// range. An span for which [Key, EndKey) is empty does not result in any
// spans; thus IntersectSpan only applies to span ranges.
// spans; thus IntersectSpan only applies to span ranges and point keys will
// cause the function to panic.
//
// A range-local span range is never split: It's returned as either
// belonging to or outside of the descriptor's key range, and passing an
Expand All @@ -191,8 +192,7 @@ func IntersectSpan(
) (middle *roachpb.Span, outside []roachpb.Span) {
start, end := desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey()
if len(span.EndKey) == 0 {
outside = append(outside, span)
return
panic("unsupported point key")
}
if bytes.Compare(span.Key, keys.LocalRangeMax) < 0 {
if bytes.Compare(span.EndKey, keys.LocalRangeMax) >= 0 {
Expand Down
122 changes: 105 additions & 17 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6954,7 +6954,7 @@ func TestRequestLeaderEncounterGroupDeleteError(t *testing.T) {
}
}

func TestIntentIntersect(t *testing.T) {
func TestIntersectSpan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
iPt := roachpb.Span{
Expand All @@ -6974,13 +6974,11 @@ func TestIntentIntersect(t *testing.T) {
kl1 := string(iLc.Key)
kl2 := string(iLc.EndKey)

for i, tc := range []struct {
for _, tc := range []struct {
intent roachpb.Span
from, to string
exp []string
}{
{intent: iPt, from: "", to: "z", exp: []string{"", "", "asd", ""}},

{intent: iRn, from: "", to: "a", exp: []string{"", "", "c", "x"}},
{intent: iRn, from: "", to: "c", exp: []string{"", "", "c", "x"}},
{intent: iRn, from: "a", to: "z", exp: []string{"c", "x"}},
Expand All @@ -7001,23 +6999,35 @@ func TestIntentIntersect(t *testing.T) {
{intent: iLc, from: "c", to: "x", exp: []string{kl1, kl2}},
{intent: iLc, from: "a", to: "z", exp: []string{kl1, kl2}},
} {
var all []string
in, out := kvserverbase.IntersectSpan(tc.intent, &roachpb.RangeDescriptor{
desc := &roachpb.RangeDescriptor{
StartKey: roachpb.RKey(tc.from),
EndKey: roachpb.RKey(tc.to),
})
if in != nil {
all = append(all, string(in.Key), string(in.EndKey))
} else {
all = append(all, "", "")
}
for _, o := range out {
all = append(all, string(o.Key), string(o.EndKey))
}
if !reflect.DeepEqual(all, tc.exp) {
t.Errorf("%d: wanted %v, got %v", i, tc.exp, all)
}
name := fmt.Sprintf("span=%v,desc=%v", tc.intent, desc.RSpan())
t.Run(name, func(t *testing.T) {
var all []string
in, out := kvserverbase.IntersectSpan(tc.intent, desc)
if in != nil {
all = append(all, string(in.Key), string(in.EndKey))
} else {
all = append(all, "", "")
}
for _, o := range out {
all = append(all, string(o.Key), string(o.EndKey))
}
require.Equal(t, tc.exp, all)
})
}

t.Run("point", func(t *testing.T) {
desc := &roachpb.RangeDescriptor{
StartKey: roachpb.RKey("a"),
EndKey: roachpb.RKey("z"),
}
require.Panics(t, func() {
_, _ = kvserverbase.IntersectSpan(iPt, desc)
})
})
}

// TestBatchErrorWithIndex tests that when an individual entry in a
Expand Down Expand Up @@ -11288,6 +11298,84 @@ func TestReplicaNotifyLockTableOn1PC(t *testing.T) {
}
}

// TestReplicaAsyncIntentResolutionOn1PC runs a transaction that acquires one or
// more unreplicated locks and then performs a one-phase commit. It tests that
// async resolution is performed for any unreplicated lock that is external to
// the range that the transaction is anchored on, but not for any unreplicated
// lock that is local to that range.
func TestReplicaAsyncIntentResolutionOn1PC(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "external", func(t *testing.T, external bool) {
// Intercept async intent resolution for the test's transaction.
var storeKnobs StoreTestingKnobs
var txnIDAtomic atomic.Value
txnIDAtomic.Store(uuid.Nil)
resIntentC := make(chan *kvpb.ResolveIntentRequest)
storeKnobs.TestingRequestFilter = func(_ context.Context, ba *kvpb.BatchRequest) *kvpb.Error {
for _, req := range ba.Requests {
riReq := req.GetResolveIntent()
if riReq != nil && riReq.IntentTxn.ID == txnIDAtomic.Load().(uuid.UUID) {
resIntentC <- riReq
}
}
return nil
}

ctx := context.Background()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{Store: &storeKnobs}})
defer s.Stopper().Stop(ctx)

// Perform a range split between key A and B.
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
_, _, err := s.SplitRange(keyB)
require.NoError(t, err)

// Write a value to a key A and B.
_, err = kvDB.Inc(ctx, keyA, 1)
require.Nil(t, err)
_, err = kvDB.Inc(ctx, keyB, 1)
require.Nil(t, err)

// Create a new transaction.
txn := kvDB.NewTxn(ctx, "test")
txnIDAtomic.Store(txn.ID())

// Perform one or more "for update" gets. This should acquire unreplicated,
// exclusive locks on the keys.
b := txn.NewBatch()
b.GetForUpdate(keyA)
if external {
b.GetForUpdate(keyB)
}
err = txn.Run(ctx, b)
require.NoError(t, err)

// Update the locked value and commit in a single batch. This should hit the
// one-phase commit fast-path and then release the "for update" lock(s).
b = txn.NewBatch()
b.Inc(keyA, 1)
err = txn.CommitInBatch(ctx, b)
require.NoError(t, err)

// If an external lock was acquired, we should see its resolution.
if external {
riReq := <-resIntentC
require.Equal(t, keyB, riReq.Key)
}

// After that, we should see no other intent resolution request for this
// transaction.
select {
case riReq := <-resIntentC:
t.Fatalf("unexpected intent resolution request: %v", riReq)
case <-time.After(10 * time.Millisecond):
}
})
}

// TestReplicaQueryLocks tests QueryLocks in a number of scenarios while locks are
// held, such as filtering out uncontended locks and limiting results.
func TestReplicaQueryLocks(t *testing.T) {
Expand Down
21 changes: 12 additions & 9 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,18 @@ func (r *Replica) evaluate1PC(
resolvedLocks := make([]roachpb.LockUpdate, 0, len(etArg.LockSpans))
var externalLocks []roachpb.Span
for _, sp := range etArg.LockSpans {
inSpan, outSpans := kvserverbase.IntersectSpan(sp, desc)
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
resolvedLocks = append(resolvedLocks, roachpb.LockUpdate{
Span: *inSpan,
Txn: clonedTxn.TxnMeta,
Status: clonedTxn.Status,
IgnoredSeqNums: clonedTxn.IgnoredSeqNums,
})
if len(sp.EndKey) == 0 {
if kvserverbase.ContainsKey(desc, sp.Key) {
resolvedLocks = append(resolvedLocks, roachpb.MakeLockUpdate(clonedTxn, sp))
} else {
externalLocks = append(externalLocks, sp)
}
} else {
inSpan, outSpans := kvserverbase.IntersectSpan(sp, desc)
if inSpan != nil {
resolvedLocks = append(resolvedLocks, roachpb.MakeLockUpdate(clonedTxn, *inSpan))
}
externalLocks = append(externalLocks, outSpans...)
}
}
clonedTxn.LockSpans = externalLocks
Expand Down

0 comments on commit d70ec06

Please sign in to comment.