Skip to content

Commit

Permalink
Merge #111465
Browse files Browse the repository at this point in the history
111465: kv: add all lock strengths and durabilities to TestLockTableConcurrentRequests r=nvanbenschoten a=nvanbenschoten

Informs #91545.
Informs #100193.

This commit expands `TestLockTableConcurrentRequests` to support Shared, Exclusive, and Intent locking strength, in addition to both Replicated and Unreplicated locking durabilities. This provides randomized coverage of the lock table with these combinations.

The commit then temporarily disables Shared locks in the test, which occasionally fail with the panic: `tryMakeNewDistinguished called with new claimant txn`. This is related to #111144, so we can re-enable Shared locks when that issue is resolved.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Sep 29, 2023
2 parents 52ae7d5 + 1448128 commit 27e7c16
Showing 1 changed file with 92 additions and 56 deletions.
148 changes: 92 additions & 56 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -722,30 +723,42 @@ func scanSpans(
spanStr := parts[1]
str := GetStrength(t, d, strS)
// Compute latch span access based on the supplied strength.
var sa spanset.SpanAccess
switch str {
case lock.None:
sa = spanset.SpanReadOnly
case lock.Intent:
sa = spanset.SpanReadWrite
case lock.Exclusive:
sa = spanset.SpanReadWrite
case lock.Shared:
// Unlike non-locking reads, shared-locking reads are isolated at all
// timestamps (not just the request's timestamp); so we acquire a read
// latch at max timestamp. See
// https://github.com/cockroachdb/cockroach/issues/102264.
sa = spanset.SpanReadOnly
ts = hlc.MaxTimestamp
default:
d.Fatalf(t, "unsupported lock strength: %s", str)
}
latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), ts)
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), latchTs)
lockSpans.Add(str, getSpan(t, d, spanStr))
}
return latchSpans, lockSpans
}

// latchAccessForLockStrength returns the latch access and timestamp to use for
// a given lock strength and request timestamp. It duplicates some of the logic
// in DefaultDeclareIsolatedKeys to avoid the package dependency.
func latchAccessForLockStrength(
str lock.Strength, ts hlc.Timestamp,
) (spanset.SpanAccess, hlc.Timestamp) {
switch str {
case lock.None:
return spanset.SpanReadOnly, ts
case lock.Shared:
// Unlike non-locking reads, shared-locking reads are isolated at all
// timestamps (not just the request's timestamp); so we acquire a read
// latch at max timestamp. See
// https://github.com/cockroachdb/cockroach/issues/102264.
//
// We don't need to duplicate the special case for replicated shared
// locks here (see ReplicatedSharedLocksTransactionLatchingKey) because
// there is no risk in these tests of two shared lock acquisitions from
// the same transaction clobbering each other's state.
return spanset.SpanReadOnly, hlc.MaxTimestamp
case lock.Exclusive:
return spanset.SpanReadWrite, ts
case lock.Intent:
return spanset.SpanReadWrite, ts
default:
panic(fmt.Sprintf("unsupported lock strength: %s", str))
}
}

func ScanIsoLevel(t *testing.T, d *datadriven.TestData) isolation.Level {
const key = "iso"
if !d.HasArg(key) {
Expand Down Expand Up @@ -1060,7 +1073,7 @@ type workItem struct {

// Request.
request *Request
locksToAcquire []roachpb.Key
locksToAcquire []lockToAcquire

// Update locks.
intents []roachpb.LockUpdate
Expand All @@ -1080,6 +1093,16 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
if item.request != nil {
var lg *spanlatch.Guard
var g lockTableGuard
defer func() {
if lg != nil {
e.lm.Release(lg)
lg = nil
}
if g != nil {
e.lt.Dequeue(g)
g = nil
}
}()
var err error
for {
// Since we can't do a select involving latch acquisition and context
Expand All @@ -1099,6 +1122,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
break
}
e.lm.Release(lg)
lg = nil
var lastID uuid.UUID
L:
for {
Expand All @@ -1116,14 +1140,12 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
if !lastID.Equal(uuid.UUID{}) && item.request.Txn != nil {
_, err = e.waitingFor(item.request.Txn.ID, lastID, uuid.UUID{})
if err != nil {
e.lt.Dequeue(g)
return err
}
}
break L
case waitSelf:
if item.request.Txn == nil {
e.lt.Dequeue(g)
return errors.Errorf("non-transactional request cannot waitSelf")
}
case waitForDistinguished, waitFor, waitElsewhere:
Expand All @@ -1134,7 +1156,6 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
lastID = state.txn.ID
}
if aborted {
e.lt.Dequeue(g)
return err
}
}
Expand All @@ -1144,15 +1165,13 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
}
}

// acquire locks.
for _, k := range item.locksToAcquire {
err = e.acquireLock(item.request.Txn, k)
// Acquire locks.
for _, toAcq := range item.locksToAcquire {
err = e.acquireLock(item.request.Txn, toAcq)
if err != nil {
break
}
}
e.lt.Dequeue(g)
e.lm.Release(lg)
return err
}
for i := range item.intents {
Expand All @@ -1169,12 +1188,19 @@ type workloadItem struct {
// Request to be executed, iff request != nil
request *Request
// locks to be acquired by the request.
locksToAcquire []roachpb.Key
locksToAcquire []lockToAcquire

// Non-empty when transaction should release locks.
finish uuid.UUID
}

// lockToAcquire is a lock that should be acquired by a request.
type lockToAcquire struct {
key roachpb.Key
str lock.Strength
dur lock.Durability
}

// state of a transaction maintained by workloadExecutor, for deadlock
// detection, and deciding when transaction can be finished (when a
// workloadItem has instructed that it be finished and all its ongoing
Expand Down Expand Up @@ -1245,8 +1271,8 @@ func newWorkLoadExecutor(items []workloadItem, concurrency int) *workloadExecuto
}
}

