kv: add all lock strengths and durabilities to TestLockTableConcurrentRequests

Informs cockroachdb#91545.
Informs cockroachdb#100193.

This commit expands TestLockTableConcurrentRequests to support the
Shared, Exclusive, and Intent locking strengths, in addition to both the
Replicated and Unreplicated lock durabilities. This provides randomized
coverage of the lock table across these combinations.

The commit then temporarily disables Shared locks in the test, as they
occasionally cause it to fail with the panic `tryMakeNewDistinguished
called with new claimant txn`. This is related to cockroachdb#111144, so
we can re-enable Shared locks once that issue is resolved.

Release note: None
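
In sketch form, the randomized selection the diff below introduces boils
down to the following (a condensed illustration reusing identifiers from
the diff, not a verbatim excerpt):

    strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
    str := strs[rng.Intn(len(strs))]
    if str != lock.None {
        // Intent locks are always replicated; Shared and Exclusive
        // locks can use either durability.
        dur := lock.Replicated
        if str != lock.Intent && rng.Intn(2) == 0 {
            dur = lock.Unreplicated
        }
        wi.locksToAcquire = append(wi.locksToAcquire, lockToAcquire{span.Key, str, dur})
    }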
nvanbenschoten authored and Thomas Hardy committed Oct 4, 2023
1 parent 976c04c commit d9b1147
Showing 1 changed file with 92 additions and 56 deletions: pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -722,30 +723,42 @@ func scanSpans(
spanStr := parts[1]
str := GetStrength(t, d, strS)
// Compute latch span access based on the supplied strength.
var sa spanset.SpanAccess
switch str {
case lock.None:
sa = spanset.SpanReadOnly
case lock.Intent:
sa = spanset.SpanReadWrite
case lock.Exclusive:
sa = spanset.SpanReadWrite
case lock.Shared:
// Unlike non-locking reads, shared-locking reads are isolated at all
// timestamps (not just the request's timestamp); so we acquire a read
// latch at max timestamp. See
// https://github.com/cockroachdb/cockroach/issues/102264.
sa = spanset.SpanReadOnly
ts = hlc.MaxTimestamp
default:
d.Fatalf(t, "unsupported lock strength: %s", str)
}
latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), ts)
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), latchTs)
lockSpans.Add(str, getSpan(t, d, spanStr))
}
return latchSpans, lockSpans
}

// latchAccessForLockStrength returns the latch access and timestamp to use for
// a given lock strength and request timestamp. It duplicates some of the logic
// in DefaultDeclareIsolatedKeys to avoid the package dependency.
func latchAccessForLockStrength(
str lock.Strength, ts hlc.Timestamp,
) (spanset.SpanAccess, hlc.Timestamp) {
switch str {
case lock.None:
return spanset.SpanReadOnly, ts
case lock.Shared:
// Unlike non-locking reads, shared-locking reads are isolated at all
// timestamps (not just the request's timestamp); so we acquire a read
// latch at max timestamp. See
// https://github.com/cockroachdb/cockroach/issues/102264.
//
// We don't need to duplicate the special case for replicated shared
// locks here (see ReplicatedSharedLocksTransactionLatchingKey) because
// there is no risk in these tests of two shared lock acquisitions from
// the same transaction clobbering each other's state.
return spanset.SpanReadOnly, hlc.MaxTimestamp
case lock.Exclusive:
return spanset.SpanReadWrite, ts
case lock.Intent:
return spanset.SpanReadWrite, ts
default:
panic(fmt.Sprintf("unsupported lock strength: %s", str))
}
}
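// Example (an illustration, not part of the change): with ts=10, a Shared
// request maps to (SpanReadOnly, hlc.MaxTimestamp) and therefore conflicts
// with writers at all timestamps, while a lock.None read maps to
// (SpanReadOnly, 10) and conflicts only with writers at or below ts=10.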

func ScanIsoLevel(t *testing.T, d *datadriven.TestData) isolation.Level {
const key = "iso"
if !d.HasArg(key) {
@@ -1060,7 +1073,7 @@ type workItem struct {

// Request.
request *Request
locksToAcquire []roachpb.Key
locksToAcquire []lockToAcquire

// Update locks.
intents []roachpb.LockUpdate
@@ -1080,6 +1093,16 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
if item.request != nil {
var lg *spanlatch.Guard
var g lockTableGuard
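// The deferred cleanup below releases latches and exits the lock table's
// wait-queues on every return path, including the early-return error paths
// that previously had to release these resources individually.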
defer func() {
if lg != nil {
e.lm.Release(lg)
lg = nil
}
if g != nil {
e.lt.Dequeue(g)
g = nil
}
}()
var err error
for {
// Since we can't do a select involving latch acquisition and context
@@ -1099,6 +1122,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
break
}
e.lm.Release(lg)
lg = nil
var lastID uuid.UUID
L:
for {
@@ -1116,14 +1140,12 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
if !lastID.Equal(uuid.UUID{}) && item.request.Txn != nil {
_, err = e.waitingFor(item.request.Txn.ID, lastID, uuid.UUID{})
if err != nil {
e.lt.Dequeue(g)
return err
}
}
break L
case waitSelf:
if item.request.Txn == nil {
e.lt.Dequeue(g)
return errors.Errorf("non-transactional request cannot waitSelf")
}
case waitForDistinguished, waitFor, waitElsewhere:
@@ -1134,7 +1156,6 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
lastID = state.txn.ID
}
if aborted {
e.lt.Dequeue(g)
return err
}
}
@@ -1144,15 +1165,13 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
}
}

// acquire locks.
for _, k := range item.locksToAcquire {
err = e.acquireLock(item.request.Txn, k)
// Acquire locks.
for _, toAcq := range item.locksToAcquire {
err = e.acquireLock(item.request.Txn, toAcq)
if err != nil {
break
}
}
e.lt.Dequeue(g)
e.lm.Release(lg)
return err
}
for i := range item.intents {
@@ -1169,12 +1188,19 @@ type workloadItem struct {
// Request to be executed, iff request != nil
request *Request
// locks to be acquired by the request.
locksToAcquire []roachpb.Key
locksToAcquire []lockToAcquire

// Non-empty when transaction should release locks.
finish uuid.UUID
}

// lockToAcquire is a lock that should be acquired by a request.
type lockToAcquire struct {
key roachpb.Key
str lock.Strength
dur lock.Durability
}

// state of a transaction maintained by workloadExecutor, for deadlock
// detection, and deciding when transaction can be finished (when a
// workloadItem has instructed that it be finished and all its ongoing
@@ -1245,8 +1271,8 @@ func newWorkLoadExecutor(items []workloadItem, concurrency int) *workloadExecutor {
}
}

func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key) error {
acq := roachpb.MakeLockAcquisition(txn, k, lock.Unreplicated, lock.Exclusive)
func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, toAcq lockToAcquire) error {
acq := roachpb.MakeLockAcquisition(txn, toAcq.key, toAcq.dur, toAcq.str)
err := e.lt.AcquireLock(&acq)
if err != nil {
return err
@@ -1257,7 +1283,7 @@ func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key) error {
if !ok {
return errors.Errorf("testbug: lock acquiring request with txnID %v has no transaction", txn.ID)
}
tstate.acquiredLocks = append(tstate.acquiredLocks, k)
tstate.acquiredLocks = append(tstate.acquiredLocks, toAcq.key)
return nil
}

@@ -1458,6 +1484,7 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
for i := 0; i < 10; i++ {
keys = append(keys, roachpb.Key(string(rune('a'+i))))
}
strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))

const numKeys = 2
@@ -1472,12 +1499,9 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
lockSpans := &lockspanset.LockSpanSet{}
for i := 0; i < numKeys; i++ {
span := roachpb.Span{Key: keys[keysPerm[i]]}
acc := spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
str := lock.None
if acc == spanset.SpanReadWrite {
str = lock.Intent
}
latchSpans.AddMVCC(acc, span, ts)
str := strs[rng.Intn(len(strs))]
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, span, latchTs)
lockSpans.Add(str, span)
}
var txn *roachpb.Transaction
@@ -1537,6 +1561,9 @@ func TestLockTableConcurrentRequests(t *testing.T) {
for i := 0; i < 10; i++ {
keys = append(keys, roachpb.Key(string(rune('a'+i))))
}
// TODO(nvanbenschoten): add lock.Shared back in once #111144 is fixed.
//strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
strs := []lock.Strength{lock.None, lock.Exclusive, lock.Intent}
rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))
const numActiveTxns = 8
var activeTxns [numActiveTxns]*enginepb.TxnMeta
@@ -1585,27 +1612,33 @@ func TestLockTableConcurrentRequests(t *testing.T) {
wi := workloadItem{request: request}
for i := 0; i < numKeys; i++ {
span := roachpb.Span{Key: keys[keysPerm[i]]}
acc := spanset.SpanReadOnly

str := lock.None
dupRead := false
if !onlyReads {
acc = spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
if acc == spanset.SpanReadWrite && txnMeta != nil && rng.Intn(2) == 0 {
// Acquire lock.
wi.locksToAcquire = append(wi.locksToAcquire, span.Key)
str = lock.Intent
}
if acc == spanset.SpanReadWrite && rng.Intn(2) == 0 {
// Also include the key as read.
dupRead = true
str = lock.Intent
}
if !onlyReads && txnMeta != nil {
// Randomly select a lock strength (including lock.None).
str = strs[rng.Intn(len(strs))]
}
latchSpans.AddMVCC(acc, span, ts)
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, span, latchTs)
lockSpans.Add(str, span)
if str != lock.None {
// Randomly select a lock durability. Shared and Exclusive locks
// can be unreplicated, but Intent locks cannot.
dur := lock.Replicated
if str != lock.Intent && rng.Intn(2) == 0 {
dur = lock.Unreplicated
}
toAcq := lockToAcquire{span.Key, str, dur}
wi.locksToAcquire = append(wi.locksToAcquire, toAcq)
}

dupRead := str != lock.None && rng.Intn(2) == 0
if dupRead {
latchSpans.AddMVCC(spanset.SpanReadOnly, span, ts)
lockSpans.Add(lock.None, span)
// Also include the key as a non-locking read.
str = lock.None
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, span, latchTs)
lockSpans.Add(str, span)
}
}
items = append(items, wi)
@@ -1620,7 +1653,10 @@ func TestLockTableConcurrentRequests(t *testing.T) {
t.Run(fmt.Sprintf("concurrency %d", c), func(t *testing.T) {
exec := newWorkLoadExecutor(items, c)
if err := exec.execute(false, 200); err != nil {
t.Fatal(err)
// TODO(nvanbenschoten): remove this once #110435 is fixed.
if !testutils.IsError(err, "lock promotion from Shared to .* is not allowed") {
t.Fatal(err)
}
}
})
}
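
As a usage note: with standard Go tooling, something like `go test ./pkg/kv/kvserver/concurrency -run TestLockTableConcurrentRequests -count 10` should repeatedly exercise the randomized workload (an assumption about the local setup; CockroachDB development typically runs tests through its `dev` wrapper instead).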
