diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go
index 8bf9fee51cfe..6afa206c7530 100644
--- a/pkg/kv/kvserver/concurrency/lock_table_test.go
+++ b/pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -722,30 +723,42 @@ func scanSpans(
 		spanStr := parts[1]
 		str := GetStrength(t, d, strS)
 		// Compute latch span access based on the supplied strength.
-		var sa spanset.SpanAccess
-		switch str {
-		case lock.None:
-			sa = spanset.SpanReadOnly
-		case lock.Intent:
-			sa = spanset.SpanReadWrite
-		case lock.Exclusive:
-			sa = spanset.SpanReadWrite
-		case lock.Shared:
-			// Unlike non-locking reads, shared-locking reads are isolated at all
-			// timestamps (not just the request's timestamp); so we acquire a read
-			// latch at max timestamp. See
-			// https://github.com/cockroachdb/cockroach/issues/102264.
-			sa = spanset.SpanReadOnly
-			ts = hlc.MaxTimestamp
-		default:
-			d.Fatalf(t, "unsupported lock strength: %s", str)
-		}
-		latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), ts)
+		sa, latchTs := latchAccessForLockStrength(str, ts)
+		latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), latchTs)
 		lockSpans.Add(str, getSpan(t, d, spanStr))
 	}
 	return latchSpans, lockSpans
 }
 
+// latchAccessForLockStrength returns the latch access and timestamp to use for
+// a given lock strength and request timestamp. It duplicates some of the logic
+// in DefaultDeclareIsolatedKeys to avoid the package dependency.
+func latchAccessForLockStrength(
+	str lock.Strength, ts hlc.Timestamp,
+) (spanset.SpanAccess, hlc.Timestamp) {
+	switch str {
+	case lock.None:
+		return spanset.SpanReadOnly, ts
+	case lock.Shared:
+		// Unlike non-locking reads, shared-locking reads are isolated at all
+		// timestamps (not just the request's timestamp); so we acquire a read
+		// latch at max timestamp. See
+		// https://github.com/cockroachdb/cockroach/issues/102264.
+		//
+		// We don't need to duplicate the special case for replicated shared
+		// locks here (see ReplicatedSharedLocksTransactionLatchingKey) because
+		// there is no risk in these tests of two shared lock acquisitions from
+		// the same transaction clobbering each other's state.
+		return spanset.SpanReadOnly, hlc.MaxTimestamp
+	case lock.Exclusive:
+		return spanset.SpanReadWrite, ts
+	case lock.Intent:
+		return spanset.SpanReadWrite, ts
+	default:
+		panic(fmt.Sprintf("unsupported lock strength: %s", str))
+	}
+}
+
 func ScanIsoLevel(t *testing.T, d *datadriven.TestData) isolation.Level {
 	const key = "iso"
 	if !d.HasArg(key) {
@@ -1060,7 +1073,7 @@ type workItem struct {
 
 	// Request.
 	request        *Request
-	locksToAcquire []roachpb.Key
+	locksToAcquire []lockToAcquire
 
 	// Update locks.
 	intents []roachpb.LockUpdate
@@ -1080,6 +1093,16 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
 	if item.request != nil {
 		var lg *spanlatch.Guard
 		var g lockTableGuard
+		defer func() {
+			if lg != nil {
+				e.lm.Release(lg)
+				lg = nil
+			}
+			if g != nil {
+				e.lt.Dequeue(g)
+				g = nil
+			}
+		}()
 		var err error
 		for {
 			// Since we can't do a select involving latch acquisition and context
@@ -1099,6 +1122,7 @@
 				break
 			}
 			e.lm.Release(lg)
+			lg = nil
 			var lastID uuid.UUID
 		L:
 			for {
@@ -1116,14 +1140,12 @@
 					if !lastID.Equal(uuid.UUID{}) && item.request.Txn != nil {
 						_, err = e.waitingFor(item.request.Txn.ID, lastID, uuid.UUID{})
 						if err != nil {
-							e.lt.Dequeue(g)
 							return err
 						}
 					}
 					break L
 				case waitSelf:
 					if item.request.Txn == nil {
-						e.lt.Dequeue(g)
 						return errors.Errorf("non-transactional request cannot waitSelf")
 					}
 				case waitForDistinguished, waitFor, waitElsewhere:
@@ -1134,7 +1156,6 @@
 							lastID = state.txn.ID
 						}
 						if aborted {
-							e.lt.Dequeue(g)
 							return err
 						}
 					}
@@ -1144,15 +1165,13 @@
 			}
 		}
 
-		// acquire locks.
-		for _, k := range item.locksToAcquire {
-			err = e.acquireLock(item.request.Txn, k)
+		// Acquire locks.
+		for _, toAcq := range item.locksToAcquire {
+			err = e.acquireLock(item.request.Txn, toAcq)
 			if err != nil {
 				break
 			}
 		}
-		e.lt.Dequeue(g)
-		e.lm.Release(lg)
 		return err
 	}
 	for i := range item.intents {
@@ -1169,12 +1188,19 @@ type workloadItem struct {
 	// Request to be executed, iff request != nil
 	request *Request
 	// locks to be acquired by the request.
-	locksToAcquire []roachpb.Key
+	locksToAcquire []lockToAcquire
 
 	// Non-empty when transaction should release locks.
 	finish uuid.UUID
 }
 
+// lockToAcquire is a lock that should be acquired by a request.
+type lockToAcquire struct {
+	key roachpb.Key
+	str lock.Strength
+	dur lock.Durability
+}
+
 // state of a transaction maintained by workloadExecutor, for deadlock
 // detection, and deciding when transaction can be finished (when a
 // workloadItem has instructed that it be finished and all its ongoing
@@ -1245,8 +1271,8 @@ func newWorkLoadExecutor(items []workloadItem, concurrency int) *workloadExecuto
 	}
 }
 
-func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key) error {
-	acq := roachpb.MakeLockAcquisition(txn, k, lock.Unreplicated, lock.Exclusive)
+func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, toAcq lockToAcquire) error {
+	acq := roachpb.MakeLockAcquisition(txn, toAcq.key, toAcq.dur, toAcq.str)
 	err := e.lt.AcquireLock(&acq)
 	if err != nil {
 		return err
@@ -1257,7 +1283,7 @@ func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key)
 	if !ok {
 		return errors.Errorf("testbug: lock acquiring request with txnID %v has no transaction", txn.ID)
 	}
-	tstate.acquiredLocks = append(tstate.acquiredLocks, k)
+	tstate.acquiredLocks = append(tstate.acquiredLocks, toAcq.key)
 	return nil
 }
 
@@ -1458,6 +1484,7 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		keys = append(keys, roachpb.Key(string(rune('a'+i))))
 	}
+	strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
 	rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))
 
 	const numKeys = 2
@@ -1472,12 +1499,9 @@
 		lockSpans := &lockspanset.LockSpanSet{}
 		for i := 0; i < numKeys; i++ {
 			span := roachpb.Span{Key: keys[keysPerm[i]]}
-			acc := spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
-			str := lock.None
-			if acc == spanset.SpanReadWrite {
-				str = lock.Intent
-			}
-			latchSpans.AddMVCC(acc, span, ts)
+			str := strs[rng.Intn(len(strs))]
+			sa, latchTs := latchAccessForLockStrength(str, ts)
+			latchSpans.AddMVCC(sa, span, latchTs)
 			lockSpans.Add(str, span)
 		}
 		var txn *roachpb.Transaction
@@ -1537,6 +1561,9 @@ func TestLockTableConcurrentRequests(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		keys = append(keys, roachpb.Key(string(rune('a'+i))))
 	}
+	// TODO(nvanbenschoten): add lock.Shared back in once #111144 is fixed.
+	//strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
+	strs := []lock.Strength{lock.None, lock.Exclusive, lock.Intent}
 	rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))
 	const numActiveTxns = 8
 	var activeTxns [numActiveTxns]*enginepb.TxnMeta
@@ -1585,27 +1612,33 @@
 		wi := workloadItem{request: request}
 		for i := 0; i < numKeys; i++ {
 			span := roachpb.Span{Key: keys[keysPerm[i]]}
-			acc := spanset.SpanReadOnly
-			str := lock.None
-			dupRead := false
-			if !onlyReads {
-				acc = spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
-				if acc == spanset.SpanReadWrite && txnMeta != nil && rng.Intn(2) == 0 {
-					// Acquire lock.
-					wi.locksToAcquire = append(wi.locksToAcquire, span.Key)
-					str = lock.Intent
-				}
-				if acc == spanset.SpanReadWrite && rng.Intn(2) == 0 {
-					// Also include the key as read.
-					dupRead = true
-					str = lock.Intent
-				}
+			str := lock.None
+			if !onlyReads && txnMeta != nil {
+				// Randomly select a lock strength (including lock.None).
+				str = strs[rng.Intn(len(strs))]
 			}
-			latchSpans.AddMVCC(acc, span, ts)
+
+			sa, latchTs := latchAccessForLockStrength(str, ts)
+			latchSpans.AddMVCC(sa, span, latchTs)
 			lockSpans.Add(str, span)
+			if str != lock.None {
+				// Randomly select a lock durability. Shared and Exclusive locks
+				// can be unreplicated, but Intent cannot.
+				dur := lock.Replicated
+				if str != lock.Intent && rng.Intn(2) == 0 {
+					dur = lock.Unreplicated
+				}
+				toAcq := lockToAcquire{span.Key, str, dur}
+				wi.locksToAcquire = append(wi.locksToAcquire, toAcq)
+			}
+
+			dupRead := str != lock.None && rng.Intn(2) == 0
 			if dupRead {
-				latchSpans.AddMVCC(spanset.SpanReadOnly, span, ts)
-				lockSpans.Add(lock.None, span)
+				// Also include the key as a non-locking read.
+				str = lock.None
+				sa, latchTs := latchAccessForLockStrength(str, ts)
+				latchSpans.AddMVCC(sa, span, latchTs)
+				lockSpans.Add(str, span)
 			}
 		}
 		items = append(items, wi)
@@ -1620,7 +1653,10 @@
 		t.Run(fmt.Sprintf("concurrency %d", c), func(t *testing.T) {
 			exec := newWorkLoadExecutor(items, c)
 			if err := exec.execute(false, 200); err != nil {
-				t.Fatal(err)
+				// TODO(nvanbenschoten): remove this once #110435 is fixed.
+				if !testutils.IsError(err, "lock promotion from Shared to .* is not allowed") {
+					t.Fatal(err)
+				}
 			}
 		})
 	}
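Reviewer note: the heart of this diff is the small strength -> (latch access, latch timestamp) table that `latchAccessForLockStrength` factors out. The sketch below (not part of the diff) re-derives that mapping as a standalone program so the Shared special case is easy to see; it assumes the canonical `github.com/cockroachdb/cockroach` import paths (the `github.com` URLs above are a mirror), and it copies the unexported test helper rather than calling it, since the helper is private to the `concurrency` package.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// latchAccessForLockStrength mirrors the unexported test helper added in the
// diff: non-locking and shared-locking reads take read latches, while
// Exclusive locks and Intents take write latches. Shared is the only strength
// that moves the latch timestamp, to hlc.MaxTimestamp, because shared-locking
// reads must be isolated at all timestamps, not just the request's own
// timestamp (see issue #102264 referenced above).
func latchAccessForLockStrength(
	str lock.Strength, ts hlc.Timestamp,
) (spanset.SpanAccess, hlc.Timestamp) {
	switch str {
	case lock.None:
		return spanset.SpanReadOnly, ts
	case lock.Shared:
		return spanset.SpanReadOnly, hlc.MaxTimestamp
	case lock.Exclusive, lock.Intent:
		return spanset.SpanReadWrite, ts
	default:
		panic(fmt.Sprintf("unsupported lock strength: %s", str))
	}
}

func main() {
	// An arbitrary request timestamp for illustration.
	ts := hlc.Timestamp{WallTime: 10}
	for _, str := range []lock.Strength{
		lock.None, lock.Shared, lock.Exclusive, lock.Intent,
	} {
		sa, latchTs := latchAccessForLockStrength(str, ts)
		fmt.Printf("%-9s -> access=%v latchTs=%s\n", str, sa, latchTs)
	}
}
```

Printing one line per strength makes the asymmetry obvious: a read latch held at max timestamp conflicts with write latches at every timestamp, which is exactly the behavior the randomized tests above rely on when they route Shared through the same helper as the data-driven test.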