func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key) error {
acq := roachpb.MakeLockAcquisition(txn, k, lock.Unreplicated, lock.Exclusive)
func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, toAcq lockToAcquire) error {
acq := roachpb.MakeLockAcquisition(txn, toAcq.key, toAcq.dur, toAcq.str)
err := e.lt.AcquireLock(&acq)
if err != nil {
return err
Expand All @@ -1257,7 +1283,7 @@ func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key)
if !ok {
return errors.Errorf("testbug: lock acquiring request with txnID %v has no transaction", txn.ID)
}
tstate.acquiredLocks = append(tstate.acquiredLocks, k)
tstate.acquiredLocks = append(tstate.acquiredLocks, toAcq.key)
return nil
}

Expand Down Expand Up @@ -1458,6 +1484,7 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
for i := 0; i < 10; i++ {
keys = append(keys, roachpb.Key(string(rune('a'+i))))
}
strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))

const numKeys = 2
Expand All @@ -1472,12 +1499,9 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
lockSpans := &lockspanset.LockSpanSet{}
for i := 0; i < numKeys; i++ {
span := roachpb.Span{Key: keys[keysPerm[i]]}
acc := spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
str := lock.None
if acc == spanset.SpanReadWrite {
str = lock.Intent
}
latchSpans.AddMVCC(acc, span, ts)
str := strs[rand.Intn(len(strs))]
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, span, latchTs)
lockSpans.Add(str, span)
}
var txn *roachpb.Transaction
Expand Down Expand Up @@ -1537,6 +1561,9 @@ func TestLockTableConcurrentRequests(t *testing.T) {
for i := 0; i < 10; i++ {
keys = append(keys, roachpb.Key(string(rune('a'+i))))
}
// TODO(nvanbenschoten): add lock.Shared back in once #111144 is fixed.
//strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
strs := []lock.Strength{lock.None, lock.Exclusive, lock.Intent}
rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))
const numActiveTxns = 8
var activeTxns [numActiveTxns]*enginepb.TxnMeta
Expand Down Expand Up @@ -1585,27 +1612,33 @@ func TestLockTableConcurrentRequests(t *testing.T) {
wi := workloadItem{request: request}
for i := 0; i < numKeys; i++ {
span := roachpb.Span{Key: keys[keysPerm[i]]}
acc := spanset.SpanReadOnly

str := lock.None
dupRead := false
if !onlyReads {
acc = spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
if acc == spanset.SpanReadWrite && txnMeta != nil && rng.Intn(2) == 0 {
// Acquire lock.
wi.locksToAcquire = append(wi.locksToAcquire, span.Key)
str = lock.Intent
}
if acc == spanset.SpanReadWrite && rng.Intn(2) == 0 {
// Also include the key as read.
dupRead = true
str = lock.Intent
}
if !onlyReads && txnMeta != nil {
// Randomly select a lock strength (including lock.None).
str = strs[rand.Intn(len(strs))]
}
latchSpans.AddMVCC(acc, span, ts)
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, span, latchTs)
lockSpans.Add(str, span)
if str != lock.None {
// Randomly select a lock durability. Shared and Exclusive locks
// can be unreplicated, but Intent can not.
dur := lock.Replicated
if str != lock.Intent && rng.Intn(2) == 0 {
dur = lock.Unreplicated
}
toAcq := lockToAcquire{span.Key, str, dur}
wi.locksToAcquire = append(wi.locksToAcquire, toAcq)
}

dupRead := str != lock.None && rng.Intn(2) == 0
if dupRead {
latchSpans.AddMVCC(spanset.SpanReadOnly, span, ts)
lockSpans.Add(lock.None, span)
// Also include the key as a non-locking read.
str = lock.None
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, span, latchTs)
lockSpans.Add(str, span)
}
}
items = append(items, wi)
Expand All @@ -1620,7 +1653,10 @@ func TestLockTableConcurrentRequests(t *testing.T) {
t.Run(fmt.Sprintf("concurrency %d", c), func(t *testing.T) {
exec := newWorkLoadExecutor(items, c)
if err := exec.execute(false, 200); err != nil {
t.Fatal(err)
// TODO(nvanbenschoten): remove this once #110435 is fixed.
if !testutils.IsError(err, "lock promotion from Shared to .* is not allowed") {
t.Fatal(err)
}
}
})
}
Expand Down

0 comments on commit 27e7c16

Please sign in to comment.